Whamcloud - gitweb
LU-10193 osd-zfs: backup index object with plain format 10/30910/16
authorFan Yong <fan.yong@intel.com>
Thu, 25 Jan 2018 08:47:20 +0000 (16:47 +0800)
committerOleg Drokin <oleg.drokin@intel.com>
Tue, 6 Feb 2018 04:27:25 +0000 (04:27 +0000)
Lustre uses ZAP to implement index object. When tar the index
object via backend ZPL for backup, it is explained as regular
file, then when untar it, it is not ZAP formatted again, then
the Lustre cannot recognize the 'bad' formatted index object.

On the other hand, each backend FS has its own special format
for index object. Then we cannot migrate the index files from
one backend to another directly.

To resolve such issue, the patch will backup the index object
with plain format to the local '/index_backup' directory with
the name of source index's FID string and ".lbx" postfix when
umount the device.

The format of the backup is as following:
1) header: 512 bytes, including:
   magic:       4 bytes
   count:       4 bytes
   keysize:     4 bytes
   recsize:     4 bytes
   owner_fid:   16 bytes
   padding:     480 bytes

2) body: after the header, <key, rec> pairs one by one.

The backup will be done when server umount. The backup behavior
is controlled via new OSD lproc interface "index_backup". It is
off by default. You can turn it on to enable backup when server
umount via writing non-zero value to such lproc interface.

Test-Parameters: envdefinitions=SLOW=yes testlist=sanity-scrub mdtfilesystemtype=zfs ostfilesystemtype=zfs mdscount=2 mdtcount=4
Signed-off-by: Fan Yong <fan.yong@intel.com>
Change-Id: I01730bc9cfa3ae597f2d8652df9fb76418cf55ce
Reviewed-on: https://review.whamcloud.com/30910
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
16 files changed:
lustre/include/lustre_fid.h
lustre/include/lustre_scrub.h
lustre/include/obd_support.h
lustre/include/uapi/linux/lustre/lustre_disk.h
lustre/include/uapi/linux/lustre/lustre_user.h
lustre/mgs/mgs_fs.c
lustre/obdclass/scrub.c
lustre/ofd/ofd_dev.c
lustre/osd-zfs/osd_handler.c
lustre/osd-zfs/osd_index.c
lustre/osd-zfs/osd_internal.h
lustre/osd-zfs/osd_lproc.c
lustre/osd-zfs/osd_object.c
lustre/osd-zfs/osd_oi.c
lustre/osd-zfs/osd_scrub.c
lustre/tests/sanity-scrub.sh

index bd5d5d8..e798a12 100644 (file)
@@ -218,6 +218,7 @@ enum local_oid {
        OSD_LPF_OID             = 19UL,
        REPLY_DATA_OID          = 21UL,
        ACCT_PROJECT_OID        = 22UL,
        OSD_LPF_OID             = 19UL,
        REPLY_DATA_OID          = 21UL,
        ACCT_PROJECT_OID        = 22UL,
+       INDEX_BACKUP_OID        = 4116UL,
        OFD_LAST_GROUP_OID      = 4117UL,
        LLOG_CATALOGS_OID       = 4118UL,
        MGS_CONFIGS_OID         = 4119UL,
        OFD_LAST_GROUP_OID      = 4117UL,
        LLOG_CATALOGS_OID       = 4118UL,
        MGS_CONFIGS_OID         = 4119UL,
index eb066f3..055b248 100644 (file)
@@ -163,6 +163,7 @@ enum osd_lf_flags {
        OLF_SHOW_NAME           = 0x0004,
        OLF_NO_OI               = 0x0008,
        OLF_IDX_IN_FID          = 0x0010,
        OLF_SHOW_NAME           = 0x0004,
        OLF_NO_OI               = 0x0008,
        OLF_IDX_IN_FID          = 0x0010,
+       OLF_NOT_BACKUP          = 0x0020,
 };
 
 /* There are some overhead to detect OI inconsistency automatically
 };
 
 /* There are some overhead to detect OI inconsistency automatically
@@ -297,6 +298,42 @@ struct lustre_scrub {
                                os_full_scrub:1;
 };
 
                                os_full_scrub:1;
 };
 
+#define INDEX_BACKUP_MAGIC_V1  0x1E41F208
+#define INDEX_BACKUP_BUFSIZE   (4096 * 4)
+
+enum lustre_index_backup_policy {
+       /* By default, do not backup the index */
+       LIBP_NONE       = 0,
+
+       /* Backup the dirty index objects when umount */
+       LIBP_AUTO       = 1,
+};
+
+struct lustre_index_backup_header {
+       __u32           libh_magic;
+       __u32           libh_count;
+       __u32           libh_keysize;
+       __u32           libh_recsize;
+       struct lu_fid   libh_owner;
+       __u64           libh_pad[60]; /* keep header 512 bytes aligned */
+};
+
+struct lustre_index_backup_unit {
+       struct list_head        libu_link;
+       struct lu_fid           libu_fid;
+       __u32                   libu_keysize;
+       __u32                   libu_recsize;
+};
+
+struct lustre_index_restore_unit {
+       struct list_head        liru_link;
+       struct lu_fid           liru_pfid;
+       struct lu_fid           liru_cfid;
+       __u64                   liru_clid;
+       int                     liru_len;
+       char                    liru_name[0];
+};
+
 void scrub_file_init(struct lustre_scrub *scrub, __u8 *uuid);
 void scrub_file_reset(struct lustre_scrub *scrub, __u8 *uuid, __u64 flags);
 int scrub_file_load(const struct lu_env *env, struct lustre_scrub *scrub);
 void scrub_file_init(struct lustre_scrub *scrub, __u8 *uuid);
 void scrub_file_reset(struct lustre_scrub *scrub, __u8 *uuid, __u64 flags);
 int scrub_file_load(const struct lu_env *env, struct lustre_scrub *scrub);
@@ -307,6 +344,30 @@ int scrub_start(int (*threadfn)(void *data), struct lustre_scrub *scrub,
 void scrub_stop(struct lustre_scrub *scrub);
 void scrub_dump(struct seq_file *m, struct lustre_scrub *scrub);
 
 void scrub_stop(struct lustre_scrub *scrub);
 void scrub_dump(struct seq_file *m, struct lustre_scrub *scrub);
 
+int lustre_liru_new(struct list_head *head, const struct lu_fid *pfid,
+                   const struct lu_fid *cfid, __u64 child,
+                   const char *name, int namelen);
+
+int lustre_index_register(struct dt_device *dev, const char *devname,
+                         struct list_head *head, spinlock_t *lock, int *guard,
+                         const struct lu_fid *fid,
+                         __u32 keysize, __u32 recsize);
+
+void lustre_index_backup(const struct lu_env *env, struct dt_device *dev,
+                        const char *devname, struct list_head *head,
+                        spinlock_t *lock, int *guard, bool backup);
+int lustre_index_restore(const struct lu_env *env, struct dt_device *dev,
+                        const struct lu_fid *parent_fid,
+                        const struct lu_fid *tgt_fid,
+                        const struct lu_fid *bak_fid, const char *name,
+                        struct list_head *head, spinlock_t *lock,
+                        char *buf, int bufsize);
+
+static inline void lustre_fid2lbx(char *buf, const struct lu_fid *fid, int len)
+{
+       snprintf(buf, len, DFID_NOBRACE".lbx", PFID(fid));
+}
+
 static inline const char *osd_scrub2name(struct lustre_scrub *scrub)
 {
        return scrub->os_name;
 static inline const char *osd_scrub2name(struct lustre_scrub *scrub)
 {
        return scrub->os_name;
index af090df..b309437 100644 (file)
@@ -273,6 +273,7 @@ extern char obd_jobid_var[];
 #define OBD_FAIL_OSD_COMPAT_NO_ENTRY                   0x196
 #define OBD_FAIL_OSD_OST_EA_FID_SET                    0x197
 #define OBD_FAIL_OSD_NO_OI_ENTRY                       0x198
 #define OBD_FAIL_OSD_COMPAT_NO_ENTRY                   0x196
 #define OBD_FAIL_OSD_OST_EA_FID_SET                    0x197
 #define OBD_FAIL_OSD_NO_OI_ENTRY                       0x198
+#define OBD_FAIL_OSD_INDEX_CRASH                       0x199
 
 #define OBD_FAIL_OST                     0x200
 #define OBD_FAIL_OST_CONNECT_NET         0x201
 
 #define OBD_FAIL_OST                     0x200
 #define OBD_FAIL_OST_CONNECT_NET         0x201
index 7df1c63..ab32e07 100644 (file)
@@ -68,6 +68,7 @@
 #define LFSCK_LAYOUT           "lfsck_layout"
 #define LFSCK_NAMESPACE                "lfsck_namespace"
 #define REMOTE_PARENT_DIR      "REMOTE_PARENT_DIR"
 #define LFSCK_LAYOUT           "lfsck_layout"
 #define LFSCK_NAMESPACE                "lfsck_namespace"
 #define REMOTE_PARENT_DIR      "REMOTE_PARENT_DIR"
+#define INDEX_BACKUP_DIR       "index_backup"
 
 /****************** persistent mount data *********************/
 
 
 /****************** persistent mount data *********************/
 
index d62a908..4aa1147 100644 (file)
@@ -216,6 +216,7 @@ enum lma_compat {
                                       * under /O/<seq>/d<x>. */
        LMAC_STRIPE_INFO = 0x00000010, /* stripe info in the LMA EA. */
        LMAC_COMP_INFO   = 0x00000020, /* Component info in the LMA EA. */
                                       * under /O/<seq>/d<x>. */
        LMAC_STRIPE_INFO = 0x00000010, /* stripe info in the LMA EA. */
        LMAC_COMP_INFO   = 0x00000020, /* Component info in the LMA EA. */
+       LMAC_IDX_BACKUP  = 0x00000040, /* Has index backup. */
 };
 
 /**
 };
 
 /**
index 4eefeda..0ccd4c4 100644 (file)
@@ -222,6 +222,26 @@ out:
 
 int mgs_fs_cleanup(const struct lu_env *env, struct mgs_device *mgs)
 {
 
 int mgs_fs_cleanup(const struct lu_env *env, struct mgs_device *mgs)
 {
+       struct lustre_cfg_bufs bufs;
+       struct lustre_cfg *lcfg;
+
+       /* For the MGS on independent device from MDT, it notifies the lower
+        * layer OSD to backup index before the umount via LCFG_PRE_CLEANUP. */
+       lustre_cfg_bufs_reset(&bufs, mgs->mgs_obd->obd_name);
+       lustre_cfg_bufs_set_string(&bufs, 1, NULL);
+       OBD_ALLOC(lcfg, lustre_cfg_len(bufs.lcfg_bufcount, bufs.lcfg_buflen));
+       if (!lcfg) {
+               CERROR("%s: failed to trigger LCFG_PRE_CLEANUP\n",
+                      mgs->mgs_obd->obd_name);
+       } else {
+               struct lu_device *l = &mgs->mgs_bottom->dd_lu_dev;
+
+               lustre_cfg_init(lcfg, LCFG_PRE_CLEANUP, &bufs);
+               l->ld_ops->ldo_process_config(env, l, lcfg);
+               OBD_FREE(lcfg, lustre_cfg_len(lcfg->lcfg_bufcount,
+                                             lcfg->lcfg_buflens));
+       }
+
        if (mgs->mgs_configs_dir) {
                dt_object_put(env, mgs->mgs_configs_dir);
                mgs->mgs_configs_dir = NULL;
        if (mgs->mgs_configs_dir) {
                dt_object_put(env, mgs->mgs_configs_dir);
                mgs->mgs_configs_dir = NULL;
index 7be6a27..3ab8540 100644 (file)
@@ -37,6 +37,7 @@
 #include <linux/kthread.h>
 #include <lustre_scrub.h>
 #include <lustre_lib.h>
 #include <linux/kthread.h>
 #include <lustre_scrub.h>
 #include <lustre_lib.h>
+#include <lustre_fid.h>
 
 static inline struct dt_device *scrub_obj2dev(struct dt_object *obj)
 {
 
 static inline struct dt_device *scrub_obj2dev(struct dt_object *obj)
 {
@@ -494,3 +495,715 @@ void scrub_dump(struct seq_file *m, struct lustre_scrub *scrub)
        up_read(&scrub->os_rwsem);
 }
 EXPORT_SYMBOL(scrub_dump);
        up_read(&scrub->os_rwsem);
 }
 EXPORT_SYMBOL(scrub_dump);
+
+int lustre_liru_new(struct list_head *head, const struct lu_fid *pfid,
+                   const struct lu_fid *cfid, __u64 child,
+                   const char *name, int namelen)
+{
+       struct lustre_index_restore_unit *liru;
+       int len = sizeof(*liru) + namelen + 1;
+
+       OBD_ALLOC(liru, len);
+       if (!liru)
+               return -ENOMEM;
+
+       INIT_LIST_HEAD(&liru->liru_link);
+       liru->liru_pfid = *pfid;
+       liru->liru_cfid = *cfid;
+       liru->liru_clid = child;
+       liru->liru_len = len;
+       memcpy(liru->liru_name, name, namelen);
+       liru->liru_name[namelen] = 0;
+       list_add_tail(&liru->liru_link, head);
+
+       return 0;
+}
+EXPORT_SYMBOL(lustre_liru_new);
+
+int lustre_index_register(struct dt_device *dev, const char *devname,
+                         struct list_head *head, spinlock_t *lock, int *guard,
+                         const struct lu_fid *fid,
+                         __u32 keysize, __u32 recsize)
+{
+       struct lustre_index_backup_unit *libu, *pos;
+       int rc = 0;
+       ENTRY;
+
+       if (dev->dd_rdonly || *guard)
+               RETURN(1);
+
+       OBD_ALLOC_PTR(libu);
+       if (!libu)
+               RETURN(-ENOMEM);
+
+       INIT_LIST_HEAD(&libu->libu_link);
+       libu->libu_keysize = keysize;
+       libu->libu_recsize = recsize;
+       libu->libu_fid = *fid;
+
+       spin_lock(lock);
+       if (unlikely(*guard)) {
+               spin_unlock(lock);
+               OBD_FREE_PTR(libu);
+
+               RETURN(1);
+       }
+
+       list_for_each_entry_reverse(pos, head, libu_link) {
+               rc = lu_fid_cmp(&pos->libu_fid, fid);
+               if (rc < 0) {
+                       list_add(&libu->libu_link, &pos->libu_link);
+                       spin_unlock(lock);
+
+                       RETURN(0);
+               }
+
+               if (!rc) {
+                       /* Registered already. But the former registered one
+                        * has different keysize/recsize. It may because that
+                        * the former values are from disk and corrupted, then
+                        * replace it with new values. */
+                       if (unlikely(keysize != pos->libu_keysize ||
+                                    recsize != pos->libu_recsize)) {
+                               CWARN("%s: the index "DFID" has registered "
+                                     "with %u/%u, may be invalid, replace "
+                                     "with %u/%u\n",
+                                     devname, PFID(fid), pos->libu_keysize,
+                                     pos->libu_recsize, keysize, recsize);
+
+                               pos->libu_keysize = keysize;
+                               pos->libu_recsize = recsize;
+                       } else {
+                               rc = 1;
+                       }
+
+                       spin_unlock(lock);
+                       OBD_FREE_PTR(libu);
+
+                       RETURN(rc);
+               }
+       }
+
+       list_add(&libu->libu_link, head);
+       spin_unlock(lock);
+
+       RETURN(0);
+}
+EXPORT_SYMBOL(lustre_index_register);
+
+static void lustre_index_degister(struct list_head *head, spinlock_t *lock,
+                                 const struct lu_fid *fid)
+{
+       struct lustre_index_backup_unit *libu;
+       int rc = -ENOENT;
+
+       spin_lock(lock);
+       list_for_each_entry_reverse(libu, head, libu_link) {
+               rc = lu_fid_cmp(&libu->libu_fid, fid);
+               /* NOT registered. */
+               if (rc < 0)
+                       break;
+
+               if (!rc) {
+                       list_del(&libu->libu_link);
+                       break;
+               }
+       }
+       spin_unlock(lock);
+
+       if (!rc)
+               OBD_FREE_PTR(libu);
+}
+
+static void
+lustre_index_backup_make_header(struct lustre_index_backup_header *header,
+                               __u32 keysize, __u32 recsize,
+                               const struct lu_fid *fid, __u32 count)
+{
+       memset(header, 0, sizeof(*header));
+       header->libh_magic = cpu_to_le32(INDEX_BACKUP_MAGIC_V1);
+       header->libh_count = cpu_to_le32(count);
+       header->libh_keysize = cpu_to_le32(keysize);
+       header->libh_recsize = cpu_to_le32(recsize);
+       fid_cpu_to_le(&header->libh_owner, fid);
+}
+
+static int lustre_index_backup_body(const struct lu_env *env,
+                                   struct dt_object *obj, loff_t *pos,
+                                   void *buf, int bufsize)
+{
+       struct dt_device *dev = lu2dt_dev(obj->do_lu.lo_dev);
+       struct thandle *th;
+       struct lu_buf lbuf = {
+               .lb_buf = buf,
+               .lb_len = bufsize
+       };
+       int rc;
+       ENTRY;
+
+       th = dt_trans_create(env, dev);
+       if (IS_ERR(th))
+               RETURN(PTR_ERR(th));
+
+       rc = dt_declare_record_write(env, obj, &lbuf, *pos, th);
+       if (rc)
+               GOTO(stop, rc);
+
+       rc = dt_trans_start_local(env, dev, th);
+       if (rc)
+               GOTO(stop, rc);
+
+       rc = dt_record_write(env, obj, &lbuf, pos, th);
+
+       GOTO(stop, rc);
+
+stop:
+       dt_trans_stop(env, dev, th);
+       return rc;
+}
+
+static int lustre_index_backup_header(const struct lu_env *env,
+                                     struct dt_object *obj,
+                                     const struct lu_fid *tgt_fid,
+                                     __u32 keysize, __u32 recsize,
+                                     void *buf, int bufsize, int count)
+{
+       struct dt_device *dev = lu2dt_dev(obj->do_lu.lo_dev);
+       struct lustre_index_backup_header *header = buf;
+       struct lu_attr *la = buf;
+       struct thandle *th;
+       struct lu_buf lbuf = {
+               .lb_buf = header,
+               .lb_len = sizeof(*header)
+       };
+       loff_t size = sizeof(*header) + (keysize + recsize) * count;
+       loff_t pos = 0;
+       int rc;
+       bool punch = false;
+       ENTRY;
+
+       LASSERT(sizeof(*la) <= bufsize);
+       LASSERT(sizeof(*header) <= bufsize);
+
+       rc = dt_attr_get(env, obj, la);
+       if (rc)
+               RETURN(rc);
+
+       if (la->la_size > size)
+               punch = true;
+
+       lustre_index_backup_make_header(header, keysize, recsize,
+                                       tgt_fid, count);
+       th = dt_trans_create(env, dev);
+       if (IS_ERR(th))
+               RETURN(PTR_ERR(th));
+
+       rc = dt_declare_record_write(env, obj, &lbuf, pos, th);
+       if (rc)
+               GOTO(stop, rc);
+
+       if (punch) {
+               rc = dt_declare_punch(env, obj, size, OBD_OBJECT_EOF, th);
+               if (rc)
+                       GOTO(stop, rc);
+       }
+
+       rc = dt_trans_start_local(env, dev, th);
+       if (rc)
+               GOTO(stop, rc);
+
+       rc = dt_record_write(env, obj, &lbuf, &pos, th);
+       if (!rc && punch)
+               rc = dt_punch(env, obj, size, OBD_OBJECT_EOF, th);
+
+       GOTO(stop, rc);
+
+stop:
+       dt_trans_stop(env, dev, th);
+       return rc;
+}
+
+static int lustre_index_update_lma(const struct lu_env *env,
+                                  struct dt_object *obj,
+                                  void *buf, int bufsize)
+{
+       struct dt_device *dev = lu2dt_dev(obj->do_lu.lo_dev);
+       struct lustre_mdt_attrs *lma = buf;
+       struct lu_buf lbuf = {
+               .lb_buf = lma,
+               .lb_len = sizeof(struct lustre_ost_attrs)
+       };
+       struct thandle *th;
+       int fl = LU_XATTR_REPLACE;
+       int rc;
+       ENTRY;
+
+       LASSERT(bufsize >= lbuf.lb_len);
+
+       rc = dt_xattr_get(env, obj, &lbuf, XATTR_NAME_LMA);
+       if (unlikely(rc == -ENODATA)) {
+               fl = LU_XATTR_CREATE;
+               lustre_lma_init(lma, lu_object_fid(&obj->do_lu),
+                               LMAC_IDX_BACKUP, 0);
+               rc = sizeof(*lma);
+       } else if (rc < sizeof(*lma)) {
+               RETURN(rc < 0 ? rc : -EFAULT);
+       } else {
+               lustre_lma_swab(lma);
+               if (lma->lma_compat & LMAC_IDX_BACKUP)
+                       RETURN(0);
+
+               lma->lma_compat |= LMAC_IDX_BACKUP;
+       }
+
+       lustre_lma_swab(lma);
+       lbuf.lb_len = rc;
+       th = dt_trans_create(env, dev);
+       if (IS_ERR(th))
+               RETURN(rc);
+
+       rc = dt_declare_xattr_set(env, obj, &lbuf, XATTR_NAME_LMA, fl, th);
+       if (rc)
+               GOTO(stop, rc);
+
+       rc = dt_trans_start_local(env, dev, th);
+       if (rc)
+               GOTO(stop, rc);
+
+       rc = dt_xattr_set(env, obj, &lbuf, XATTR_NAME_LMA, fl, th);
+
+       GOTO(stop, rc);
+
+stop:
+       dt_trans_stop(env, dev, th);
+       return rc;
+}
+
+static int lustre_index_backup_one(const struct lu_env *env,
+                                  struct local_oid_storage *los,
+                                  struct dt_object *parent,
+                                  struct lustre_index_backup_unit *libu,
+                                  char *buf, int bufsize)
+{
+       struct dt_device *dev = scrub_obj2dev(parent);
+       struct dt_object *tgt_obj = NULL;
+       struct dt_object *bak_obj = NULL;
+       const struct dt_it_ops *iops;
+       struct dt_it *di;
+       loff_t pos = sizeof(struct lustre_index_backup_header);
+       int count = 0;
+       int size = 0;
+       int rc;
+       ENTRY;
+
+       tgt_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
+                                            &libu->libu_fid, NULL));
+       if (IS_ERR_OR_NULL(tgt_obj))
+               GOTO(out, rc = tgt_obj ? PTR_ERR(tgt_obj) : -ENOENT);
+
+       if (!dt_object_exists(tgt_obj))
+               GOTO(out, rc = 0);
+
+       if (!tgt_obj->do_index_ops) {
+               struct dt_index_features feat;
+
+               feat.dif_flags = DT_IND_UPDATE;
+               feat.dif_keysize_min = libu->libu_keysize;
+               feat.dif_keysize_max = libu->libu_keysize;
+               feat.dif_recsize_min = libu->libu_recsize;
+               feat.dif_recsize_max = libu->libu_recsize;
+               feat.dif_ptrsize = 4;
+               rc = tgt_obj->do_ops->do_index_try(env, tgt_obj, &feat);
+               if (rc)
+                       GOTO(out, rc);
+       }
+
+       lustre_fid2lbx(buf, &libu->libu_fid, bufsize);
+       bak_obj = local_file_find_or_create(env, los, parent, buf,
+                                           S_IFREG | S_IRUGO | S_IWUSR);
+       if (IS_ERR_OR_NULL(bak_obj))
+               GOTO(out, rc = bak_obj ? PTR_ERR(bak_obj) : -ENOENT);
+
+       iops = &tgt_obj->do_index_ops->dio_it;
+       di = iops->init(env, tgt_obj, 0);
+       if (IS_ERR(di))
+               GOTO(out, rc = PTR_ERR(di));
+
+       rc = iops->load(env, di, 0);
+       if (!rc)
+               rc = iops->next(env, di);
+       else if (rc > 0)
+               rc = 0;
+
+       while (!rc) {
+               void *key;
+               void *rec;
+
+               key = iops->key(env, di);
+               memcpy(&buf[size], key, libu->libu_keysize);
+               size += libu->libu_keysize;
+               rec = &buf[size];
+               rc = iops->rec(env, di, rec, 0);
+               if (rc)
+                       GOTO(fini, rc);
+
+               size += libu->libu_recsize;
+               count++;
+               if (size + libu->libu_keysize + libu->libu_recsize > bufsize) {
+                       rc = lustre_index_backup_body(env, bak_obj, &pos,
+                                                     buf, size);
+                       if (rc)
+                               GOTO(fini, rc);
+
+                       size = 0;
+               }
+
+               rc = iops->next(env, di);
+       }
+
+       if (rc >= 0 && size > 0)
+               rc = lustre_index_backup_body(env, bak_obj, &pos, buf, size);
+
+       if (rc < 0)
+               GOTO(fini, rc);
+
+       rc = lustre_index_backup_header(env, bak_obj, &libu->libu_fid,
+                                       libu->libu_keysize, libu->libu_recsize,
+                                       buf, bufsize, count);
+       if (!rc)
+               rc = lustre_index_update_lma(env, tgt_obj, buf, bufsize);
+
+       if (!rc && OBD_FAIL_CHECK(OBD_FAIL_OSD_INDEX_CRASH)) {
+               LASSERT(bufsize >= 512);
+
+               pos = 0;
+               memset(buf, 0, 512);
+               lustre_index_backup_body(env, tgt_obj, &pos, buf, 512);
+       }
+
+       GOTO(fini, rc);
+
+fini:
+       iops->fini(env, di);
+out:
+       if (!IS_ERR_OR_NULL(tgt_obj))
+               dt_object_put_nocache(env, tgt_obj);
+       if (!IS_ERR_OR_NULL(bak_obj))
+               dt_object_put_nocache(env, bak_obj);
+       return rc;
+}
+
+void lustre_index_backup(const struct lu_env *env, struct dt_device *dev,
+                        const char *devname, struct list_head *head,
+                        spinlock_t *lock, int *guard, bool backup)
+{
+       struct lustre_index_backup_unit *libu;
+       struct local_oid_storage *los = NULL;
+       struct dt_object *parent = NULL;
+       char *buf = NULL;
+       struct lu_fid fid;
+       int rc;
+       ENTRY;
+
+       if (dev->dd_rdonly || *guard)
+               RETURN_EXIT;
+
+       spin_lock(lock);
+       *guard = 1;
+       spin_unlock(lock);
+
+       if (list_empty(head))
+               RETURN_EXIT;
+
+       /* Handle kinds of failures during mount process. */
+       if (!dev->dd_lu_dev.ld_site || !dev->dd_lu_dev.ld_site->ls_top_dev)
+               backup = false;
+
+       if (backup) {
+               OBD_ALLOC_LARGE(buf, INDEX_BACKUP_BUFSIZE);
+               if (!buf) {
+                       backup = false;
+                       goto scan;
+               }
+
+               lu_local_obj_fid(&fid, INDEX_BACKUP_OID);
+               parent = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
+                                                   &fid, NULL));
+               if (IS_ERR_OR_NULL(parent)) {
+                       CERROR("%s: failed to locate backup dir: rc = %ld\n",
+                              devname, parent ? PTR_ERR(parent) : -ENOENT);
+                       backup = false;
+                       goto scan;
+               }
+
+               lu_local_name_obj_fid(&fid, 1);
+               rc = local_oid_storage_init(env, dev, &fid, &los);
+               if (rc) {
+                       CERROR("%s: failed to init local storage: rc = %d\n",
+                              devname, rc);
+                       backup = false;
+               }
+       }
+
+scan:
+       spin_lock(lock);
+       while (!list_empty(head)) {
+               libu = list_entry(head->next,
+                                 struct lustre_index_backup_unit, libu_link);
+               list_del_init(&libu->libu_link);
+               spin_unlock(lock);
+
+               if (backup) {
+                       rc = lustre_index_backup_one(env, los, parent, libu,
+                                                    buf, INDEX_BACKUP_BUFSIZE);
+                       CDEBUG(D_WARNING, "%s: backup index "DFID": rc = %d\n",
+                              devname, PFID(&libu->libu_fid), rc);
+               }
+
+               OBD_FREE_PTR(libu);
+               spin_lock(lock);
+       }
+       spin_unlock(lock);
+
+       if (los)
+               local_oid_storage_fini(env, los);
+       if (parent)
+               dt_object_put_nocache(env, parent);
+       if (buf)
+               OBD_FREE_LARGE(buf, INDEX_BACKUP_BUFSIZE);
+
+       EXIT;
+}
+EXPORT_SYMBOL(lustre_index_backup);
+
+int lustre_index_restore(const struct lu_env *env, struct dt_device *dev,
+                        const struct lu_fid *parent_fid,
+                        const struct lu_fid *tgt_fid,
+                        const struct lu_fid *bak_fid, const char *name,
+                        struct list_head *head, spinlock_t *lock,
+                        char *buf, int bufsize)
+{
+       struct dt_object *parent_obj = NULL;
+       struct dt_object *tgt_obj = NULL;
+       struct dt_object *bak_obj = NULL;
+       struct lustre_index_backup_header *header;
+       struct dt_index_features *feat;
+       struct dt_object_format *dof;
+       struct lu_attr *la;
+       struct thandle *th;
+       struct lu_object_conf conf;
+       struct dt_insert_rec ent;
+       struct lu_buf lbuf;
+       struct lu_fid tfid;
+       loff_t pos = 0;
+       __u32 keysize;
+       __u32 recsize;
+       __u32 pairsize;
+       int count;
+       int rc;
+       bool registered = false;
+       ENTRY;
+
+       LASSERT(bufsize >= sizeof(*la) + sizeof(*dof) +
+               sizeof(*feat) + sizeof(*header));
+
+       memset(buf, 0, bufsize);
+       la = (struct lu_attr *)buf;
+       dof = (void *)la + sizeof(*la);
+       feat = (void *)dof + sizeof(*dof);
+       header = (void *)feat + sizeof(*feat);
+       lbuf.lb_buf = header;
+       lbuf.lb_len = sizeof(*header);
+
+       tgt_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
+                                            tgt_fid, NULL));
+       if (IS_ERR_OR_NULL(tgt_obj))
+               GOTO(out, rc = tgt_obj ? PTR_ERR(tgt_obj) : -ENOENT);
+
+       bak_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
+                                            bak_fid, NULL));
+       if (IS_ERR_OR_NULL(bak_obj))
+               GOTO(out, rc = bak_obj ? PTR_ERR(bak_obj) : -ENOENT);
+
+       if (!dt_object_exists(bak_obj))
+               GOTO(out, rc = -ENOENT);
+
+       parent_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
+                                               parent_fid, NULL));
+       if (IS_ERR_OR_NULL(parent_obj))
+               GOTO(out, rc = parent_obj ? PTR_ERR(parent_obj) : -ENOENT);
+
+       LASSERT(dt_object_exists(parent_obj));
+
+       if (unlikely(!dt_try_as_dir(env, parent_obj)))
+               GOTO(out, rc = -ENOTDIR);
+
+       rc = dt_attr_get(env, tgt_obj, la);
+       if (rc)
+               GOTO(out, rc);
+
+       rc = dt_record_read(env, bak_obj, &lbuf, &pos);
+       if (rc)
+               GOTO(out, rc);
+
+       if (le32_to_cpu(header->libh_magic) != INDEX_BACKUP_MAGIC_V1)
+               GOTO(out, rc = -EINVAL);
+
+       fid_le_to_cpu(&tfid, &header->libh_owner);
+       if (unlikely(!lu_fid_eq(tgt_fid, &tfid)))
+               GOTO(out, rc = -EINVAL);
+
+       keysize = le32_to_cpu(header->libh_keysize);
+       recsize = le32_to_cpu(header->libh_recsize);
+       pairsize = keysize + recsize;
+
+       memset(feat, 0, sizeof(*feat));
+       feat->dif_flags = DT_IND_UPDATE;
+       feat->dif_keysize_min = feat->dif_keysize_max = keysize;
+       feat->dif_recsize_min = feat->dif_recsize_max = recsize;
+       feat->dif_ptrsize = 4;
+
+       /* T1: remove old name entry and destroy old index. */
+       th = dt_trans_create(env, dev);
+       if (IS_ERR(th))
+               GOTO(out, rc = PTR_ERR(th));
+
+       rc = dt_declare_delete(env, parent_obj,
+                              (const struct dt_key *)name, th);
+       if (rc)
+               GOTO(stop, rc);
+
+       rc = dt_declare_destroy(env, tgt_obj, th);
+       if (rc)
+               GOTO(stop, rc);
+
+       rc = dt_trans_start_local(env, dev, th);
+       if (rc)
+               GOTO(stop, rc);
+
+       rc = dt_delete(env, parent_obj, (const struct dt_key *)name, th);
+       if (rc)
+               GOTO(stop, rc);
+
+       dt_write_lock(env, tgt_obj, 0);
+       rc = dt_destroy(env, tgt_obj, th);
+       dt_write_unlock(env, tgt_obj);
+       dt_trans_stop(env, dev, th);
+       if (rc)
+               GOTO(out, rc);
+
+       la->la_valid = LA_MODE | LA_UID | LA_GID;
+       conf.loc_flags = LOC_F_NEW;
+       dof->u.dof_idx.di_feat = feat;
+       dof->dof_type = DFT_INDEX;
+       ent.rec_type = S_IFREG;
+       ent.rec_fid = tgt_fid;
+
+       /* Drop cache before re-create it. */
+       dt_object_put_nocache(env, tgt_obj);
+       tgt_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
+                                            tgt_fid, &conf));
+       if (IS_ERR_OR_NULL(tgt_obj))
+               GOTO(out, rc = tgt_obj ? PTR_ERR(tgt_obj) : -ENOENT);
+
+       LASSERT(!dt_object_exists(tgt_obj));
+
+       /* T2: create new index and insert new name entry. */
+       th = dt_trans_create(env, dev);
+       if (IS_ERR(th))
+               GOTO(out, rc = PTR_ERR(th));
+
+       rc = dt_declare_create(env, tgt_obj, la, NULL, dof, th);
+       if (rc)
+               GOTO(stop, rc);
+
+       rc = dt_declare_insert(env, parent_obj, (const struct dt_rec *)&ent,
+                              (const struct dt_key *)name, th);
+       if (rc)
+               GOTO(stop, rc);
+
+       rc = dt_trans_start_local(env, dev, th);
+       if (rc)
+               GOTO(stop, rc);
+
+       dt_write_lock(env, tgt_obj, 0);
+       rc = dt_create(env, tgt_obj, la, NULL, dof, th);
+       dt_write_unlock(env, tgt_obj);
+       if (rc)
+               GOTO(stop, rc);
+
+       rc = dt_insert(env, parent_obj, (const struct dt_rec *)&ent,
+                      (const struct dt_key *)name, th, 1);
+       dt_trans_stop(env, dev, th);
+       /* Some index name may has been inserted by OSD
+        * automatically when create the index object. */
+       if (unlikely(rc == -EEXIST))
+               rc = 0;
+       if (rc)
+               GOTO(out, rc);
+
+       /* The new index will register via index_try. */
+       rc = tgt_obj->do_ops->do_index_try(env, tgt_obj, feat);
+       if (rc)
+               GOTO(out, rc);
+
+       registered = true;
+       count = le32_to_cpu(header->libh_count);
+       while (!rc && count > 0) {
+               int size = pairsize * count;
+               int items = count;
+               int i;
+
+               if (size > bufsize) {
+                       items = bufsize / pairsize;
+                       size = pairsize * items;
+               }
+
+               lbuf.lb_buf = buf;
+               lbuf.lb_len = size;
+               rc = dt_record_read(env, bak_obj, &lbuf, &pos);
+               for (i = 0; i < items && !rc; i++) {
+                       void *key = &buf[i * pairsize];
+                       void *rec = &buf[i * pairsize + keysize];
+
+                       /* Tn: restore the records. */
+                       th = dt_trans_create(env, dev);
+                       if (!th)
+                               GOTO(out, rc = -ENOMEM);
+
+                       rc = dt_declare_insert(env, tgt_obj, rec, key, th);
+                       if (rc)
+                               GOTO(stop, rc);
+
+                       rc = dt_trans_start_local(env, dev, th);
+                       if (rc)
+                               GOTO(stop, rc);
+
+                       rc = dt_insert(env, tgt_obj, rec, key, th, 1);
+                       if (unlikely(rc == -EEXIST))
+                               rc = 0;
+
+                       dt_trans_stop(env, dev, th);
+               }
+
+               count -= items;
+       }
+
+       GOTO(out, rc);
+
+stop:
+       dt_trans_stop(env, dev, th);
+       if (rc && registered)
+               /* Degister the index to avoid overwriting the backup. */
+               lustre_index_degister(head, lock, tgt_fid);
+
+out:
+       if (!IS_ERR_OR_NULL(tgt_obj))
+               dt_object_put_nocache(env, tgt_obj);
+       if (!IS_ERR_OR_NULL(bak_obj))
+               dt_object_put_nocache(env, bak_obj);
+       if (!IS_ERR_OR_NULL(parent_obj))
+               dt_object_put_nocache(env, parent_obj);
+       return rc;
+}
+EXPORT_SYMBOL(lustre_index_restore);
index da8b008..9d05bc7 100644 (file)
@@ -245,6 +245,11 @@ static void ofd_stack_fini(const struct lu_env *env, struct ofd_device *m,
        top->ld_ops->ldo_process_config(env, top, lcfg);
        OBD_FREE(lcfg, lustre_cfg_len(lcfg->lcfg_bufcount, lcfg->lcfg_buflens));
 
        top->ld_ops->ldo_process_config(env, top, lcfg);
        OBD_FREE(lcfg, lustre_cfg_len(lcfg->lcfg_bufcount, lcfg->lcfg_buflens));
 
+       if (m->ofd_los != NULL) {
+               local_oid_storage_fini(env, m->ofd_los);
+               m->ofd_los = NULL;
+       }
+
        lu_site_purge(env, top->ld_site, ~0);
        if (!cfs_hash_is_empty(top->ld_site->ls_obj_hash)) {
                LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_OTHER, NULL);
        lu_site_purge(env, top->ld_site, ~0);
        if (!cfs_hash_is_empty(top->ld_site->ls_obj_hash)) {
                LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_OTHER, NULL);
@@ -257,6 +262,30 @@ static void ofd_stack_fini(const struct lu_env *env, struct ofd_device *m,
        EXIT;
 }
 
        EXIT;
 }
 
+static void ofd_stack_pre_fini(const struct lu_env *env, struct ofd_device *m,
+                              struct lu_device *top)
+{
+       struct lustre_cfg_bufs bufs;
+       struct lustre_cfg *lcfg;
+       ENTRY;
+
+       LASSERT(top);
+
+       lustre_cfg_bufs_reset(&bufs, ofd_name(m));
+       lustre_cfg_bufs_set_string(&bufs, 1, NULL);
+       OBD_ALLOC(lcfg, lustre_cfg_len(bufs.lcfg_bufcount, bufs.lcfg_buflen));
+       if (!lcfg) {
+               CERROR("%s: failed to trigger LCFG_PRE_CLEANUP\n", ofd_name(m));
+       } else {
+               lustre_cfg_init(lcfg, LCFG_PRE_CLEANUP, &bufs);
+               top->ld_ops->ldo_process_config(env, top, lcfg);
+               OBD_FREE(lcfg, lustre_cfg_len(lcfg->lcfg_bufcount,
+                                             lcfg->lcfg_buflens));
+       }
+
+       EXIT;
+}
+
 /* For interoperability, see mdt_interop_param[]. */
 static struct cfg_interop_param ofd_interop_param[] = {
        { "ost.quota_type",     NULL },
 /* For interoperability, see mdt_interop_param[]. */
 static struct cfg_interop_param ofd_interop_param[] = {
        { "ost.quota_type",     NULL },
@@ -3027,6 +3056,7 @@ static void ofd_fini(const struct lu_env *env, struct ofd_device *m)
        stop.ls_status = LS_PAUSED;
        stop.ls_flags = 0;
        lfsck_stop(env, m->ofd_osd, &stop);
        stop.ls_status = LS_PAUSED;
        stop.ls_flags = 0;
        lfsck_stop(env, m->ofd_osd, &stop);
+       ofd_stack_pre_fini(env, m, &m->ofd_dt_dev.dd_lu_dev);
        target_recovery_fini(obd);
        if (m->ofd_namespace != NULL)
                ldlm_namespace_free_prior(m->ofd_namespace, NULL,
        target_recovery_fini(obd);
        if (m->ofd_namespace != NULL)
                ldlm_namespace_free_prior(m->ofd_namespace, NULL,
@@ -3042,11 +3072,6 @@ static void ofd_fini(const struct lu_env *env, struct ofd_device *m)
        nm_config_file_deregister_tgt(env, obd->u.obt.obt_nodemap_config_file);
        obd->u.obt.obt_nodemap_config_file = NULL;
 
        nm_config_file_deregister_tgt(env, obd->u.obt.obt_nodemap_config_file);
        obd->u.obt.obt_nodemap_config_file = NULL;
 
-       if (m->ofd_los != NULL) {
-               local_oid_storage_fini(env, m->ofd_los);
-               m->ofd_los = NULL;
-       }
-
        if (m->ofd_namespace != NULL) {
                ldlm_namespace_free_post(m->ofd_namespace);
                d->ld_obd->obd_namespace = m->ofd_namespace = NULL;
        if (m->ofd_namespace != NULL) {
                ldlm_namespace_free_post(m->ofd_namespace);
                d->ld_obd->obd_namespace = m->ofd_namespace = NULL;
index b55d329..e2e9d31 100644 (file)
@@ -1058,6 +1058,7 @@ static int osd_mount(const struct lu_env *env,
        if (rc >= sizeof(o->od_svname))
                RETURN(-E2BIG);
 
        if (rc >= sizeof(o->od_svname))
                RETURN(-E2BIG);
 
+       o->od_index_backup_stop = 0;
        o->od_index = -1; /* -1 means index is invalid */
        rc = server_name2index(o->od_svname, &o->od_index, NULL);
        str = strstr(str, ":");
        o->od_index = -1; /* -1 means index is invalid */
        rc = server_name2index(o->od_svname, &o->od_index, NULL);
        str = strstr(str, ":");
@@ -1255,6 +1256,10 @@ static struct lu_device *osd_device_alloc(const struct lu_env *env,
        INIT_LIST_HEAD(&osl->osl_seq_list);
        rwlock_init(&osl->osl_seq_list_lock);
        sema_init(&osl->osl_seq_init_sem, 1);
        INIT_LIST_HEAD(&osl->osl_seq_list);
        rwlock_init(&osl->osl_seq_list_lock);
        sema_init(&osl->osl_seq_init_sem, 1);
+       INIT_LIST_HEAD(&dev->od_index_backup_list);
+       INIT_LIST_HEAD(&dev->od_index_restore_list);
+       spin_lock_init(&dev->od_lock);
+       dev->od_index_backup_policy = LIBP_NONE;
 
        rc = dt_device_init(&dev->od_dt_dev, type);
        if (rc == 0) {
 
        rc = dt_device_init(&dev->od_dt_dev, type);
        if (rc == 0) {
@@ -1349,6 +1354,9 @@ static int osd_process_config(const struct lu_env *env,
                rc = osd_mount(env, o, cfg);
                break;
        case LCFG_CLEANUP:
                rc = osd_mount(env, o, cfg);
                break;
        case LCFG_CLEANUP:
+               /* For the case LCFG_PRE_CLEANUP is not called in advance,
+                * that may happend if hit failure during mount process. */
+               osd_index_backup(env, o, false);
                rc = osd_shutdown(env, o);
                break;
        case LCFG_PARAM: {
                rc = osd_shutdown(env, o);
                break;
        case LCFG_PARAM: {
@@ -1364,6 +1372,11 @@ static int osd_process_config(const struct lu_env *env,
                }
                break;
        }
                }
                break;
        }
+       case LCFG_PRE_CLEANUP:
+               osd_index_backup(env, o,
+                                o->od_index_backup_policy != LIBP_NONE);
+               rc = 0;
+               break;
        default:
                rc = -ENOTTY;
        }
        default:
                rc = -ENOTTY;
        }
index 20f86f2..e069d6b 100644 (file)
 #include <sys/dsl_prop.h>
 #include <sys/sa_impl.h>
 #include <sys/txg.h>
 #include <sys/dsl_prop.h>
 #include <sys/sa_impl.h>
 #include <sys/txg.h>
-
-static inline int osd_object_is_zap(dnode_t *dn)
-{
-       return (dn->dn_type == DMU_OT_DIRECTORY_CONTENTS ||
-                       dn->dn_type == DMU_OT_USERGROUP_USED);
-}
+#include <lustre_scrub.h>
 
 /* We don't actually have direct access to the zap_hashbits() function
  * so just pretend like we do for now.  If this ever breaks we can look at
 
 /* We don't actually have direct access to the zap_hashbits() function
  * so just pretend like we do for now.  If this ever breaks we can look at
@@ -247,8 +242,8 @@ int __osd_xattr_load_by_oid(struct osd_device *osd, uint64_t oid, nvlist_t **sa)
  * \retval             0 for success
  * \retval             negative error number on failure
  */
  * \retval             0 for success
  * \retval             negative error number on failure
  */
-static int osd_get_fid_by_oid(const struct lu_env *env, struct osd_device *osd,
-                             uint64_t oid, struct lu_fid *fid)
+int osd_get_fid_by_oid(const struct lu_env *env, struct osd_device *osd,
+                      uint64_t oid, struct lu_fid *fid)
 {
        struct objset           *os       = osd->od_os;
        struct osd_thread_info  *oti      = osd_oti_get(env);
 {
        struct objset           *os       = osd->od_os;
        struct osd_thread_info  *oti      = osd_oti_get(env);
@@ -2012,6 +2007,8 @@ int osd_index_try(const struct lu_env *env, struct dt_object *dt,
                const struct dt_index_features *feat)
 {
        struct osd_object *obj = osd_dt_obj(dt);
                const struct dt_index_features *feat)
 {
        struct osd_object *obj = osd_dt_obj(dt);
+       struct osd_device *osd = osd_obj2dev(obj);
+       const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
        int rc = 0;
        ENTRY;
 
        int rc = 0;
        ENTRY;
 
@@ -2036,7 +2033,7 @@ int osd_index_try(const struct lu_env *env, struct dt_object *dt,
                else
                        GOTO(out, rc = -ENOTDIR);
        } else if (unlikely(feat == &dt_acct_features)) {
                else
                        GOTO(out, rc = -ENOTDIR);
        } else if (unlikely(feat == &dt_acct_features)) {
-               LASSERT(fid_is_acct(lu_object_fid(&dt->do_lu)));
+               LASSERT(fid_is_acct(fid));
                dt->do_index_ops = &osd_acct_index_ops;
        } else if (dt->do_index_ops == NULL) {
                /* For index file, we don't support variable key & record sizes
                dt->do_index_ops = &osd_acct_index_ops;
        } else if (dt->do_index_ops == NULL) {
                /* For index file, we don't support variable key & record sizes
@@ -2067,6 +2064,24 @@ int osd_index_try(const struct lu_env *env, struct dt_object *dt,
                        obj->oo_recusize = 8;
                }
                dt->do_index_ops = &osd_index_ops;
                        obj->oo_recusize = 8;
                }
                dt->do_index_ops = &osd_index_ops;
+
+               if (feat == &dt_lfsck_layout_orphan_features ||
+                   feat == &dt_lfsck_layout_dangling_features ||
+                   feat == &dt_lfsck_namespace_features)
+                       GOTO(out, rc = 0);
+
+               rc = osd_index_register(osd, fid, obj->oo_keysize,
+                                       obj->oo_recusize * obj->oo_recsize);
+               if (rc < 0)
+                       CWARN("%s: failed to register index "DFID": rc = %d\n",
+                             osd_name(osd), PFID(fid), rc);
+               else if (rc > 0)
+                       rc = 0;
+               else
+                       CDEBUG(D_LFSCK, "%s: index object "DFID
+                              " (%u/%u/%u) registered\n",
+                              osd_name(osd), PFID(fid), obj->oo_keysize,
+                              obj->oo_recusize, obj->oo_recsize);
        }
 
 out:
        }
 
 out:
index 37f24ea..5d423cc 100644 (file)
@@ -54,6 +54,7 @@
 #include <sys/zap.h>
 #include <sys/dbuf.h>
 #include <sys/dmu_objset.h>
 #include <sys/zap.h>
 #include <sys/dbuf.h>
 #include <sys/dmu_objset.h>
+#include <lustre_scrub.h>
 
 /**
  * By design including kmem.h overrides the Linux slab interfaces to provide
 
 /**
  * By design including kmem.h overrides the Linux slab interfaces to provide
@@ -241,6 +242,7 @@ struct osd_thread_info {
        struct lu_attr           oti_la;
        struct osa_attr          oti_osa;
        zap_attribute_t          oti_za;
        struct lu_attr           oti_la;
        struct osa_attr          oti_osa;
        zap_attribute_t          oti_za;
+       zap_attribute_t          oti_za2;
        dmu_object_info_t        oti_doi;
        struct luz_direntry      oti_zde;
 
        dmu_object_info_t        oti_doi;
        struct luz_direntry      oti_zde;
 
@@ -253,6 +255,7 @@ struct osd_thread_info {
        int                    oti_ins_cache_used;
        struct lu_buf          oti_xattr_lbuf;
        zap_cursor_t           oti_zc;
        int                    oti_ins_cache_used;
        struct lu_buf          oti_xattr_lbuf;
        zap_cursor_t           oti_zc;
+       zap_cursor_t           oti_zc2;
 };
 
 extern struct lu_context_key osd_key;
 };
 
 extern struct lu_context_key osd_key;
@@ -319,6 +322,7 @@ struct osd_device {
        struct lprocfs_stats    *od_stats;
 
        uint64_t                 od_remote_parent_dir;
        struct lprocfs_stats    *od_stats;
 
        uint64_t                 od_remote_parent_dir;
+       uint64_t                 od_index_backup_id;
        uint64_t                 od_max_blksz;
        uint64_t                 od_root;
        uint64_t                 od_O_id;
        uint64_t                 od_max_blksz;
        uint64_t                 od_root;
        uint64_t                 od_O_id;
@@ -333,7 +337,9 @@ struct osd_device {
                                 od_in_init:1,
                                 od_posix_acl:1;
        unsigned int             od_dnsize;
                                 od_in_init:1,
                                 od_posix_acl:1;
        unsigned int             od_dnsize;
+       int                      od_index_backup_stop;
 
 
+       enum lustre_index_backup_policy od_index_backup_policy;
        char                     od_mntdev[128];
        char                     od_svname[128];
        char                     od_uuid[16];
        char                     od_mntdev[128];
        char                     od_svname[128];
        char                     od_uuid[16];
@@ -372,6 +378,9 @@ struct osd_device {
        struct osd_otable_it    *od_otable_it;
        struct lustre_scrub      od_scrub;
        struct list_head         od_ios_list;
        struct osd_otable_it    *od_otable_it;
        struct lustre_scrub      od_scrub;
        struct list_head         od_ios_list;
+       struct list_head         od_index_backup_list;
+       struct list_head         od_index_restore_list;
+       spinlock_t               od_lock;
 };
 
 enum osd_destroy_type {
 };
 
 enum osd_destroy_type {
@@ -630,6 +639,8 @@ int osd_obj_find_or_create(const struct lu_env *env, struct osd_device *o,
 extern unsigned int osd_oi_count;
 
 /* osd_index.c */
 extern unsigned int osd_oi_count;
 
 /* osd_index.c */
+int osd_get_fid_by_oid(const struct lu_env *env, struct osd_device *osd,
+                      uint64_t oid, struct lu_fid *fid);
 int osd_index_try(const struct lu_env *env, struct dt_object *dt,
                  const struct dt_index_features *feat);
 int osd_fld_lookup(const struct lu_env *env, struct osd_device *osd,
 int osd_index_try(const struct lu_env *env, struct dt_object *dt,
                  const struct dt_index_features *feat);
 int osd_fld_lookup(const struct lu_env *env, struct osd_device *osd,
@@ -722,6 +733,12 @@ osd_find_dnsize(struct osd_device *osd, int ea_in_bonus)
 }
 #endif
 
 }
 #endif
 
+static inline int osd_object_is_zap(dnode_t *dn)
+{
+       return (dn->dn_type == DMU_OT_DIRECTORY_CONTENTS ||
+               dn->dn_type == DMU_OT_USERGROUP_USED);
+}
+
 /* XXX: f_ver is not counted, but may differ too */
 static inline void osd_fid2str(char *buf, const struct lu_fid *fid, int len)
 {
 /* XXX: f_ver is not counted, but may differ too */
 static inline void osd_fid2str(char *buf, const struct lu_fid *fid, int len)
 {
@@ -1042,4 +1059,31 @@ static inline int osd_dmu_read(struct osd_device *osd, dnode_t *dn,
        dmu_objset_disown((os), (tag))
 #endif
 
        dmu_objset_disown((os), (tag))
 #endif
 
+static inline int
+osd_index_register(struct osd_device *osd, const struct lu_fid *fid,
+                  __u32 keysize, __u32 recsize)
+{
+       return lustre_index_register(&osd->od_dt_dev, osd_name(osd),
+                                    &osd->od_index_backup_list, &osd->od_lock,
+                                    &osd->od_index_backup_stop,
+                                    fid, keysize, recsize);
+}
+
+static inline void
+osd_index_backup(const struct lu_env *env, struct osd_device *osd, bool backup)
+{
+       struct lu_fid *fid = &osd_oti_get(env)->oti_fid;
+       int rc;
+
+       lu_local_obj_fid(fid, INDEX_BACKUP_OID);
+       rc = osd_idc_find_and_init_with_oid(env, osd, fid,
+                                           osd->od_index_backup_id);
+       if (rc)
+               backup = false;
+
+       lustre_index_backup(env, &osd->od_dt_dev, osd_name(osd),
+                           &osd->od_index_backup_list, &osd->od_lock,
+                           &osd->od_index_backup_stop, backup);
+}
+
 #endif /* _OSD_INTERNAL_H */
 #endif /* _OSD_INTERNAL_H */
index 39b1848..8e426de 100644 (file)
@@ -216,7 +216,7 @@ static int zfs_osd_auto_scrub_seq_show(struct seq_file *m, void *data)
        struct osd_device *dev = osd_dt_dev((struct dt_device *)m->private);
 
        LASSERT(dev != NULL);
        struct osd_device *dev = osd_dt_dev((struct dt_device *)m->private);
 
        LASSERT(dev != NULL);
-       if (unlikely(!dev->od_os))
+       if (!dev->od_os)
                return -EINPROGRESS;
 
        seq_printf(m, "%lld\n", dev->od_auto_scrub_interval);
                return -EINPROGRESS;
 
        seq_printf(m, "%lld\n", dev->od_auto_scrub_interval);
@@ -234,7 +234,7 @@ zfs_osd_auto_scrub_seq_write(struct file *file, const char __user *buffer,
        __s64 val;
 
        LASSERT(dev != NULL);
        __s64 val;
 
        LASSERT(dev != NULL);
-       if (unlikely(!dev->od_os))
+       if (!dev->od_os)
                return -EINPROGRESS;
 
        rc = lprocfs_str_to_s64(buffer, count, &val);
                return -EINPROGRESS;
 
        rc = lprocfs_str_to_s64(buffer, count, &val);
@@ -251,7 +251,7 @@ static int zfs_osd_oi_scrub_seq_show(struct seq_file *m, void *data)
        struct osd_device *dev = osd_dt_dev((struct dt_device *)m->private);
 
        LASSERT(dev != NULL);
        struct osd_device *dev = osd_dt_dev((struct dt_device *)m->private);
 
        LASSERT(dev != NULL);
-       if (unlikely(!dev->od_os))
+       if (!dev->od_os)
                return -EINPROGRESS;
 
        scrub_dump(m, &dev->od_scrub);
                return -EINPROGRESS;
 
        scrub_dump(m, &dev->od_scrub);
@@ -295,6 +295,41 @@ lprocfs_osd_force_sync_seq_write(struct file *file, const char __user *buffer,
 }
 LPROC_SEQ_FOPS_WR_ONLY(zfs, osd_force_sync);
 
 }
 LPROC_SEQ_FOPS_WR_ONLY(zfs, osd_force_sync);
 
+static int zfs_osd_index_backup_seq_show(struct seq_file *m, void *data)
+{
+       struct osd_device *dev = osd_dt_dev((struct dt_device *)m->private);
+
+       LASSERT(dev != NULL);
+       if (!dev->od_os)
+               return -EINPROGRESS;
+
+       seq_printf(m, "%d\n", dev->od_index_backup_policy);
+       return 0;
+}
+
+static ssize_t zfs_osd_index_backup_seq_write(struct file *file,
+                                             const char __user *buffer,
+                                             size_t count, loff_t *off)
+{
+       struct seq_file *m = file->private_data;
+       struct dt_device *dt = m->private;
+       struct osd_device *dev = osd_dt_dev(dt);
+       __s64 val;
+       int rc;
+
+       LASSERT(dev != NULL);
+       if (!dev->od_os)
+               return -EINPROGRESS;
+
+       rc = lprocfs_str_to_s64(buffer, count, &val);
+       if (rc)
+               return rc;
+
+       dev->od_index_backup_policy = val;
+       return count;
+}
+LPROC_SEQ_FOPS(zfs_osd_index_backup);
+
 LPROC_SEQ_FOPS_RO_TYPE(zfs, dt_blksize);
 LPROC_SEQ_FOPS_RO_TYPE(zfs, dt_kbytestotal);
 LPROC_SEQ_FOPS_RO_TYPE(zfs, dt_kbytesfree);
 LPROC_SEQ_FOPS_RO_TYPE(zfs, dt_blksize);
 LPROC_SEQ_FOPS_RO_TYPE(zfs, dt_kbytestotal);
 LPROC_SEQ_FOPS_RO_TYPE(zfs, dt_kbytesfree);
@@ -325,6 +360,8 @@ struct lprocfs_vars lprocfs_osd_obd_vars[] = {
          .fops =       &zfs_osd_mntdev_fops            },
        { .name =       "force_sync",
          .fops =       &zfs_osd_force_sync_fops        },
          .fops =       &zfs_osd_mntdev_fops            },
        { .name =       "force_sync",
          .fops =       &zfs_osd_force_sync_fops        },
+       { .name =       "index_backup",
+         .fops =       &zfs_osd_index_backup_fops      },
        { 0 }
 };
 
        { 0 }
 };
 
index ec4f52d..853ef27 100644 (file)
@@ -1937,20 +1937,18 @@ static int osd_create(const struct lu_env *env, struct dt_object *dt,
 
        zapid = osd_get_name_n_idx(env, osd, fid, buf,
                                   sizeof(info->oti_str), &zdn);
 
        zapid = osd_get_name_n_idx(env, osd, fid, buf,
                                   sizeof(info->oti_str), &zdn);
-       if (!CFS_FAIL_CHECK(OBD_FAIL_OSD_NO_OI_ENTRY)) {
-               if (osd->od_is_ost &&
-                   OBD_FAIL_CHECK(OBD_FAIL_OSD_COMPAT_INVALID_ENTRY))
-                       zde->zde_dnode++;
-
-               if (!osd->od_is_ost ||
-                   !OBD_FAIL_CHECK(OBD_FAIL_OSD_COMPAT_NO_ENTRY)) {
-                       rc = osd_zap_add(osd, zapid, zdn, buf, 8, 1,
-                                        zde, oh->ot_tx);
-                       if (rc)
-                               GOTO(out, rc);
-               }
-       }
+       if (CFS_FAIL_CHECK(OBD_FAIL_OSD_NO_OI_ENTRY) ||
+           (osd->od_is_ost && OBD_FAIL_CHECK(OBD_FAIL_OSD_COMPAT_NO_ENTRY)))
+               goto skip_add;
+
+       if (osd->od_is_ost && OBD_FAIL_CHECK(OBD_FAIL_OSD_COMPAT_INVALID_ENTRY))
+               zde->zde_dnode++;
+
+       rc = osd_zap_add(osd, zapid, zdn, buf, 8, 1, zde, oh->ot_tx);
+       if (rc)
+               GOTO(out, rc);
 
 
+skip_add:
        obj->oo_dn = dn;
        /* Now add in all of the "SA" attributes */
        rc = osd_sa_handle_get(obj);
        obj->oo_dn = dn;
        /* Now add in all of the "SA" attributes */
        rc = osd_sa_handle_get(obj);
index c8d1fcc..b499263 100644 (file)
@@ -752,7 +752,8 @@ static int osd_oi_probe(const struct lu_env *env, struct osd_device *o)
         * power of two and this is checked for basic sanity.
         */
        for (count = 0; count < max; count++) {
         * power of two and this is checked for basic sanity.
         */
        for (count = 0; count < max; count++) {
-               snprintf(name, 15, "%s.%d", DMU_OSD_OI_NAME_BASE, count);
+               snprintf(name, sizeof(name) - 1, "%s.%d",
+                        DMU_OSD_OI_NAME_BASE, count);
                rc = osd_oi_lookup(env, o, o->od_root, name, &oi);
                if (!rc)
                        continue;
                rc = osd_oi_lookup(env, o, o->od_root, name, &oi);
                if (!rc)
                        continue;
@@ -807,6 +808,20 @@ osd_oi_init_compat(const struct lu_env *env, struct osd_device *o)
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
+static int
+osd_oi_init_index_backup(const struct lu_env *env, struct osd_device *o)
+{
+       struct lu_fid *fid = &osd_oti_get(env)->oti_fid;
+       int rc;
+       ENTRY;
+
+       lu_local_obj_fid(fid, INDEX_BACKUP_OID);
+       rc = osd_obj_find_or_create(env, o, o->od_root, INDEX_BACKUP_DIR,
+                                   &o->od_index_backup_id, fid, true);
+
+       RETURN(rc);
+}
+
 static void
 osd_oi_init_remote_parent(const struct lu_env *env, struct osd_device *o)
 {
 static void
 osd_oi_init_remote_parent(const struct lu_env *env, struct osd_device *o)
 {
@@ -844,6 +859,10 @@ int osd_oi_init(const struct lu_env *env, struct osd_device *o)
        LASSERTF((sf->sf_oi_count & (sf->sf_oi_count - 1)) == 0,
                 "Invalid OI count in scrub file %d\n", sf->sf_oi_count);
 
        LASSERTF((sf->sf_oi_count & (sf->sf_oi_count - 1)) == 0,
                 "Invalid OI count in scrub file %d\n", sf->sf_oi_count);
 
+       rc = osd_oi_init_index_backup(env, o);
+       if (rc)
+               RETURN(rc);
+
        osd_oi_init_remote_parent(env, o);
 
        rc = osd_oi_init_compat(env, o);
        osd_oi_init_remote_parent(env, o);
 
        rc = osd_oi_init_compat(env, o);
index b2cda0a..dd0eddd 100644 (file)
@@ -46,6 +46,9 @@
 #include <obd_class.h>
 #include <lustre_nodemap.h>
 #include <sys/dsl_dataset.h>
 #include <obd_class.h>
 #include <lustre_nodemap.h>
 #include <sys/dsl_dataset.h>
+#include <sys/zap_impl.h>
+#include <sys/zap.h>
+#include <sys/zap_leaf.h>
 
 #include "osd_internal.h"
 
 
 #include "osd_internal.h"
 
@@ -749,7 +752,7 @@ static const struct osd_lf_map osd_lf_maps[] = {
        /* LFSCK */
        {
                .olm_name               = LFSCK_DIR,
        /* LFSCK */
        {
                .olm_name               = LFSCK_DIR,
-               .olm_flags              = OLF_SCAN_SUBITEMS,
+               .olm_flags              = OLF_SCAN_SUBITEMS | OLF_NOT_BACKUP,
                .olm_scan_dir           = osd_ios_general_sd,
                .olm_handle_dirent      = osd_ios_varfid_hd,
        },
                .olm_scan_dir           = osd_ios_general_sd,
                .olm_handle_dirent      = osd_ios_varfid_hd,
        },
@@ -803,6 +806,18 @@ static const struct osd_lf_map osd_lf_maps[] = {
                .olm_name               = LUSTRE_NODEMAP_NAME,
        },
 
                .olm_name               = LUSTRE_NODEMAP_NAME,
        },
 
+       /* index_backup */
+       {
+               .olm_name               = INDEX_BACKUP_DIR,
+               .olm_fid                = {
+                       .f_seq  = FID_SEQ_LOCAL_FILE,
+                       .f_oid  = INDEX_BACKUP_OID,
+               },
+               .olm_flags              = OLF_SCAN_SUBITEMS | OLF_NOT_BACKUP,
+               .olm_scan_dir           = osd_ios_general_sd,
+               .olm_handle_dirent      = osd_ios_varfid_hd,
+       },
+
        {
                .olm_name               = NULL
        }
        {
                .olm_name               = NULL
        }
@@ -864,6 +879,130 @@ static int osd_ios_new_item(struct osd_device *dev, uint64_t parent,
        return 0;
 }
 
        return 0;
 }
 
+static bool osd_index_need_recreate(const struct lu_env *env,
+                                   struct osd_device *dev, uint64_t oid)
+{
+       struct osd_thread_info *info = osd_oti_get(env);
+       zap_attribute_t *za = &info->oti_za2;
+       zap_cursor_t *zc = &info->oti_zc2;
+       int rc;
+       ENTRY;
+
+       zap_cursor_init_serialized(zc, dev->od_os, oid, 0);
+       rc = -zap_cursor_retrieve(zc, za);
+       zap_cursor_fini(zc);
+       if (rc && rc != -ENOENT)
+               RETURN(true);
+
+       RETURN(false);
+}
+
+static void osd_ios_index_register(const struct lu_env *env,
+                                  struct osd_device *osd,
+                                  const struct lu_fid *fid, uint64_t oid)
+{
+       struct osd_thread_info *info = osd_oti_get(env);
+       zap_attribute_t *za = &info->oti_za2;
+       zap_cursor_t *zc = &info->oti_zc2;
+       struct zap_leaf_entry *le;
+       dnode_t *dn = NULL;
+       sa_handle_t *hdl;
+       __u64 mode = 0;
+       __u32 keysize = 0;
+       __u32 recsize = 0;
+       int rc;
+       ENTRY;
+
+       rc = __osd_obj2dnode(osd->od_os, oid, &dn);
+       if (rc == -EEXIST || rc == -ENOENT)
+               RETURN_EXIT;
+
+       if (rc < 0)
+               GOTO(log, rc);
+
+       if (!osd_object_is_zap(dn))
+               GOTO(log, rc = 1);
+
+       rc = -sa_handle_get(osd->od_os, oid, NULL, SA_HDL_PRIVATE, &hdl);
+       if (rc)
+               GOTO(log, rc);
+
+       rc = -sa_lookup(hdl, SA_ZPL_MODE(osd), &mode, sizeof(mode));
+       sa_handle_destroy(hdl);
+       if (rc)
+               GOTO(log, rc);
+
+       if (!S_ISREG(mode))
+               GOTO(log, rc = 1);
+
+       zap_cursor_init_serialized(zc, osd->od_os, oid, 0);
+       rc = -zap_cursor_retrieve(zc, za);
+       if (rc)
+               /* Skip empty index object */
+               GOTO(fini, rc = (rc == -ENOENT ? 1 : rc));
+
+       if (zc->zc_zap->zap_ismicro ||
+           !(zap_f_phys(zc->zc_zap)->zap_flags & ZAP_FLAG_UINT64_KEY))
+               GOTO(fini, rc = 1);
+
+       le = ZAP_LEAF_ENTRY(zc->zc_leaf, 0);
+       keysize = le->le_name_numints * 8;
+       recsize = za->za_integer_length * za->za_num_integers;
+       if (likely(keysize && recsize))
+               rc = osd_index_register(osd, fid, keysize, recsize);
+
+       GOTO(fini, rc);
+
+fini:
+       zap_cursor_fini(zc);
+
+log:
+       if (dn)
+               osd_dnode_rele(dn);
+       if (rc < 0)
+               CWARN("%s: failed to register index "DFID" (%u/%u): rc = %d\n",
+                     osd_name(osd), PFID(fid), keysize, recsize, rc);
+       else if (!rc)
+               CDEBUG(D_LFSCK, "%s: registered index "DFID" (%u/%u)\n",
+                      osd_name(osd), PFID(fid), keysize, recsize);
+}
+
+static void osd_index_restore(const struct lu_env *env, struct osd_device *dev,
+                             struct lustre_index_restore_unit *liru, void *buf,
+                             int bufsize)
+{
+       struct luz_direntry *zde = &osd_oti_get(env)->oti_zde;
+       struct lu_fid *tgt_fid = &liru->liru_cfid;
+       struct lu_fid bak_fid;
+       int rc;
+       ENTRY;
+
+       lustre_fid2lbx(buf, tgt_fid, bufsize);
+       rc = -zap_lookup(dev->od_os, dev->od_index_backup_id, buf, 8,
+                        sizeof(*zde) / 8, (void *)zde);
+       if (rc)
+               GOTO(log, rc);
+
+       rc = osd_get_fid_by_oid(env, dev, zde->lzd_reg.zde_dnode, &bak_fid);
+       if (rc)
+               GOTO(log, rc);
+
+       /* The OI mapping for index may be invalid, since it will be
+        * re-created, not update the OI mapping, just cache it in RAM. */
+       rc = osd_idc_find_and_init_with_oid(env, dev, tgt_fid,
+                                           liru->liru_clid);
+       if (!rc)
+               rc = lustre_index_restore(env, &dev->od_dt_dev,
+                               &liru->liru_pfid, tgt_fid, &bak_fid,
+                               liru->liru_name, &dev->od_index_backup_list,
+                               &dev->od_lock, buf, bufsize);
+       GOTO(log, rc);
+
+log:
+       CDEBUG(D_WARNING, "%s: restore index '%s' with "DFID": rc = %d\n",
+              osd_name(dev), liru->liru_name, PFID(tgt_fid), rc);
+}
+
 /**
  * verify FID-in-LMA and OI entry for one object
  *
 /**
  * verify FID-in-LMA and OI entry for one object
  *
@@ -912,7 +1051,31 @@ static int osd_ios_scan_one(const struct lu_env *env, struct osd_device *dev,
                                RETURN(0);
                        }
 
                                RETURN(0);
                        }
 
+                       if (lma->lma_compat & LMAC_IDX_BACKUP &&
+                           osd_index_need_recreate(env, dev, oid)) {
+                               if (parent == dev->od_root) {
+                                       lu_local_obj_fid(&tfid,
+                                                        OSD_FS_ROOT_OID);
+                               } else {
+                                       rc = osd_get_fid_by_oid(env, dev,
+                                                               parent, &tfid);
+                                       if (rc) {
+                                               nvlist_free(nvbuf);
+                                               RETURN(rc);
+                                       }
+                               }
+
+                               rc = lustre_liru_new(
+                                               &dev->od_index_restore_list,
+                                               &tfid, &lma->lma_self_fid, oid,
+                                               name, strlen(name));
+                               nvlist_free(nvbuf);
+                               RETURN(rc);
+                       }
+
                        tfid = lma->lma_self_fid;
                        tfid = lma->lma_self_fid;
+                       if (!(flags & OLF_NOT_BACKUP))
+                               osd_ios_index_register(env, dev, &tfid, oid);
                }
                nvlist_free(nvbuf);
        }
                }
                nvlist_free(nvbuf);
        }
@@ -1176,6 +1339,31 @@ static void osd_initial_OI_scrub(const struct lu_env *env,
                OBD_FREE_PTR(item);
        }
 
                OBD_FREE_PTR(item);
        }
 
+       if (!list_empty(&dev->od_index_restore_list)) {
+               char *buf;
+
+               OBD_ALLOC_LARGE(buf, INDEX_BACKUP_BUFSIZE);
+               if (!buf)
+                       CERROR("%s: not enough RAM for rebuild index\n",
+                              osd_name(dev));
+
+               while (!list_empty(&dev->od_index_restore_list)) {
+                       struct lustre_index_restore_unit *liru;
+
+                       liru = list_entry(dev->od_index_restore_list.next,
+                                         struct lustre_index_restore_unit,
+                                         liru_link);
+                       list_del(&liru->liru_link);
+                       if (buf)
+                               osd_index_restore(env, dev, liru, buf,
+                                                 INDEX_BACKUP_BUFSIZE);
+                       OBD_FREE(liru, liru->liru_len);
+               }
+
+               if (buf)
+                       OBD_FREE_LARGE(buf, INDEX_BACKUP_BUFSIZE);
+       }
+
        EXIT;
 }
 
        EXIT;
 }
 
index 88fd417..6a14220 100644 (file)
@@ -331,6 +331,16 @@ full_scrub_threshold_rate() {
                osd-*.*.full_scrub_threshold_rate=$rate
 }
 
                osd-*.*.full_scrub_threshold_rate=$rate
 }
 
+scrub_enable_index_backup() {
+       do_nodes $(comma_list $(all_server_nodes)) $LCTL set_param -n \
+               osd-*.*.index_backup=1
+}
+
+scrub_disable_index_backup() {
+       do_nodes $(comma_list $(all_server_nodes)) $LCTL set_param -n \
+               osd-*.*.index_backup=0
+}
+
 test_0() {
        scrub_prep 0
        echo "starting MDTs without disabling OI scrub"
 test_0() {
        scrub_prep 0
        echo "starting MDTs without disabling OI scrub"
@@ -1216,6 +1226,26 @@ test_15() {
 }
 run_test 15 "Dryrun mode OI scrub"
 
 }
 run_test 15 "Dryrun mode OI scrub"
 
+test_16() {
+       [ $(facet_fstype $SINGLEMDS) != "zfs" ] &&
+               skip "only support zfs temporarily" && return
+
+       check_mount_and_prep
+       scrub_enable_index_backup
+
+       #define OBD_FAIL_OSD_INDEX_CRASH        0x199
+       do_nodes $(comma_list $(mdts_nodes)) $LCTL set_param fail_loc=0x199
+       scrub_prep 0
+       do_nodes $(comma_list $(mdts_nodes)) $LCTL set_param fail_loc=0
+
+       echo "starting MDTs without disabling OI scrub"
+       scrub_start_mds 1 "$MOUNT_OPTS_SCRUB"
+       mount_client $MOUNT || error "(2) Fail to start client!"
+       scrub_check_data 3
+       scrub_disable_index_backup
+}
+run_test 16 "Initial OI scrub can rebuild crashed index objects"
+
 # restore MDS/OST size
 MDSSIZE=${SAVED_MDSSIZE}
 OSTSIZE=${SAVED_OSTSIZE}
 # restore MDS/OST size
 MDSSIZE=${SAVED_MDSSIZE}
 OSTSIZE=${SAVED_OSTSIZE}