Whamcloud - gitweb
LU-13004 ptlrpc: Allow BULK_BUF_KIOV to accept a kvec
[fs/lustre-release.git] / lustre / obdclass / scrub.c
index 7be6a27..eee95c6 100644 (file)
@@ -37,6 +37,7 @@
 #include <linux/kthread.h>
 #include <lustre_scrub.h>
 #include <lustre_lib.h>
+#include <lustre_fid.h>
 
 static inline struct dt_device *scrub_obj2dev(struct dt_object *obj)
 {
@@ -45,7 +46,7 @@ static inline struct dt_device *scrub_obj2dev(struct dt_object *obj)
 
 static void scrub_file_to_cpu(struct scrub_file *des, struct scrub_file *src)
 {
-       memcpy(des->sf_uuid, src->sf_uuid, 16);
+       uuid_copy(&des->sf_uuid, &src->sf_uuid);
        des->sf_flags   = le64_to_cpu(src->sf_flags);
        des->sf_magic   = le32_to_cpu(src->sf_magic);
        des->sf_status  = le16_to_cpu(src->sf_status);
@@ -79,7 +80,7 @@ static void scrub_file_to_cpu(struct scrub_file *des, struct scrub_file *src)
 
 static void scrub_file_to_le(struct scrub_file *des, struct scrub_file *src)
 {
-       memcpy(des->sf_uuid, src->sf_uuid, 16);
+       uuid_copy(&des->sf_uuid, &src->sf_uuid);
        des->sf_flags   = cpu_to_le64(src->sf_flags);
        des->sf_magic   = cpu_to_le32(src->sf_magic);
        des->sf_status  = cpu_to_le16(src->sf_status);
@@ -111,18 +112,18 @@ static void scrub_file_to_le(struct scrub_file *des, struct scrub_file *src)
        memcpy(des->sf_oi_bitmap, src->sf_oi_bitmap, SCRUB_OI_BITMAP_SIZE);
 }
 
-void scrub_file_init(struct lustre_scrub *scrub, __u8 *uuid)
+void scrub_file_init(struct lustre_scrub *scrub, uuid_t uuid)
 {
        struct scrub_file *sf = &scrub->os_file;
 
        memset(sf, 0, sizeof(*sf));
-       memcpy(sf->sf_uuid, uuid, 16);
+       uuid_copy(&sf->sf_uuid, &uuid);
        sf->sf_magic = SCRUB_MAGIC_V1;
        sf->sf_status = SS_INIT;
 }
 EXPORT_SYMBOL(scrub_file_init);
 
-void scrub_file_reset(struct lustre_scrub *scrub, __u8 *uuid, __u64 flags)
+void scrub_file_reset(struct lustre_scrub *scrub, uuid_t uuid, u64 flags)
 {
        struct scrub_file *sf = &scrub->os_file;
 
@@ -130,7 +131,7 @@ void scrub_file_reset(struct lustre_scrub *scrub, __u8 *uuid, __u64 flags)
               "%#llx, add flags = %#llx\n",
               scrub->os_name, sf->sf_flags, flags);
 
-       memcpy(sf->sf_uuid, uuid, 16);
+       uuid_copy(&sf->sf_uuid, &uuid);
        sf->sf_status = SS_INIT;
        sf->sf_flags |= flags;
        sf->sf_flags &= ~SF_AUTO;
@@ -237,9 +238,9 @@ log:
                CDEBUG(D_LFSCK, "%s: store scrub file: rc = %d\n",
                       scrub->os_name, rc);
 
-       scrub->os_time_last_checkpoint = cfs_time_current();
+       scrub->os_time_last_checkpoint = ktime_get_seconds();
        scrub->os_time_next_checkpoint = scrub->os_time_last_checkpoint +
-                               cfs_time_seconds(SCRUB_CHECKPOINT_INTERVAL);
+                                        SCRUB_CHECKPOINT_INTERVAL;
        return rc;
 }
 EXPORT_SYMBOL(scrub_file_store);
@@ -247,10 +248,10 @@ EXPORT_SYMBOL(scrub_file_store);
 int scrub_checkpoint(const struct lu_env *env, struct lustre_scrub *scrub)
 {
        struct scrub_file *sf = &scrub->os_file;
+       time64_t now = ktime_get_seconds();
        int rc;
 
-       if (likely(cfs_time_before(cfs_time_current(),
-                                  scrub->os_time_next_checkpoint) ||
+       if (likely(now < scrub->os_time_next_checkpoint ||
                   scrub->os_new_checked == 0))
                return 0;
 
@@ -261,9 +262,8 @@ int scrub_checkpoint(const struct lu_env *env, struct lustre_scrub *scrub)
        sf->sf_items_checked += scrub->os_new_checked;
        scrub->os_new_checked = 0;
        sf->sf_pos_last_checkpoint = scrub->os_pos_current;
-       sf->sf_time_last_checkpoint = cfs_time_current_sec();
-       sf->sf_run_time += cfs_duration_sec(cfs_time_current() + HALF_SEC -
-                                           scrub->os_time_last_checkpoint);
+       sf->sf_time_last_checkpoint = ktime_get_real_seconds();
+       sf->sf_run_time += now - scrub->os_time_last_checkpoint;
        rc = scrub_file_store(env, scrub);
        up_write(&scrub->os_rwsem);
 
@@ -275,7 +275,6 @@ int scrub_start(int (*threadfn)(void *data), struct lustre_scrub *scrub,
                void *data, __u32 flags)
 {
        struct ptlrpc_thread *thread = &scrub->os_thread;
-       struct l_wait_info lwi = { 0 };
        struct task_struct *task;
        int rc;
        ENTRY;
@@ -290,9 +289,8 @@ again:
 
        if (unlikely(thread_is_stopping(thread))) {
                spin_unlock(&scrub->os_lock);
-               l_wait_event(thread->t_ctl_waitq,
-                            thread_is_stopped(thread),
-                            &lwi);
+               wait_event_idle(thread->t_ctl_waitq,
+                               thread_is_stopped(thread));
                goto again;
        }
        spin_unlock(&scrub->os_lock);
@@ -317,9 +315,8 @@ again:
                RETURN(rc);
        }
 
-       l_wait_event(thread->t_ctl_waitq,
-                    thread_is_running(thread) || thread_is_stopped(thread),
-                    &lwi);
+       wait_event_idle(thread->t_ctl_waitq,
+                       thread_is_running(thread) || thread_is_stopped(thread));
 
        RETURN(0);
 }
@@ -328,7 +325,6 @@ EXPORT_SYMBOL(scrub_start);
 void scrub_stop(struct lustre_scrub *scrub)
 {
        struct ptlrpc_thread *thread = &scrub->os_thread;
-       struct l_wait_info lwi = { 0 };
 
        /* os_lock: sync status between stop and scrub thread */
        spin_lock(&scrub->os_lock);
@@ -336,9 +332,8 @@ void scrub_stop(struct lustre_scrub *scrub)
                thread_set_flags(thread, SVC_STOPPING);
                spin_unlock(&scrub->os_lock);
                wake_up_all(&thread->t_ctl_waitq);
-               l_wait_event(thread->t_ctl_waitq,
-                            thread_is_stopped(thread),
-                            &lwi);
+               wait_event_idle(thread->t_ctl_waitq,
+                               thread_is_stopped(thread));
                /* Do not skip the last lock/unlock, which can guarantee that
                 * the caller cannot return until the OI scrub thread exit. */
                spin_lock(&scrub->os_lock);
@@ -389,11 +384,12 @@ static void scrub_bits_dump(struct seq_file *m, int bits, const char *names[],
        }
 }
 
-static void scrub_time_dump(struct seq_file *m, __u64 time, const char *prefix)
+static void scrub_time_dump(struct seq_file *m, time64_t time,
+                           const char *prefix)
 {
        if (time != 0)
                seq_printf(m, "%s: %llu seconds\n", prefix,
-                          cfs_time_current_sec() - time);
+                          ktime_get_real_seconds() - time);
        else
                seq_printf(m, "%s: N/A\n", prefix);
 }
@@ -409,8 +405,8 @@ static void scrub_pos_dump(struct seq_file *m, __u64 pos, const char *prefix)
 void scrub_dump(struct seq_file *m, struct lustre_scrub *scrub)
 {
        struct scrub_file *sf = &scrub->os_file;
-       __u64 checked;
-       __u64 speed;
+       u64 checked;
+       s64 speed;
 
        down_read(&scrub->os_rwsem);
        seq_printf(m, "name: OI_scrub\n"
@@ -459,33 +455,40 @@ void scrub_dump(struct seq_file *m, struct lustre_scrub *scrub)
 
        speed = checked;
        if (thread_is_running(&scrub->os_thread)) {
-               cfs_duration_t duration = cfs_time_current() -
-                                         scrub->os_time_last_checkpoint;
-               __u64 new_checked = msecs_to_jiffies(scrub->os_new_checked *
-                                                    MSEC_PER_SEC);
-               __u32 rtime = sf->sf_run_time +
-                             cfs_duration_sec(duration + HALF_SEC);
-
+               s64 new_checked = scrub->os_new_checked;
+               time64_t duration;
+               time64_t rtime;
+
+               /* Since the time resolution is in seconds, on new systems
+                * or small devices it is more likely that duration will be
+                * zero, which will lead to inaccurate results.
+                */
+               duration = ktime_get_seconds() -
+                          scrub->os_time_last_checkpoint;
                if (duration != 0)
-                       do_div(new_checked, duration);
+                       new_checked = div_s64(new_checked, duration);
+
+               rtime = sf->sf_run_time + duration;
                if (rtime != 0)
-                       do_div(speed, rtime);
-               seq_printf(m, "run_time: %u seconds\n"
-                          "average_speed: %llu objects/sec\n"
-                          "real-time_speed: %llu objects/sec\n"
+                       speed = div_s64(speed, rtime);
+
+               seq_printf(m, "run_time: %lld seconds\n"
+                          "average_speed: %lld objects/sec\n"
+                          "real-time_speed: %lld objects/sec\n"
                           "current_position: %llu\n"
                           "scrub_in_prior: %s\n"
                           "scrub_full_speed: %s\n"
                           "partial_scan: %s\n",
-                          rtime, speed, new_checked, scrub->os_pos_current,
+                          rtime, speed, new_checked,
+                          scrub->os_pos_current,
                           scrub->os_in_prior ? "yes" : "no",
                           scrub->os_full_speed ? "yes" : "no",
                           scrub->os_partial_scan ? "yes" : "no");
        } else {
                if (sf->sf_run_time != 0)
-                       do_div(speed, sf->sf_run_time);
-               seq_printf(m, "run_time: %u seconds\n"
-                          "average_speed: %llu objects/sec\n"
+                       speed = div_s64(speed, sf->sf_run_time);
+               seq_printf(m, "run_time: %ld seconds\n"
+                          "average_speed: %lld objects/sec\n"
                           "real-time_speed: N/A\n"
                           "current_position: N/A\n",
                           sf->sf_run_time, speed);
@@ -494,3 +497,715 @@ void scrub_dump(struct seq_file *m, struct lustre_scrub *scrub)
        up_read(&scrub->os_rwsem);
 }
 EXPORT_SYMBOL(scrub_dump);
+
+/**
+ * Allocate an index restore unit and queue it at the tail of @head.
+ *
+ * The unit and room for a NUL-terminated copy of @name are allocated as a
+ * single chunk; the size of that chunk is remembered in liru_len.
+ *
+ * \param[in] head     list that receives the new unit
+ * \param[in] pfid     FID stored as liru_pfid
+ * \param[in] cfid     FID stored as liru_cfid
+ * \param[in] child    value stored as liru_clid
+ * \param[in] name     name to copy, need not be NUL terminated
+ * \param[in] namelen  number of bytes of @name to copy
+ *
+ * \retval             0 for success
+ * \retval             -ENOMEM if the allocation failed
+ */
+int lustre_liru_new(struct list_head *head, const struct lu_fid *pfid,
+                   const struct lu_fid *cfid, __u64 child,
+                   const char *name, int namelen)
+{
+       struct lustre_index_restore_unit *unit;
+       int total = sizeof(*unit) + namelen + 1;
+
+       OBD_ALLOC(unit, total);
+       if (unit == NULL)
+               return -ENOMEM;
+
+       /* Copy the name first and terminate it explicitly. */
+       memcpy(unit->liru_name, name, namelen);
+       unit->liru_name[namelen] = 0;
+
+       unit->liru_len = total;
+       unit->liru_clid = child;
+       unit->liru_cfid = *cfid;
+       unit->liru_pfid = *pfid;
+
+       INIT_LIST_HEAD(&unit->liru_link);
+       list_add_tail(&unit->liru_link, head);
+
+       return 0;
+}
+EXPORT_SYMBOL(lustre_liru_new);
+
+/**
+ * Register an index for backup on the list @head.
+ *
+ * The list is scanned from the tail; the new unit is linked right after the
+ * first entry whose FID compares lower than @fid, or at the front of the
+ * list when there is no such entry.
+ *
+ * \param[in] dev      device owning the index; nothing is done if read-only
+ * \param[in] devname  device name, only used in the warning message
+ * \param[in] head     list of registered units
+ * \param[in] lock     spinlock protecting @head and @guard
+ * \param[in] guard    when non-zero, registration is refused
+ * \param[in] fid      FID of the index to register
+ * \param[in] keysize  key size of the index
+ * \param[in] recsize  record size of the index
+ *
+ * \retval             0 if the unit was added, or an existing unit with
+ *                     different key/record sizes was updated
+ * \retval             1 if the device is read-only, the guard is set, or
+ *                     the index is already registered with the same sizes
+ * \retval             -ENOMEM if the allocation failed
+ */
+int lustre_index_register(struct dt_device *dev, const char *devname,
+                         struct list_head *head, spinlock_t *lock, int *guard,
+                         const struct lu_fid *fid,
+                         __u32 keysize, __u32 recsize)
+{
+       struct lustre_index_backup_unit *unit;
+       struct lustre_index_backup_unit *cur;
+       int result = 0;
+       ENTRY;
+
+       if (dev->dd_rdonly || *guard)
+               RETURN(1);
+
+       OBD_ALLOC_PTR(unit);
+       if (unit == NULL)
+               RETURN(-ENOMEM);
+
+       unit->libu_fid = *fid;
+       unit->libu_recsize = recsize;
+       unit->libu_keysize = keysize;
+       INIT_LIST_HEAD(&unit->libu_link);
+
+       spin_lock(lock);
+       /* Check again under the lock: the guard may have been raised since
+        * the unlocked test above. */
+       if (unlikely(*guard)) {
+               result = 1;
+               goto discard;
+       }
+
+       list_for_each_entry_reverse(cur, head, libu_link) {
+               int cmp = lu_fid_cmp(&cur->libu_fid, fid);
+
+               if (cmp > 0)
+                       continue;
+
+               if (cmp < 0) {
+                       /* First lower entry seen from the tail: the new
+                        * unit goes right behind it. */
+                       list_add(&unit->libu_link, &cur->libu_link);
+                       goto unlock;
+               }
+
+               /* Same FID, so it is registered already. If the former
+                * registration carries a different keysize/recsize, those
+                * values may come from disk and be corrupted: replace them
+                * with the new ones. */
+               if (likely(keysize == cur->libu_keysize &&
+                          recsize == cur->libu_recsize)) {
+                       result = 1;
+               } else {
+                       CWARN("%s: the index "DFID" has registered "
+                             "with %u/%u, may be invalid, replace "
+                             "with %u/%u\n",
+                             devname, PFID(fid), cur->libu_keysize,
+                             cur->libu_recsize, keysize, recsize);
+
+                       cur->libu_keysize = keysize;
+                       cur->libu_recsize = recsize;
+               }
+               goto discard;
+       }
+
+       /* No entry compares lower than @fid: link at the front. */
+       list_add(&unit->libu_link, head);
+unlock:
+       spin_unlock(lock);
+
+       RETURN(0);
+
+discard:
+       /* The freshly allocated unit is not needed. */
+       spin_unlock(lock);
+       OBD_FREE_PTR(unit);
+
+       RETURN(result);
+}
+EXPORT_SYMBOL(lustre_index_register);
+
+/**
+ * Remove the unit registered for @fid from @head, if any, and free it.
+ *
+ * The list is walked from the tail under @lock. The walk ends at the first
+ * entry whose FID compares lower than @fid (the index is then treated as
+ * not registered) or at the entry with an equal FID, which is unlinked.
+ * The unlinked unit is freed after the lock has been dropped.
+ *
+ * \param[in] head     list of registered units
+ * \param[in] lock     spinlock protecting @head
+ * \param[in] fid      FID of the index to deregister
+ */
+static void lustre_index_degister(struct list_head *head, spinlock_t *lock,
+                                 const struct lu_fid *fid)
+{
+       struct lustre_index_backup_unit *cur;
+       struct lustre_index_backup_unit *found = NULL;
+
+       spin_lock(lock);
+       list_for_each_entry_reverse(cur, head, libu_link) {
+               int cmp = lu_fid_cmp(&cur->libu_fid, fid);
+
+               if (cmp > 0)
+                       continue;
+
+               /* cmp < 0 means NOT registered; cmp == 0 is the match. */
+               if (cmp == 0) {
+                       list_del(&cur->libu_link);
+                       found = cur;
+               }
+               break;
+       }
+       spin_unlock(lock);
+
+       if (found != NULL)
+               OBD_FREE_PTR(found);
+}
+
+/**
+ * Fill @header for an index backup file.
+ *
+ * The whole header is zeroed first, then the magic, sizes, pair count and
+ * owner FID are stored in little-endian form.
+ *
+ * \param[out] header  header to initialize
+ * \param[in] keysize  size of each key
+ * \param[in] recsize  size of each record
+ * \param[in] fid      FID of the index that owns the backup
+ * \param[in] count    number of key/record pairs in the backup
+ */
+static void
+lustre_index_backup_make_header(struct lustre_index_backup_header *header,
+                               __u32 keysize, __u32 recsize,
+                               const struct lu_fid *fid, __u32 count)
+{
+       /* Start from a clean slate so unset fields read back as zero. */
+       memset(header, 0, sizeof(*header));
+
+       fid_cpu_to_le(&header->libh_owner, fid);
+       header->libh_recsize = cpu_to_le32(recsize);
+       header->libh_keysize = cpu_to_le32(keysize);
+       header->libh_count = cpu_to_le32(count);
+       header->libh_magic = cpu_to_le32(INDEX_BACKUP_MAGIC_V1);
+}
+
+/**
+ * Write @bufsize bytes from @buf into @obj at offset *@pos, using a local
+ * transaction of its own.
+ *
+ * \param[in] env      execution environment
+ * \param[in] obj      object to write to
+ * \param[in,out] pos  write offset, handed to dt_record_write()
+ * \param[in] buf      data to write
+ * \param[in] bufsize  number of bytes to write
+ *
+ * \retval             0 for success
+ * \retval             negative error number on failure
+ */
+static int lustre_index_backup_body(const struct lu_env *env,
+                                   struct dt_object *obj, loff_t *pos,
+                                   void *buf, int bufsize)
+{
+       struct dt_device *dev = lu2dt_dev(obj->do_lu.lo_dev);
+       struct thandle *handle;
+       struct lu_buf data;
+       int rc;
+       ENTRY;
+
+       data.lb_buf = buf;
+       data.lb_len = bufsize;
+
+       handle = dt_trans_create(env, dev);
+       if (IS_ERR(handle))
+               RETURN(PTR_ERR(handle));
+
+       rc = dt_declare_record_write(env, obj, &data, *pos, handle);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       rc = dt_trans_start_local(env, dev, handle);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       rc = dt_record_write(env, obj, &data, pos, handle);
+
+       GOTO(stop, rc);
+
+stop:
+       /* The transaction is stopped on every path once created. */
+       dt_trans_stop(env, dev, handle);
+       return rc;
+}
+
+/**
+ * Write the backup header at offset 0 of the backup object @obj and, when
+ * the object is larger than the new backup, truncate the stale tail.
+ *
+ * @buf is used twice: first to fetch the attributes of @obj (to learn its
+ * current size), then to build the header. The punch decision is taken
+ * before the header overwrites the attributes.
+ *
+ * \param[in] env      execution environment
+ * \param[in] obj      backup object
+ * \param[in] tgt_fid  FID of the index that has been backed up
+ * \param[in] keysize  size of each key
+ * \param[in] recsize  size of each record
+ * \param[in] buf      scratch buffer
+ * \param[in] bufsize  size of @buf in bytes
+ * \param[in] count    number of key/record pairs stored in the backup
+ *
+ * \retval             0 for success
+ * \retval             negative error number on failure
+ */
+static int lustre_index_backup_header(const struct lu_env *env,
+                                     struct dt_object *obj,
+                                     const struct lu_fid *tgt_fid,
+                                     __u32 keysize, __u32 recsize,
+                                     void *buf, int bufsize, int count)
+{
+       struct dt_device *dev = lu2dt_dev(obj->do_lu.lo_dev);
+       struct lustre_index_backup_header *header = buf;
+       struct lu_attr *la = buf;
+       struct thandle *th;
+       struct lu_buf lbuf = {
+               .lb_buf = header,
+               .lb_len = sizeof(*header)
+       };
+       /* Widen before multiplying: (keysize + recsize) * count would
+        * otherwise be evaluated in 32 bits and could wrap for a large
+        * index, yielding a bogus truncate offset. */
+       loff_t size = sizeof(*header) +
+                     ((loff_t)keysize + recsize) * count;
+       loff_t pos = 0;
+       int rc;
+       bool punch = false;
+       ENTRY;
+
+       LASSERT(sizeof(*la) <= bufsize);
+       LASSERT(sizeof(*header) <= bufsize);
+
+       rc = dt_attr_get(env, obj, la);
+       if (rc)
+               RETURN(rc);
+
+       /* Remember whether the old backup is longer than the new one; the
+        * attributes are overwritten by the header right below. */
+       if (la->la_size > size)
+               punch = true;
+
+       lustre_index_backup_make_header(header, keysize, recsize,
+                                       tgt_fid, count);
+       th = dt_trans_create(env, dev);
+       if (IS_ERR(th))
+               RETURN(PTR_ERR(th));
+
+       rc = dt_declare_record_write(env, obj, &lbuf, pos, th);
+       if (rc)
+               GOTO(stop, rc);
+
+       if (punch) {
+               rc = dt_declare_punch(env, obj, size, OBD_OBJECT_EOF, th);
+               if (rc)
+                       GOTO(stop, rc);
+       }
+
+       rc = dt_trans_start_local(env, dev, th);
+       if (rc)
+               GOTO(stop, rc);
+
+       rc = dt_record_write(env, obj, &lbuf, &pos, th);
+       if (!rc && punch)
+               rc = dt_punch(env, obj, size, OBD_OBJECT_EOF, th);
+
+       GOTO(stop, rc);
+
+stop:
+       dt_trans_stop(env, dev, th);
+       return rc;
+}
+
+/**
+ * Make sure the LMA xattr of @obj carries the LMAC_IDX_BACKUP compat flag.
+ *
+ * If the object has no LMA (-ENODATA) a new one is initialized with the
+ * flag set and stored with LU_XATTR_CREATE. If the LMA exists without the
+ * flag, the flag is added and the xattr is replaced. If the flag is set
+ * already nothing is written. lustre_lma_swab() is applied after reading
+ * and again before writing (presumably converting between on-disk and CPU
+ * byte order -- confirm against its definition).
+ *
+ * \param[in] env      execution environment
+ * \param[in] obj      object whose LMA is updated
+ * \param[in] buf      scratch buffer used to hold the LMA
+ * \param[in] bufsize  size of @buf, at least sizeof(struct lustre_ost_attrs)
+ *
+ * \retval             0 for success or if the flag was set already
+ * \retval             -EFAULT if the stored LMA is shorter than expected
+ * \retval             other negative error number on failure
+ */
+static int lustre_index_update_lma(const struct lu_env *env,
+                                  struct dt_object *obj,
+                                  void *buf, int bufsize)
+{
+       struct dt_device *dev = lu2dt_dev(obj->do_lu.lo_dev);
+       struct lustre_mdt_attrs *lma = buf;
+       struct lu_buf lbuf = {
+               .lb_buf = lma,
+               .lb_len = sizeof(struct lustre_ost_attrs)
+       };
+       struct thandle *th;
+       int fl = LU_XATTR_REPLACE;
+       int rc;
+       ENTRY;
+
+       LASSERT(bufsize >= lbuf.lb_len);
+
+       rc = dt_xattr_get(env, obj, &lbuf, XATTR_NAME_LMA);
+       if (unlikely(rc == -ENODATA)) {
+               fl = LU_XATTR_CREATE;
+               lustre_lma_init(lma, lu_object_fid(&obj->do_lu),
+                               LMAC_IDX_BACKUP, 0);
+               rc = sizeof(*lma);
+       } else if (rc < (int)sizeof(*lma)) {
+               /* The cast keeps the comparison signed. Against a bare
+                * sizeof() a negative rc is converted to a huge unsigned
+                * value, so read errors would be taken for a valid LMA. */
+               RETURN(rc < 0 ? rc : -EFAULT);
+       } else {
+               lustre_lma_swab(lma);
+               if (lma->lma_compat & LMAC_IDX_BACKUP)
+                       RETURN(0);
+
+               lma->lma_compat |= LMAC_IDX_BACKUP;
+       }
+
+       lustre_lma_swab(lma);
+       /* From here on rc holds the LMA size to be written. */
+       lbuf.lb_len = rc;
+       th = dt_trans_create(env, dev);
+       if (IS_ERR(th))
+               /* Report the real error, not the positive LMA size. */
+               RETURN(PTR_ERR(th));
+
+       rc = dt_declare_xattr_set(env, obj, &lbuf, XATTR_NAME_LMA, fl, th);
+       if (rc)
+               GOTO(stop, rc);
+
+       rc = dt_trans_start_local(env, dev, th);
+       if (rc)
+               GOTO(stop, rc);
+
+       rc = dt_xattr_set(env, obj, &lbuf, XATTR_NAME_LMA, fl, th);
+
+       GOTO(stop, rc);
+
+stop:
+       dt_trans_stop(env, dev, th);
+       return rc;
+}
+
+/**
+ * Back up one registered index into a local file under @parent.
+ *
+ * All key/record pairs of the index named by @libu are iterated and
+ * appended to the backup file, batched through @buf. Once the body is
+ * written, the header is stored at offset 0 and the LMA of the index is
+ * flagged via lustre_index_update_lma(). Nothing is done (and 0 is
+ * returned) when the index object does not exist.
+ *
+ * @buf is first used to build the backup file name, then reused as the
+ * batching buffer and finally as scratch space for the header and LMA.
+ *
+ * \param[in] env      execution environment
+ * \param[in] los      local OID storage used to create the backup file
+ * \param[in] parent   directory object that holds the backup files
+ * \param[in] libu     registered unit describing the index
+ * \param[in] buf      work buffer
+ * \param[in] bufsize  size of @buf in bytes
+ *
+ * \retval             0 for success
+ * \retval             negative error number on failure
+ */
+static int lustre_index_backup_one(const struct lu_env *env,
+                                  struct local_oid_storage *los,
+                                  struct dt_object *parent,
+                                  struct lustre_index_backup_unit *libu,
+                                  char *buf, int bufsize)
+{
+       struct dt_device *dev = scrub_obj2dev(parent);
+       struct dt_object *tgt_obj = NULL;
+       struct dt_object *bak_obj = NULL;
+       const struct dt_it_ops *iops;
+       struct dt_it *di;
+       loff_t pos = sizeof(struct lustre_index_backup_header);
+       int count = 0;
+       int size = 0;
+       int rc;
+       ENTRY;
+
+       tgt_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
+                                            &libu->libu_fid, NULL));
+       if (IS_ERR_OR_NULL(tgt_obj))
+               GOTO(out, rc = tgt_obj ? PTR_ERR(tgt_obj) : -ENOENT);
+
+       if (!dt_object_exists(tgt_obj))
+               GOTO(out, rc = 0);
+
+       if (!tgt_obj->do_index_ops) {
+               struct dt_index_features feat;
+
+               feat.dif_flags = DT_IND_UPDATE;
+               feat.dif_keysize_min = libu->libu_keysize;
+               feat.dif_keysize_max = libu->libu_keysize;
+               feat.dif_recsize_min = libu->libu_recsize;
+               feat.dif_recsize_max = libu->libu_recsize;
+               feat.dif_ptrsize = 4;
+               rc = tgt_obj->do_ops->do_index_try(env, tgt_obj, &feat);
+               if (rc)
+                       GOTO(out, rc);
+       }
+
+       lustre_fid2lbx(buf, &libu->libu_fid, bufsize);
+       bak_obj = local_file_find_or_create(env, los, parent, buf,
+                                           S_IFREG | S_IRUGO | S_IWUSR);
+       if (IS_ERR_OR_NULL(bak_obj))
+               GOTO(out, rc = bak_obj ? PTR_ERR(bak_obj) : -ENOENT);
+
+       iops = &tgt_obj->do_index_ops->dio_it;
+       di = iops->init(env, tgt_obj, 0);
+       if (IS_ERR(di))
+               GOTO(out, rc = PTR_ERR(di));
+
+       /* A zero return from load() is followed by next(); a positive
+        * return lets the loop start with the current position. */
+       rc = iops->load(env, di, 0);
+       if (!rc)
+               rc = iops->next(env, di);
+       else if (rc > 0)
+               rc = 0;
+
+       while (!rc) {
+               void *key;
+               void *rec;
+
+               key = iops->key(env, di);
+               /* The iterator may hand back an error pointer; never feed
+                * that to memcpy(). */
+               if (IS_ERR(key))
+                       GOTO(fini, rc = PTR_ERR(key));
+
+               memcpy(&buf[size], key, libu->libu_keysize);
+               size += libu->libu_keysize;
+               rec = &buf[size];
+               rc = iops->rec(env, di, rec, 0);
+               if (rc)
+                       GOTO(fini, rc);
+
+               size += libu->libu_recsize;
+               count++;
+               /* Flush the batch when one more pair would not fit. */
+               if (size + libu->libu_keysize + libu->libu_recsize > bufsize) {
+                       rc = lustre_index_backup_body(env, bak_obj, &pos,
+                                                     buf, size);
+                       if (rc)
+                               GOTO(fini, rc);
+
+                       size = 0;
+               }
+
+               rc = iops->next(env, di);
+       }
+
+       /* Write out the last, partially filled batch. */
+       if (rc >= 0 && size > 0)
+               rc = lustre_index_backup_body(env, bak_obj, &pos, buf, size);
+
+       if (rc < 0)
+               GOTO(fini, rc);
+
+       rc = lustre_index_backup_header(env, bak_obj, &libu->libu_fid,
+                                       libu->libu_keysize, libu->libu_recsize,
+                                       buf, bufsize, count);
+       if (!rc)
+               rc = lustre_index_update_lma(env, tgt_obj, buf, bufsize);
+
+       /* Fault injection: overwrite the first 512 bytes of the index
+        * itself with zeros; the result of that write is ignored. */
+       if (!rc && OBD_FAIL_CHECK(OBD_FAIL_OSD_INDEX_CRASH)) {
+               LASSERT(bufsize >= 512);
+
+               pos = 0;
+               memset(buf, 0, 512);
+               lustre_index_backup_body(env, tgt_obj, &pos, buf, 512);
+       }
+
+       GOTO(fini, rc);
+
+fini:
+       iops->fini(env, di);
+out:
+       if (!IS_ERR_OR_NULL(tgt_obj))
+               dt_object_put_nocache(env, tgt_obj);
+       if (!IS_ERR_OR_NULL(bak_obj))
+               dt_object_put_nocache(env, bak_obj);
+       return rc;
+}
+
+/**
+ * Drain the list of registered indexes, optionally backing each one up.
+ *
+ * The guard is raised first so that lustre_index_register() refuses new
+ * registrations. Every unit is then unlinked and freed; when @backup is
+ * true and the backup environment could be set up, the index is backed up
+ * by lustre_index_backup_one() before its unit is freed. Failures to set
+ * up the backup environment only disable the backup, the list is drained
+ * anyway.
+ *
+ * \param[in] env      execution environment
+ * \param[in] dev      device owning the indexes; nothing is done if
+ *                     it is read-only
+ * \param[in] devname  device name used in messages
+ * \param[in] head     list of registered units
+ * \param[in] lock     spinlock protecting @head and @guard
+ * \param[in] guard    set to 1 by this function; nothing is done if it is
+ *                     set already
+ * \param[in] backup   whether the indexes are to be backed up
+ */
+void lustre_index_backup(const struct lu_env *env, struct dt_device *dev,
+                        const char *devname, struct list_head *head,
+                        spinlock_t *lock, int *guard, bool backup)
+{
+       struct lustre_index_backup_unit *libu;
+       struct local_oid_storage *los = NULL;
+       struct dt_object *parent = NULL;
+       char *buf = NULL;
+       struct lu_fid fid;
+       int rc;
+       ENTRY;
+
+       if (dev->dd_rdonly || *guard)
+               RETURN_EXIT;
+
+       spin_lock(lock);
+       *guard = 1;
+       spin_unlock(lock);
+
+       if (list_empty(head))
+               RETURN_EXIT;
+
+       /* Handle kinds of failures during mount process. */
+       if (!dev->dd_lu_dev.ld_site || !dev->dd_lu_dev.ld_site->ls_top_dev)
+               backup = false;
+
+       if (backup) {
+               OBD_ALLOC_LARGE(buf, INDEX_BACKUP_BUFSIZE);
+               if (!buf) {
+                       backup = false;
+                       goto scan;
+               }
+
+               lu_local_obj_fid(&fid, INDEX_BACKUP_OID);
+               parent = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
+                                                   &fid, NULL));
+               if (IS_ERR_OR_NULL(parent)) {
+                       CERROR("%s: failed to locate backup dir: rc = %ld\n",
+                              devname, parent ? PTR_ERR(parent) : -ENOENT);
+                       backup = false;
+                       goto scan;
+               }
+
+               lu_local_name_obj_fid(&fid, 1);
+               rc = local_oid_storage_init(env, dev, &fid, &los);
+               if (rc) {
+                       CERROR("%s: failed to init local storage: rc = %d\n",
+                              devname, rc);
+                       backup = false;
+               }
+       }
+
+scan:
+       /* The lock is dropped around the backup of each unit; the unit is
+        * off the list by then. */
+       spin_lock(lock);
+       while (!list_empty(head)) {
+               libu = list_entry(head->next,
+                                 struct lustre_index_backup_unit, libu_link);
+               list_del_init(&libu->libu_link);
+               spin_unlock(lock);
+
+               if (backup) {
+                       rc = lustre_index_backup_one(env, los, parent, libu,
+                                                    buf, INDEX_BACKUP_BUFSIZE);
+                       CDEBUG(D_WARNING, "%s: backup index "DFID": rc = %d\n",
+                              devname, PFID(&libu->libu_fid), rc);
+               }
+
+               OBD_FREE_PTR(libu);
+               spin_lock(lock);
+       }
+       spin_unlock(lock);
+
+       if (los)
+               local_oid_storage_fini(env, los);
+       /* parent may hold an error pointer if the lookup failed; only a
+        * real object reference may be released. */
+       if (!IS_ERR_OR_NULL(parent))
+               dt_object_put_nocache(env, parent);
+       if (buf)
+               OBD_FREE_LARGE(buf, INDEX_BACKUP_BUFSIZE);
+
+       EXIT;
+}
+EXPORT_SYMBOL(lustre_index_backup);
+
+/**
+ * Rebuild an index object from its backup file.
+ *
+ * After the backup header has been validated the restore runs in separate
+ * local transactions:
+ *   T1: remove the name entry from the parent and destroy the old index;
+ *   T2: re-create the index object and insert the name entry again;
+ *   Tn: insert each key/record pair read from the backup, one per
+ *       transaction.
+ * If the restore fails after the new index has been registered through
+ * do_index_try(), the index is deregistered again so that the (still
+ * valid) backup is not overwritten later.
+ *
+ * @buf is zeroed and carved into lu_attr, dt_object_format,
+ * dt_index_features and the backup header; during Tn it is reused as the
+ * read buffer for the key/record pairs.
+ *
+ * \param[in] env         execution environment
+ * \param[in] dev         device holding the objects
+ * \param[in] parent_fid  FID of the directory holding the name entry
+ * \param[in] tgt_fid     FID of the index to restore
+ * \param[in] bak_fid     FID of the backup file
+ * \param[in] name        name of the index under the parent
+ * \param[in] head        list of registered backup units
+ * \param[in] lock        spinlock protecting @head
+ * \param[in] buf         work buffer
+ * \param[in] bufsize     size of @buf in bytes
+ *
+ * \retval                0 for success
+ * \retval                -EINVAL if the backup header is not usable
+ * \retval                other negative error number on failure
+ */
+int lustre_index_restore(const struct lu_env *env, struct dt_device *dev,
+                        const struct lu_fid *parent_fid,
+                        const struct lu_fid *tgt_fid,
+                        const struct lu_fid *bak_fid, const char *name,
+                        struct list_head *head, spinlock_t *lock,
+                        char *buf, int bufsize)
+{
+       struct dt_object *parent_obj = NULL;
+       struct dt_object *tgt_obj = NULL;
+       struct dt_object *bak_obj = NULL;
+       struct lustre_index_backup_header *header;
+       struct dt_index_features *feat;
+       struct dt_object_format *dof;
+       struct lu_attr *la;
+       struct thandle *th;
+       struct lu_object_conf conf;
+       struct dt_insert_rec ent;
+       struct lu_buf lbuf;
+       struct lu_fid tfid;
+       loff_t pos = 0;
+       __u32 keysize;
+       __u32 recsize;
+       __u32 pairsize;
+       int max_items;
+       int count;
+       int rc;
+       bool registered = false;
+       ENTRY;
+
+       LASSERT(bufsize >= sizeof(*la) + sizeof(*dof) +
+               sizeof(*feat) + sizeof(*header));
+
+       memset(buf, 0, bufsize);
+       la = (struct lu_attr *)buf;
+       dof = (void *)la + sizeof(*la);
+       feat = (void *)dof + sizeof(*dof);
+       header = (void *)feat + sizeof(*feat);
+       lbuf.lb_buf = header;
+       lbuf.lb_len = sizeof(*header);
+
+       tgt_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
+                                            tgt_fid, NULL));
+       if (IS_ERR_OR_NULL(tgt_obj))
+               GOTO(out, rc = tgt_obj ? PTR_ERR(tgt_obj) : -ENOENT);
+
+       bak_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
+                                            bak_fid, NULL));
+       if (IS_ERR_OR_NULL(bak_obj))
+               GOTO(out, rc = bak_obj ? PTR_ERR(bak_obj) : -ENOENT);
+
+       if (!dt_object_exists(bak_obj))
+               GOTO(out, rc = -ENOENT);
+
+       parent_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
+                                               parent_fid, NULL));
+       if (IS_ERR_OR_NULL(parent_obj))
+               GOTO(out, rc = parent_obj ? PTR_ERR(parent_obj) : -ENOENT);
+
+       LASSERT(dt_object_exists(parent_obj));
+
+       if (unlikely(!dt_try_as_dir(env, parent_obj)))
+               GOTO(out, rc = -ENOTDIR);
+
+       rc = dt_attr_get(env, tgt_obj, la);
+       if (rc)
+               GOTO(out, rc);
+
+       rc = dt_record_read(env, bak_obj, &lbuf, &pos);
+       if (rc)
+               GOTO(out, rc);
+
+       if (le32_to_cpu(header->libh_magic) != INDEX_BACKUP_MAGIC_V1)
+               GOTO(out, rc = -EINVAL);
+
+       fid_le_to_cpu(&tfid, &header->libh_owner);
+       if (unlikely(!lu_fid_eq(tgt_fid, &tfid)))
+               GOTO(out, rc = -EINVAL);
+
+       keysize = le32_to_cpu(header->libh_keysize);
+       recsize = le32_to_cpu(header->libh_recsize);
+       pairsize = keysize + recsize;
+
+       /* The sizes come from disk. Refuse a header whose pair size is
+        * zero or does not fit into the buffer before the old index is
+        * destroyed: such values would cause a division by zero or an
+        * endless loop while restoring the records. Checking both parts
+        * first also rules out a wrapped sum. */
+       if (unlikely(keysize > (__u32)bufsize || recsize > (__u32)bufsize ||
+                    pairsize == 0 || pairsize > (__u32)bufsize))
+               GOTO(out, rc = -EINVAL);
+
+       memset(feat, 0, sizeof(*feat));
+       feat->dif_flags = DT_IND_UPDATE;
+       feat->dif_keysize_min = feat->dif_keysize_max = keysize;
+       feat->dif_recsize_min = feat->dif_recsize_max = recsize;
+       feat->dif_ptrsize = 4;
+
+       /* T1: remove old name entry and destroy old index. */
+       th = dt_trans_create(env, dev);
+       if (IS_ERR(th))
+               GOTO(out, rc = PTR_ERR(th));
+
+       rc = dt_declare_delete(env, parent_obj,
+                              (const struct dt_key *)name, th);
+       if (rc)
+               GOTO(stop, rc);
+
+       rc = dt_declare_destroy(env, tgt_obj, th);
+       if (rc)
+               GOTO(stop, rc);
+
+       rc = dt_trans_start_local(env, dev, th);
+       if (rc)
+               GOTO(stop, rc);
+
+       rc = dt_delete(env, parent_obj, (const struct dt_key *)name, th);
+       if (rc)
+               GOTO(stop, rc);
+
+       dt_write_lock(env, tgt_obj, 0);
+       rc = dt_destroy(env, tgt_obj, th);
+       dt_write_unlock(env, tgt_obj);
+       dt_trans_stop(env, dev, th);
+       if (rc)
+               GOTO(out, rc);
+
+       la->la_valid = LA_MODE | LA_UID | LA_GID;
+       conf.loc_flags = LOC_F_NEW;
+       dof->u.dof_idx.di_feat = feat;
+       dof->dof_type = DFT_INDEX;
+       ent.rec_type = S_IFREG;
+       ent.rec_fid = tgt_fid;
+
+       /* Drop cache before re-create it. */
+       dt_object_put_nocache(env, tgt_obj);
+       tgt_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
+                                            tgt_fid, &conf));
+       if (IS_ERR_OR_NULL(tgt_obj))
+               GOTO(out, rc = tgt_obj ? PTR_ERR(tgt_obj) : -ENOENT);
+
+       LASSERT(!dt_object_exists(tgt_obj));
+
+       /* T2: create new index and insert new name entry. */
+       th = dt_trans_create(env, dev);
+       if (IS_ERR(th))
+               GOTO(out, rc = PTR_ERR(th));
+
+       rc = dt_declare_create(env, tgt_obj, la, NULL, dof, th);
+       if (rc)
+               GOTO(stop, rc);
+
+       rc = dt_declare_insert(env, parent_obj, (const struct dt_rec *)&ent,
+                              (const struct dt_key *)name, th);
+       if (rc)
+               GOTO(stop, rc);
+
+       rc = dt_trans_start_local(env, dev, th);
+       if (rc)
+               GOTO(stop, rc);
+
+       dt_write_lock(env, tgt_obj, 0);
+       rc = dt_create(env, tgt_obj, la, NULL, dof, th);
+       dt_write_unlock(env, tgt_obj);
+       if (rc)
+               GOTO(stop, rc);
+
+       rc = dt_insert(env, parent_obj, (const struct dt_rec *)&ent,
+                      (const struct dt_key *)name, th);
+       dt_trans_stop(env, dev, th);
+       /* Some index name may has been inserted by OSD
+        * automatically when create the index object. */
+       if (unlikely(rc == -EEXIST))
+               rc = 0;
+       if (rc)
+               GOTO(out, rc);
+
+       /* The new index will register via index_try. */
+       rc = tgt_obj->do_ops->do_index_try(env, tgt_obj, feat);
+       if (rc)
+               GOTO(out, rc);
+
+       registered = true;
+       /* Fetch the count now: the header lives in @buf, which is reused
+        * as the read buffer below. */
+       count = le32_to_cpu(header->libh_count);
+       /* At least 1, thanks to the pairsize validation above. */
+       max_items = bufsize / pairsize;
+       while (!rc && count > 0) {
+               /* Clamp the item count first so that the byte size can
+                * never exceed bufsize (and thus never overflow). */
+               int items = count < max_items ? count : max_items;
+               int size = pairsize * items;
+               int i;
+
+               lbuf.lb_buf = buf;
+               lbuf.lb_len = size;
+               rc = dt_record_read(env, bak_obj, &lbuf, &pos);
+               for (i = 0; i < items && !rc; i++) {
+                       void *key = &buf[i * pairsize];
+                       void *rec = &buf[i * pairsize + keysize];
+
+                       /* Tn: restore the records. */
+                       th = dt_trans_create(env, dev);
+                       if (IS_ERR(th))
+                               GOTO(out, rc = PTR_ERR(th));
+
+                       rc = dt_declare_insert(env, tgt_obj, rec, key, th);
+                       if (rc)
+                               GOTO(stop, rc);
+
+                       rc = dt_trans_start_local(env, dev, th);
+                       if (rc)
+                               GOTO(stop, rc);
+
+                       rc = dt_insert(env, tgt_obj, rec, key, th);
+                       if (unlikely(rc == -EEXIST))
+                               rc = 0;
+
+                       dt_trans_stop(env, dev, th);
+               }
+
+               count -= items;
+       }
+
+       GOTO(out, rc);
+
+stop:
+       dt_trans_stop(env, dev, th);
+
+out:
+       /* Degister the index to avoid overwriting the backup. This must
+        * cover every failure after registration, including those that
+        * leave the restore loop without an open transaction. */
+       if (rc && registered)
+               lustre_index_degister(head, lock, tgt_fid);
+
+       if (!IS_ERR_OR_NULL(tgt_obj))
+               dt_object_put_nocache(env, tgt_obj);
+       if (!IS_ERR_OR_NULL(bak_obj))
+               dt_object_put_nocache(env, bak_obj);
+       if (!IS_ERR_OR_NULL(parent_obj))
+               dt_object_put_nocache(env, parent_obj);
+       return rc;
+}
+EXPORT_SYMBOL(lustre_index_restore);