Whamcloud - gitweb
LU-15615 target: Free t10pi crypto state on error
[fs/lustre-release.git] / lustre / obdclass / scrub.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2017, Intel Corporation.
24  */
25 /*
26  * lustre/obdclass/scrub.c
27  *
28  * The OI scrub is used for checking and (re)building Object Index files
29  * that are usually backend special. Here are some general scrub related
30  * functions that can be shared by different backends for OI scrub.
31  *
32  * Author: Fan Yong <fan.yong@intel.com>
33  */
34
35 #define DEBUG_SUBSYSTEM S_LFSCK
36
37 #include <linux/kthread.h>
38 #include <lustre_scrub.h>
39 #include <lustre_lib.h>
40 #include <lustre_fid.h>
41
42 static inline struct dt_device *scrub_obj2dev(struct dt_object *obj)
43 {
44         return container_of_safe(obj->do_lu.lo_dev, struct dt_device,
45                                  dd_lu_dev);
46 }
47
48 static void scrub_file_to_cpu(struct scrub_file *des, struct scrub_file *src)
49 {
50         guid_copy(&des->sf_uuid, &src->sf_uuid);
51         des->sf_flags   = le64_to_cpu(src->sf_flags);
52         des->sf_magic   = le32_to_cpu(src->sf_magic);
53         des->sf_status  = le16_to_cpu(src->sf_status);
54         des->sf_param   = le16_to_cpu(src->sf_param);
55         des->sf_time_last_complete      =
56                                 le64_to_cpu(src->sf_time_last_complete);
57         des->sf_time_latest_start       =
58                                 le64_to_cpu(src->sf_time_latest_start);
59         des->sf_time_last_checkpoint    =
60                                 le64_to_cpu(src->sf_time_last_checkpoint);
61         des->sf_pos_latest_start        =
62                                 le64_to_cpu(src->sf_pos_latest_start);
63         des->sf_pos_last_checkpoint     =
64                                 le64_to_cpu(src->sf_pos_last_checkpoint);
65         des->sf_pos_first_inconsistent  =
66                                 le64_to_cpu(src->sf_pos_first_inconsistent);
67         des->sf_items_checked           =
68                                 le64_to_cpu(src->sf_items_checked);
69         des->sf_items_updated           =
70                                 le64_to_cpu(src->sf_items_updated);
71         des->sf_items_failed            =
72                                 le64_to_cpu(src->sf_items_failed);
73         des->sf_items_updated_prior     =
74                                 le64_to_cpu(src->sf_items_updated_prior);
75         des->sf_run_time        = le32_to_cpu(src->sf_run_time);
76         des->sf_success_count   = le32_to_cpu(src->sf_success_count);
77         des->sf_oi_count        = le16_to_cpu(src->sf_oi_count);
78         des->sf_internal_flags  = le16_to_cpu(src->sf_internal_flags);
79         memcpy(des->sf_oi_bitmap, src->sf_oi_bitmap, SCRUB_OI_BITMAP_SIZE);
80 }
81
82 static void scrub_file_to_le(struct scrub_file *des, struct scrub_file *src)
83 {
84         guid_copy(&des->sf_uuid, &src->sf_uuid);
85         des->sf_flags   = cpu_to_le64(src->sf_flags);
86         des->sf_magic   = cpu_to_le32(src->sf_magic);
87         des->sf_status  = cpu_to_le16(src->sf_status);
88         des->sf_param   = cpu_to_le16(src->sf_param);
89         des->sf_time_last_complete      =
90                                 cpu_to_le64(src->sf_time_last_complete);
91         des->sf_time_latest_start       =
92                                 cpu_to_le64(src->sf_time_latest_start);
93         des->sf_time_last_checkpoint    =
94                                 cpu_to_le64(src->sf_time_last_checkpoint);
95         des->sf_pos_latest_start        =
96                                 cpu_to_le64(src->sf_pos_latest_start);
97         des->sf_pos_last_checkpoint     =
98                                 cpu_to_le64(src->sf_pos_last_checkpoint);
99         des->sf_pos_first_inconsistent  =
100                                 cpu_to_le64(src->sf_pos_first_inconsistent);
101         des->sf_items_checked           =
102                                 cpu_to_le64(src->sf_items_checked);
103         des->sf_items_updated           =
104                                 cpu_to_le64(src->sf_items_updated);
105         des->sf_items_failed            =
106                                 cpu_to_le64(src->sf_items_failed);
107         des->sf_items_updated_prior     =
108                                 cpu_to_le64(src->sf_items_updated_prior);
109         des->sf_run_time        = cpu_to_le32(src->sf_run_time);
110         des->sf_success_count   = cpu_to_le32(src->sf_success_count);
111         des->sf_oi_count        = cpu_to_le16(src->sf_oi_count);
112         des->sf_internal_flags  = cpu_to_le16(src->sf_internal_flags);
113         memcpy(des->sf_oi_bitmap, src->sf_oi_bitmap, SCRUB_OI_BITMAP_SIZE);
114 }
115
116 void scrub_file_init(struct lustre_scrub *scrub, guid_t uuid)
117 {
118         struct scrub_file *sf = &scrub->os_file;
119
120         memset(sf, 0, sizeof(*sf));
121         guid_copy(&sf->sf_uuid, &uuid);
122         sf->sf_magic = SCRUB_MAGIC_V2;
123         sf->sf_status = SS_INIT;
124 }
125 EXPORT_SYMBOL(scrub_file_init);
126
127 void scrub_file_reset(struct lustre_scrub *scrub, guid_t uuid, u64 flags)
128 {
129         struct scrub_file *sf = &scrub->os_file;
130
131         CDEBUG(D_LFSCK, "%s: reset OI scrub file, old flags = "
132                "%#llx, add flags = %#llx\n",
133                scrub->os_name, sf->sf_flags, flags);
134
135         guid_copy(&sf->sf_uuid, &uuid);
136         sf->sf_magic = SCRUB_MAGIC_V2;
137         sf->sf_status = SS_INIT;
138         sf->sf_flags |= flags;
139         sf->sf_flags &= ~SF_AUTO;
140         sf->sf_run_time = 0;
141         sf->sf_time_latest_start = 0;
142         sf->sf_time_last_checkpoint = 0;
143         sf->sf_pos_latest_start = 0;
144         sf->sf_pos_last_checkpoint = 0;
145         sf->sf_pos_first_inconsistent = 0;
146         sf->sf_items_checked = 0;
147         sf->sf_items_updated = 0;
148         sf->sf_items_failed = 0;
149         sf->sf_items_noscrub = 0;
150         sf->sf_items_igif = 0;
151         if (!scrub->os_in_join)
152                 sf->sf_items_updated_prior = 0;
153 }
154 EXPORT_SYMBOL(scrub_file_reset);
155
156 int scrub_file_load(const struct lu_env *env, struct lustre_scrub *scrub)
157 {
158         struct scrub_file *sf = &scrub->os_file;
159         struct lu_buf buf = {
160                 .lb_buf = &scrub->os_file_disk,
161                 .lb_len = sizeof(scrub->os_file_disk)
162         };
163         loff_t pos = 0;
164         int rc;
165
166         rc = dt_read(env, scrub->os_obj, &buf, &pos);
167         /* failure */
168         if (rc < 0) {
169                 CERROR("%s: fail to load scrub file: rc = %d\n",
170                        scrub->os_name, rc);
171                 return rc;
172         }
173
174         /* empty */
175         if (!rc)
176                 return -ENOENT;
177
178         /* corrupted */
179         if (rc < buf.lb_len) {
180                 CDEBUG(D_LFSCK, "%s: fail to load scrub file, "
181                        "expected = %d: rc = %d\n",
182                        scrub->os_name, (int)buf.lb_len, rc);
183                 return -EFAULT;
184         }
185
186         scrub_file_to_cpu(sf, &scrub->os_file_disk);
187         if (sf->sf_magic == SCRUB_MAGIC_V1) {
188                 CWARN("%s: reset scrub OI count for format change (LU-16655)\n",
189                       scrub->os_name);
190                 sf->sf_oi_count = 0;
191         } else if (sf->sf_magic != SCRUB_MAGIC_V2) {
192                 CDEBUG(D_LFSCK, "%s: invalid scrub magic %#x, should be %#x\n",
193                        scrub->os_name, sf->sf_magic, SCRUB_MAGIC_V2);
194                 return -EFAULT;
195         }
196
197         return 0;
198 }
199 EXPORT_SYMBOL(scrub_file_load);
200
201 int scrub_file_store(const struct lu_env *env, struct lustre_scrub *scrub)
202 {
203         struct scrub_file *sf = &scrub->os_file_disk;
204         struct dt_object *obj = scrub->os_obj;
205         struct dt_device *dev = scrub_obj2dev(obj);
206         struct lu_buf buf = {
207                 .lb_buf = sf,
208                 .lb_len = sizeof(*sf)
209         };
210         struct thandle *th;
211         loff_t pos = 0;
212         int rc;
213         ENTRY;
214
215         /* Skip store under rdonly mode. */
216         if (dev->dd_rdonly)
217                 RETURN(0);
218
219         scrub_file_to_le(sf, &scrub->os_file);
220         th = dt_trans_create(env, dev);
221         if (IS_ERR(th))
222                 GOTO(log, rc = PTR_ERR(th));
223
224         rc = dt_declare_record_write(env, obj, &buf, pos, th);
225         if (rc)
226                 GOTO(stop, rc);
227
228         rc = dt_trans_start_local(env, dev, th);
229         if (rc)
230                 GOTO(stop, rc);
231
232         rc = dt_record_write(env, obj, &buf, &pos, th);
233
234         GOTO(stop, rc);
235
236 stop:
237         dt_trans_stop(env, dev, th);
238
239 log:
240         if (rc)
241                 CERROR("%s: store scrub file: rc = %d\n",
242                        scrub->os_name, rc);
243         else
244                 CDEBUG(D_LFSCK, "%s: store scrub file: rc = %d\n",
245                        scrub->os_name, rc);
246
247         scrub->os_time_last_checkpoint = ktime_get_seconds();
248         scrub->os_time_next_checkpoint = scrub->os_time_last_checkpoint +
249                                          SCRUB_CHECKPOINT_INTERVAL;
250         return rc;
251 }
252 EXPORT_SYMBOL(scrub_file_store);
253
254 bool scrub_needs_check(struct lustre_scrub *scrub, const struct lu_fid *fid,
255                        u64 index)
256 {
257         bool check = true;
258
259         if (!fid_is_norm(fid) && !fid_is_igif(fid))
260                 check = false;
261         else if (scrub->os_running && scrub->os_pos_current > index)
262                 check = false;
263         else if (scrub->os_auto_scrub_interval == AS_NEVER)
264                 check = false;
265         else if (ktime_get_real_seconds() <
266                  scrub->os_file.sf_time_last_complete +
267                  scrub->os_auto_scrub_interval)
268                 check = false;
269
270         return check;
271 }
272 EXPORT_SYMBOL(scrub_needs_check);
273
274 int scrub_checkpoint(const struct lu_env *env, struct lustre_scrub *scrub)
275 {
276         struct scrub_file *sf = &scrub->os_file;
277         time64_t now = ktime_get_seconds();
278         int rc;
279
280         if (likely(now < scrub->os_time_next_checkpoint ||
281                    scrub->os_new_checked == 0))
282                 return 0;
283
284         CDEBUG(D_LFSCK, "%s: OI scrub checkpoint at pos %llu\n",
285                scrub->os_name, scrub->os_pos_current);
286
287         down_write(&scrub->os_rwsem);
288         sf->sf_items_checked += scrub->os_new_checked;
289         scrub->os_new_checked = 0;
290         sf->sf_pos_last_checkpoint = scrub->os_pos_current;
291         sf->sf_time_last_checkpoint = ktime_get_real_seconds();
292         sf->sf_run_time += now - scrub->os_time_last_checkpoint;
293         rc = scrub_file_store(env, scrub);
294         up_write(&scrub->os_rwsem);
295
296         return rc;
297 }
298 EXPORT_SYMBOL(scrub_checkpoint);
299
300 int scrub_thread_prep(const struct lu_env *env, struct lustre_scrub *scrub,
301                       guid_t uuid, u64 start)
302 {
303         struct scrub_file *sf = &scrub->os_file;
304         u32 flags = scrub->os_start_flags;
305         bool drop_dryrun = false;
306         int rc;
307
308         ENTRY;
309         CDEBUG(D_LFSCK, "%s: OI scrub prep, flags = 0x%x\n",
310                scrub->os_name, flags);
311
312         down_write(&scrub->os_rwsem);
313         if (flags & SS_SET_FAILOUT)
314                 sf->sf_param |= SP_FAILOUT;
315         else if (flags & SS_CLEAR_FAILOUT)
316                 sf->sf_param &= ~SP_FAILOUT;
317
318         if (flags & SS_SET_DRYRUN) {
319                 sf->sf_param |= SP_DRYRUN;
320         } else if (flags & SS_CLEAR_DRYRUN && sf->sf_param & SP_DRYRUN) {
321                 sf->sf_param &= ~SP_DRYRUN;
322                 drop_dryrun = true;
323         }
324
325         if (flags & SS_RESET)
326                 scrub_file_reset(scrub, uuid, 0);
327
328         spin_lock(&scrub->os_lock);
329         scrub->os_partial_scan = 0;
330         if (flags & SS_AUTO_FULL) {
331                 scrub->os_full_speed = 1;
332                 sf->sf_flags |= SF_AUTO;
333         } else if (flags & SS_AUTO_PARTIAL) {
334                 scrub->os_full_speed = 0;
335                 scrub->os_partial_scan = 1;
336                 sf->sf_flags |= SF_AUTO;
337         } else if (sf->sf_flags & (SF_RECREATED | SF_INCONSISTENT |
338                                    SF_UPGRADE)) {
339                 scrub->os_full_speed = 1;
340         } else {
341                 scrub->os_full_speed = 0;
342         }
343
344         scrub->os_in_prior = 0;
345         scrub->os_waiting = 0;
346         scrub->os_paused = 0;
347         scrub->os_in_join = 0;
348         scrub->os_full_scrub = 0;
349         spin_unlock(&scrub->os_lock);
350         scrub->os_new_checked = 0;
351         if (drop_dryrun && sf->sf_pos_first_inconsistent != 0)
352                 sf->sf_pos_latest_start = sf->sf_pos_first_inconsistent;
353         else if (sf->sf_pos_last_checkpoint != 0)
354                 sf->sf_pos_latest_start = sf->sf_pos_last_checkpoint + 1;
355         else
356                 sf->sf_pos_latest_start = start;
357
358         scrub->os_pos_current = sf->sf_pos_latest_start;
359         sf->sf_status = SS_SCANNING;
360         sf->sf_time_latest_start = ktime_get_real_seconds();
361         sf->sf_time_last_checkpoint = sf->sf_time_latest_start;
362         sf->sf_pos_last_checkpoint = sf->sf_pos_latest_start - 1;
363         rc = scrub_file_store(env, scrub);
364         if (rc == 0) {
365                 spin_lock(&scrub->os_lock);
366                 scrub->os_running = 1;
367                 spin_unlock(&scrub->os_lock);
368                 wake_up_var(scrub);
369         }
370         up_write(&scrub->os_rwsem);
371
372         RETURN(rc);
373 }
374 EXPORT_SYMBOL(scrub_thread_prep);
375
376 int scrub_thread_post(const struct lu_env *env, struct lustre_scrub *scrub,
377                       int result)
378 {
379         struct scrub_file *sf = &scrub->os_file;
380         int rc;
381         ENTRY;
382
383         CDEBUG(D_LFSCK, "%s: OI scrub post with result = %d\n",
384                scrub->os_name, result);
385
386         down_write(&scrub->os_rwsem);
387         spin_lock(&scrub->os_lock);
388         scrub->os_running = 0;
389         spin_unlock(&scrub->os_lock);
390         if (scrub->os_new_checked > 0) {
391                 sf->sf_items_checked += scrub->os_new_checked;
392                 scrub->os_new_checked = 0;
393                 sf->sf_pos_last_checkpoint = scrub->os_pos_current;
394         }
395         sf->sf_time_last_checkpoint = ktime_get_real_seconds();
396         if (result > 0) {
397                 sf->sf_status = SS_COMPLETED;
398                 if (!(sf->sf_param & SP_DRYRUN)) {
399                         memset(sf->sf_oi_bitmap, 0, SCRUB_OI_BITMAP_SIZE);
400                         sf->sf_flags &= ~(SF_RECREATED | SF_INCONSISTENT |
401                                           SF_UPGRADE | SF_AUTO);
402                 }
403                 sf->sf_time_last_complete = sf->sf_time_last_checkpoint;
404                 sf->sf_success_count++;
405         } else if (result == 0) {
406                 if (scrub->os_paused)
407                         sf->sf_status = SS_PAUSED;
408                 else
409                         sf->sf_status = SS_STOPPED;
410         } else {
411                 sf->sf_status = SS_FAILED;
412         }
413         sf->sf_run_time += ktime_get_seconds() -
414                            scrub->os_time_last_checkpoint;
415
416         rc = scrub_file_store(env, scrub);
417         up_write(&scrub->os_rwsem);
418
419         RETURN(rc < 0 ? rc : result);
420 }
421 EXPORT_SYMBOL(scrub_thread_post);
422
423 int scrub_start(int (*threadfn)(void *data), struct lustre_scrub *scrub,
424                 void *data, __u32 flags)
425 {
426         struct task_struct *task;
427         int rc;
428         ENTRY;
429
430         if (scrub->os_task)
431                 RETURN(-EALREADY);
432
433         if (scrub->os_file.sf_status == SS_COMPLETED) {
434                 if (!(flags & SS_SET_FAILOUT))
435                         flags |= SS_CLEAR_FAILOUT;
436
437                 if (!(flags & SS_SET_DRYRUN))
438                         flags |= SS_CLEAR_DRYRUN;
439
440                 flags |= SS_RESET;
441         }
442
443         task = kthread_create(threadfn, data, "OI_scrub");
444         if (IS_ERR(task)) {
445                 rc = PTR_ERR(task);
446                 CERROR("%s: cannot start iteration thread: rc = %d\n",
447                        scrub->os_name, rc);
448                 RETURN(rc);
449         }
450         spin_lock(&scrub->os_lock);
451         if (scrub->os_task) {
452                 /* Lost a race */
453                 spin_unlock(&scrub->os_lock);
454                 kthread_stop(task);
455                 RETURN(-EALREADY);
456         }
457         scrub->os_start_flags = flags;
458         scrub->os_task = task;
459         wake_up_process(task);
460         spin_unlock(&scrub->os_lock);
461         wait_var_event(scrub, scrub->os_running || !scrub->os_task);
462
463         RETURN(0);
464 }
465 EXPORT_SYMBOL(scrub_start);
466
467 void scrub_stop(struct lustre_scrub *scrub)
468 {
469         struct task_struct *task;
470
471         spin_lock(&scrub->os_lock);
472         scrub->os_running = 0;
473         spin_unlock(&scrub->os_lock);
474         task = xchg(&scrub->os_task, NULL);
475         if (task)
476                 kthread_stop(task);
477 }
478 EXPORT_SYMBOL(scrub_stop);
479
/*
 * Printable names for the scrub status, indexed by sf_status when the
 * scrub state is dumped.  The table is NULL-terminated.
 */
const char *const scrub_status_names[] = {
        "init", "scanning", "completed", "failed",
        "stopped", "paused", "crashed",
        NULL
};
490
/*
 * Printable names for the bits of sf_flags; entry i names bit i when the
 * flags are dumped.  The table is NULL-terminated.
 */
const char *const scrub_flags_names[] = {
        "recreated", "inconsistent", "auto", "upgrade",
        NULL
};
498
/*
 * Printable names for the bits of sf_param; entry i names bit i when the
 * parameters are dumped.  The table is NULL-terminated.
 */
const char *const scrub_param_names[] = {
        "failout", "dryrun",
        NULL
};
504
505 static void scrub_bits_dump(struct seq_file *m, int bits,
506                             const char *const names[],
507                             const char *prefix)
508 {
509         int flag;
510         int i;
511
512         seq_printf(m, "%s:%c", prefix, bits != 0 ? ' ' : '\n');
513
514         for (i = 0, flag = 1; bits != 0; i++, flag = BIT(i)) {
515                 if (flag & bits) {
516                         bits &= ~flag;
517                         seq_printf(m, "%s%c", names[i],
518                                    bits != 0 ? ',' : '\n');
519                 }
520         }
521 }
522
523 static void scrub_time_dump(struct seq_file *m, time64_t time,
524                             const char *prefix)
525 {
526         if (time != 0)
527                 seq_printf(m, "%s: %llu seconds\n", prefix,
528                            ktime_get_real_seconds() - time);
529         else
530                 seq_printf(m, "%s: N/A\n", prefix);
531 }
532
533 static void scrub_pos_dump(struct seq_file *m, __u64 pos, const char *prefix)
534 {
535         if (pos != 0)
536                 seq_printf(m, "%s: %llu\n", prefix, pos);
537         else
538                 seq_printf(m, "%s: N/A\n", prefix);
539 }
540
541 void scrub_dump(struct seq_file *m, struct lustre_scrub *scrub)
542 {
543         struct scrub_file *sf = &scrub->os_file;
544         u64 checked;
545         s64 speed;
546
547         down_read(&scrub->os_rwsem);
548         seq_printf(m, "name: OI_scrub\n"
549                    "magic: 0x%x\n"
550                    "oi_files: %d\n"
551                    "status: %s\n",
552                    sf->sf_magic, (int)sf->sf_oi_count,
553                    scrub_status_names[sf->sf_status]);
554
555         scrub_bits_dump(m, sf->sf_flags, scrub_flags_names, "flags");
556
557         scrub_bits_dump(m, sf->sf_param, scrub_param_names, "param");
558
559         scrub_time_dump(m, sf->sf_time_last_complete,
560                         "time_since_last_completed");
561
562         scrub_time_dump(m, sf->sf_time_latest_start,
563                         "time_since_latest_start");
564
565         scrub_time_dump(m, sf->sf_time_last_checkpoint,
566                         "time_since_last_checkpoint");
567
568         scrub_pos_dump(m, sf->sf_pos_latest_start,
569                         "latest_start_position");
570
571         scrub_pos_dump(m, sf->sf_pos_last_checkpoint,
572                         "last_checkpoint_position");
573
574         scrub_pos_dump(m, sf->sf_pos_first_inconsistent,
575                         "first_failure_position");
576
577         checked = sf->sf_items_checked + scrub->os_new_checked;
578         seq_printf(m, "checked: %llu\n"
579                    "%s: %llu\n"
580                    "failed: %llu\n"
581                    "prior_%s: %llu\n"
582                    "noscrub: %llu\n"
583                    "igif: %llu\n"
584                    "success_count: %u\n",
585                    checked,
586                    sf->sf_param & SP_DRYRUN ? "inconsistent" : "updated",
587                    sf->sf_items_updated, sf->sf_items_failed,
588                    sf->sf_param & SP_DRYRUN ? "inconsistent" : "updated",
589                    sf->sf_items_updated_prior, sf->sf_items_noscrub,
590                    sf->sf_items_igif, sf->sf_success_count);
591
592         speed = checked;
593         if (scrub->os_running) {
594                 s64 new_checked = scrub->os_new_checked;
595                 time64_t duration;
596                 time64_t rtime;
597
598                 /* Since the time resolution is in seconds for new system
599                  * or small devices it ismore likely that duration will be
600                  * zero which will lead to inaccurate results.
601                  */
602                 duration = ktime_get_seconds() -
603                            scrub->os_time_last_checkpoint;
604                 if (duration != 0)
605                         new_checked = div_s64(new_checked, duration);
606
607                 rtime = sf->sf_run_time + duration;
608                 if (rtime != 0)
609                         speed = div_s64(speed, rtime);
610
611                 seq_printf(m, "run_time: %lld seconds\n"
612                            "average_speed: %lld objects/sec\n"
613                            "real_time_speed: %lld objects/sec\n"
614                            "current_position: %llu\n"
615                            "scrub_in_prior: %s\n"
616                            "scrub_full_speed: %s\n"
617                            "partial_scan: %s\n",
618                            rtime, speed, new_checked,
619                            scrub->os_pos_current,
620                            scrub->os_in_prior ? "yes" : "no",
621                            scrub->os_full_speed ? "yes" : "no",
622                            scrub->os_partial_scan ? "yes" : "no");
623         } else {
624                 if (sf->sf_run_time != 0)
625                         speed = div_s64(speed, sf->sf_run_time);
626                 seq_printf(m, "run_time: %d seconds\n"
627                            "average_speed: %lld objects/sec\n"
628                            "real_time_speed: N/A\n"
629                            "current_position: N/A\n",
630                            sf->sf_run_time, speed);
631         }
632
633         up_read(&scrub->os_rwsem);
634 }
635 EXPORT_SYMBOL(scrub_dump);
636
637 int lustre_liru_new(struct list_head *head, const struct lu_fid *pfid,
638                     const struct lu_fid *cfid, __u64 child,
639                     const char *name, int namelen)
640 {
641         struct lustre_index_restore_unit *liru;
642         int len = sizeof(*liru) + namelen + 1;
643
644         OBD_ALLOC(liru, len);
645         if (!liru)
646                 return -ENOMEM;
647
648         INIT_LIST_HEAD(&liru->liru_link);
649         liru->liru_pfid = *pfid;
650         liru->liru_cfid = *cfid;
651         liru->liru_clid = child;
652         liru->liru_len = len;
653         memcpy(liru->liru_name, name, namelen);
654         liru->liru_name[namelen] = 0;
655         list_add_tail(&liru->liru_link, head);
656
657         return 0;
658 }
659 EXPORT_SYMBOL(lustre_liru_new);
660
661 int lustre_index_register(struct dt_device *dev, const char *devname,
662                           struct list_head *head, spinlock_t *lock, int *guard,
663                           const struct lu_fid *fid,
664                           __u32 keysize, __u32 recsize)
665 {
666         struct lustre_index_backup_unit *libu, *pos;
667         int rc = 0;
668         ENTRY;
669
670         if (dev->dd_rdonly || *guard)
671                 RETURN(1);
672
673         OBD_ALLOC_PTR(libu);
674         if (!libu)
675                 RETURN(-ENOMEM);
676
677         INIT_LIST_HEAD(&libu->libu_link);
678         libu->libu_keysize = keysize;
679         libu->libu_recsize = recsize;
680         libu->libu_fid = *fid;
681
682         spin_lock(lock);
683         if (unlikely(*guard)) {
684                 spin_unlock(lock);
685                 OBD_FREE_PTR(libu);
686
687                 RETURN(1);
688         }
689
690         list_for_each_entry_reverse(pos, head, libu_link) {
691                 rc = lu_fid_cmp(&pos->libu_fid, fid);
692                 if (rc < 0) {
693                         list_add(&libu->libu_link, &pos->libu_link);
694                         spin_unlock(lock);
695
696                         RETURN(0);
697                 }
698
699                 if (!rc) {
700                         /* Registered already. But the former registered one
701                          * has different keysize/recsize. It may because that
702                          * the former values are from disk and corrupted, then
703                          * replace it with new values. */
704                         if (unlikely(keysize != pos->libu_keysize ||
705                                      recsize != pos->libu_recsize)) {
706                                 CWARN("%s: the index "DFID" has registered "
707                                       "with %u/%u, may be invalid, replace "
708                                       "with %u/%u\n",
709                                       devname, PFID(fid), pos->libu_keysize,
710                                       pos->libu_recsize, keysize, recsize);
711
712                                 pos->libu_keysize = keysize;
713                                 pos->libu_recsize = recsize;
714                         } else {
715                                 rc = 1;
716                         }
717
718                         spin_unlock(lock);
719                         OBD_FREE_PTR(libu);
720
721                         RETURN(rc);
722                 }
723         }
724
725         list_add(&libu->libu_link, head);
726         spin_unlock(lock);
727
728         RETURN(0);
729 }
730 EXPORT_SYMBOL(lustre_index_register);
731
732 static void lustre_index_degister(struct list_head *head, spinlock_t *lock,
733                                   const struct lu_fid *fid)
734 {
735         struct lustre_index_backup_unit *libu;
736         int rc = -ENOENT;
737
738         spin_lock(lock);
739         list_for_each_entry_reverse(libu, head, libu_link) {
740                 rc = lu_fid_cmp(&libu->libu_fid, fid);
741                 /* NOT registered. */
742                 if (rc < 0)
743                         break;
744
745                 if (!rc) {
746                         list_del(&libu->libu_link);
747                         break;
748                 }
749         }
750         spin_unlock(lock);
751
752         if (!rc)
753                 OBD_FREE_PTR(libu);
754 }
755
756 static void
757 lustre_index_backup_make_header(struct lustre_index_backup_header *header,
758                                 __u32 keysize, __u32 recsize,
759                                 const struct lu_fid *fid, __u32 count)
760 {
761         memset(header, 0, sizeof(*header));
762         header->libh_magic = cpu_to_le32(INDEX_BACKUP_MAGIC_V1);
763         header->libh_count = cpu_to_le32(count);
764         header->libh_keysize = cpu_to_le32(keysize);
765         header->libh_recsize = cpu_to_le32(recsize);
766         fid_cpu_to_le(&header->libh_owner, fid);
767 }
768
769 static int lustre_index_backup_body(const struct lu_env *env,
770                                     struct dt_object *obj, loff_t *pos,
771                                     void *buf, int bufsize)
772 {
773         struct dt_device *dev = lu2dt_dev(obj->do_lu.lo_dev);
774         struct thandle *th;
775         struct lu_buf lbuf = {
776                 .lb_buf = buf,
777                 .lb_len = bufsize
778         };
779         int rc;
780         ENTRY;
781
782         th = dt_trans_create(env, dev);
783         if (IS_ERR(th))
784                 RETURN(PTR_ERR(th));
785
786         rc = dt_declare_record_write(env, obj, &lbuf, *pos, th);
787         if (rc)
788                 GOTO(stop, rc);
789
790         rc = dt_trans_start_local(env, dev, th);
791         if (rc)
792                 GOTO(stop, rc);
793
794         rc = dt_record_write(env, obj, &lbuf, pos, th);
795
796         GOTO(stop, rc);
797
798 stop:
799         dt_trans_stop(env, dev, th);
800         return rc;
801 }
802
803 static int lustre_index_backup_header(const struct lu_env *env,
804                                       struct dt_object *obj,
805                                       const struct lu_fid *tgt_fid,
806                                       __u32 keysize, __u32 recsize,
807                                       void *buf, int bufsize, int count)
808 {
809         struct dt_device *dev = lu2dt_dev(obj->do_lu.lo_dev);
810         struct lustre_index_backup_header *header = buf;
811         struct lu_attr *la = buf;
812         struct thandle *th;
813         struct lu_buf lbuf = {
814                 .lb_buf = header,
815                 .lb_len = sizeof(*header)
816         };
817         loff_t size = sizeof(*header) + (keysize + recsize) * count;
818         loff_t pos = 0;
819         int rc;
820         bool punch = false;
821         ENTRY;
822
823         LASSERT(sizeof(*la) <= bufsize);
824         LASSERT(sizeof(*header) <= bufsize);
825
826         rc = dt_attr_get(env, obj, la);
827         if (rc)
828                 RETURN(rc);
829
830         if (la->la_size > size)
831                 punch = true;
832
833         lustre_index_backup_make_header(header, keysize, recsize,
834                                         tgt_fid, count);
835         th = dt_trans_create(env, dev);
836         if (IS_ERR(th))
837                 RETURN(PTR_ERR(th));
838
839         rc = dt_declare_record_write(env, obj, &lbuf, pos, th);
840         if (rc)
841                 GOTO(stop, rc);
842
843         if (punch) {
844                 rc = dt_declare_punch(env, obj, size, OBD_OBJECT_EOF, th);
845                 if (rc)
846                         GOTO(stop, rc);
847         }
848
849         rc = dt_trans_start_local(env, dev, th);
850         if (rc)
851                 GOTO(stop, rc);
852
853         rc = dt_record_write(env, obj, &lbuf, &pos, th);
854         if (!rc && punch)
855                 rc = dt_punch(env, obj, size, OBD_OBJECT_EOF, th);
856
857         GOTO(stop, rc);
858
859 stop:
860         dt_trans_stop(env, dev, th);
861         return rc;
862 }
863
864 static int lustre_index_update_lma(const struct lu_env *env,
865                                    struct dt_object *obj,
866                                    void *buf, int bufsize)
867 {
868         struct dt_device *dev = lu2dt_dev(obj->do_lu.lo_dev);
869         struct lustre_mdt_attrs *lma = buf;
870         struct lu_buf lbuf = {
871                 .lb_buf = lma,
872                 .lb_len = sizeof(struct lustre_ost_attrs)
873         };
874         struct thandle *th;
875         int fl = LU_XATTR_REPLACE;
876         int rc;
877         ENTRY;
878
879         LASSERT(bufsize >= lbuf.lb_len);
880
881         rc = dt_xattr_get(env, obj, &lbuf, XATTR_NAME_LMA);
882         if (unlikely(rc == -ENODATA)) {
883                 fl = LU_XATTR_CREATE;
884                 lustre_lma_init(lma, lu_object_fid(&obj->do_lu),
885                                 LMAC_IDX_BACKUP, 0);
886                 rc = sizeof(*lma);
887         } else if (rc < sizeof(*lma)) {
888                 RETURN(rc < 0 ? rc : -EFAULT);
889         } else {
890                 lustre_lma_swab(lma);
891                 if (lma->lma_compat & LMAC_IDX_BACKUP)
892                         RETURN(0);
893
894                 lma->lma_compat |= LMAC_IDX_BACKUP;
895         }
896
897         lustre_lma_swab(lma);
898         lbuf.lb_len = rc;
899         th = dt_trans_create(env, dev);
900         if (IS_ERR(th))
901                 RETURN(rc);
902
903         rc = dt_declare_xattr_set(env, obj, &lbuf, XATTR_NAME_LMA, fl, th);
904         if (rc)
905                 GOTO(stop, rc);
906
907         rc = dt_trans_start_local(env, dev, th);
908         if (rc)
909                 GOTO(stop, rc);
910
911         rc = dt_xattr_set(env, obj, &lbuf, XATTR_NAME_LMA, fl, th);
912
913         GOTO(stop, rc);
914
915 stop:
916         dt_trans_stop(env, dev, th);
917         return rc;
918 }
919
920 static int lustre_index_backup_one(const struct lu_env *env,
921                                    struct local_oid_storage *los,
922                                    struct dt_object *parent,
923                                    struct lustre_index_backup_unit *libu,
924                                    char *buf, int bufsize)
925 {
926         struct dt_device *dev = scrub_obj2dev(parent);
927         struct dt_object *tgt_obj = NULL;
928         struct dt_object *bak_obj = NULL;
929         const struct dt_it_ops *iops;
930         struct dt_it *di;
931         loff_t pos = sizeof(struct lustre_index_backup_header);
932         int count = 0;
933         int size = 0;
934         int rc;
935         ENTRY;
936
937         tgt_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
938                                              &libu->libu_fid, NULL));
939         if (IS_ERR_OR_NULL(tgt_obj))
940                 GOTO(out, rc = tgt_obj ? PTR_ERR(tgt_obj) : -ENOENT);
941
942         if (!dt_object_exists(tgt_obj))
943                 GOTO(out, rc = 0);
944
945         if (!tgt_obj->do_index_ops) {
946                 struct dt_index_features feat;
947
948                 feat.dif_flags = DT_IND_UPDATE;
949                 feat.dif_keysize_min = libu->libu_keysize;
950                 feat.dif_keysize_max = libu->libu_keysize;
951                 feat.dif_recsize_min = libu->libu_recsize;
952                 feat.dif_recsize_max = libu->libu_recsize;
953                 feat.dif_ptrsize = 4;
954                 rc = tgt_obj->do_ops->do_index_try(env, tgt_obj, &feat);
955                 if (rc)
956                         GOTO(out, rc);
957         }
958
959         lustre_fid2lbx(buf, &libu->libu_fid, bufsize);
960         bak_obj = local_file_find_or_create(env, los, parent, buf,
961                                             S_IFREG | S_IRUGO | S_IWUSR);
962         if (IS_ERR_OR_NULL(bak_obj))
963                 GOTO(out, rc = bak_obj ? PTR_ERR(bak_obj) : -ENOENT);
964
965         iops = &tgt_obj->do_index_ops->dio_it;
966         di = iops->init(env, tgt_obj, 0);
967         if (IS_ERR(di))
968                 GOTO(out, rc = PTR_ERR(di));
969
970         rc = iops->load(env, di, 0);
971         if (!rc)
972                 rc = iops->next(env, di);
973         else if (rc > 0)
974                 rc = 0;
975
976         while (!rc) {
977                 void *key;
978                 void *rec;
979
980                 key = iops->key(env, di);
981                 memcpy(&buf[size], key, libu->libu_keysize);
982                 size += libu->libu_keysize;
983                 rec = &buf[size];
984                 rc = iops->rec(env, di, rec, 0);
985                 if (rc)
986                         GOTO(fini, rc);
987
988                 size += libu->libu_recsize;
989                 count++;
990                 if (size + libu->libu_keysize + libu->libu_recsize > bufsize) {
991                         rc = lustre_index_backup_body(env, bak_obj, &pos,
992                                                       buf, size);
993                         if (rc)
994                                 GOTO(fini, rc);
995
996                         size = 0;
997                 }
998
999                 rc = iops->next(env, di);
1000         }
1001
1002         if (rc >= 0 && size > 0)
1003                 rc = lustre_index_backup_body(env, bak_obj, &pos, buf, size);
1004
1005         if (rc < 0)
1006                 GOTO(fini, rc);
1007
1008         rc = lustre_index_backup_header(env, bak_obj, &libu->libu_fid,
1009                                         libu->libu_keysize, libu->libu_recsize,
1010                                         buf, bufsize, count);
1011         if (!rc)
1012                 rc = lustre_index_update_lma(env, tgt_obj, buf, bufsize);
1013
1014         if (!rc && OBD_FAIL_CHECK(OBD_FAIL_OSD_INDEX_CRASH)) {
1015                 LASSERT(bufsize >= 512);
1016
1017                 pos = 0;
1018                 memset(buf, 0, 512);
1019                 lustre_index_backup_body(env, tgt_obj, &pos, buf, 512);
1020         }
1021
1022         GOTO(fini, rc);
1023
1024 fini:
1025         iops->fini(env, di);
1026 out:
1027         if (!IS_ERR_OR_NULL(tgt_obj))
1028                 dt_object_put_nocache(env, tgt_obj);
1029         if (!IS_ERR_OR_NULL(bak_obj))
1030                 dt_object_put_nocache(env, bak_obj);
1031         return rc;
1032 }
1033
1034 void lustre_index_backup(const struct lu_env *env, struct dt_device *dev,
1035                          const char *devname, struct list_head *head,
1036                          spinlock_t *lock, int *guard, bool backup)
1037 {
1038         struct lustre_index_backup_unit *libu;
1039         struct local_oid_storage *los = NULL;
1040         struct dt_object *parent = NULL;
1041         char *buf = NULL;
1042         struct lu_fid fid;
1043         int rc;
1044         ENTRY;
1045
1046         if (dev->dd_rdonly || *guard)
1047                 RETURN_EXIT;
1048
1049         spin_lock(lock);
1050         *guard = 1;
1051         spin_unlock(lock);
1052
1053         if (list_empty(head))
1054                 RETURN_EXIT;
1055
1056         /* Handle kinds of failures during mount process. */
1057         if (!dev->dd_lu_dev.ld_site || !dev->dd_lu_dev.ld_site->ls_top_dev)
1058                 backup = false;
1059
1060         if (backup) {
1061                 OBD_ALLOC_LARGE(buf, INDEX_BACKUP_BUFSIZE);
1062                 if (!buf) {
1063                         backup = false;
1064                         goto scan;
1065                 }
1066
1067                 lu_local_obj_fid(&fid, INDEX_BACKUP_OID);
1068                 parent = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
1069                                                     &fid, NULL));
1070                 if (IS_ERR_OR_NULL(parent)) {
1071                         CERROR("%s: failed to locate backup dir: rc = %ld\n",
1072                                devname, parent ? PTR_ERR(parent) : -ENOENT);
1073                         backup = false;
1074                         goto scan;
1075                 }
1076
1077                 lu_local_name_obj_fid(&fid, 1);
1078                 rc = local_oid_storage_init(env, dev, &fid, &los);
1079                 if (rc) {
1080                         CERROR("%s: failed to init local storage: rc = %d\n",
1081                                devname, rc);
1082                         backup = false;
1083                 }
1084         }
1085
1086 scan:
1087         spin_lock(lock);
1088         while (!list_empty(head)) {
1089                 libu = list_entry(head->next,
1090                                   struct lustre_index_backup_unit, libu_link);
1091                 list_del_init(&libu->libu_link);
1092                 spin_unlock(lock);
1093
1094                 if (backup) {
1095                         rc = lustre_index_backup_one(env, los, parent, libu,
1096                                                      buf, INDEX_BACKUP_BUFSIZE);
1097                         CDEBUG(D_WARNING, "%s: backup index "DFID": rc = %d\n",
1098                                devname, PFID(&libu->libu_fid), rc);
1099                 }
1100
1101                 OBD_FREE_PTR(libu);
1102                 spin_lock(lock);
1103         }
1104         spin_unlock(lock);
1105
1106         if (los)
1107                 local_oid_storage_fini(env, los);
1108         if (parent)
1109                 dt_object_put_nocache(env, parent);
1110         if (buf)
1111                 OBD_FREE_LARGE(buf, INDEX_BACKUP_BUFSIZE);
1112
1113         EXIT;
1114 }
1115 EXPORT_SYMBOL(lustre_index_backup);
1116
1117 int lustre_index_restore(const struct lu_env *env, struct dt_device *dev,
1118                          const struct lu_fid *parent_fid,
1119                          const struct lu_fid *tgt_fid,
1120                          const struct lu_fid *bak_fid, const char *name,
1121                          struct list_head *head, spinlock_t *lock,
1122                          char *buf, int bufsize)
1123 {
1124         struct dt_object *parent_obj = NULL;
1125         struct dt_object *tgt_obj = NULL;
1126         struct dt_object *bak_obj = NULL;
1127         struct lustre_index_backup_header *header;
1128         struct dt_index_features *feat;
1129         struct dt_object_format *dof;
1130         struct lu_attr *la;
1131         struct thandle *th;
1132         struct lu_object_conf conf;
1133         struct dt_insert_rec ent;
1134         struct lu_buf lbuf;
1135         struct lu_fid tfid;
1136         loff_t pos = 0;
1137         __u32 keysize;
1138         __u32 recsize;
1139         __u32 pairsize;
1140         int count;
1141         int rc;
1142         bool registered = false;
1143         ENTRY;
1144
1145         LASSERT(bufsize >= sizeof(*la) + sizeof(*dof) +
1146                 sizeof(*feat) + sizeof(*header));
1147
1148         memset(buf, 0, bufsize);
1149         la = (struct lu_attr *)buf;
1150         dof = (void *)la + sizeof(*la);
1151         feat = (void *)dof + sizeof(*dof);
1152         header = (void *)feat + sizeof(*feat);
1153         lbuf.lb_buf = header;
1154         lbuf.lb_len = sizeof(*header);
1155
1156         tgt_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
1157                                              tgt_fid, NULL));
1158         if (IS_ERR_OR_NULL(tgt_obj))
1159                 GOTO(out, rc = tgt_obj ? PTR_ERR(tgt_obj) : -ENOENT);
1160
1161         bak_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
1162                                              bak_fid, NULL));
1163         if (IS_ERR_OR_NULL(bak_obj))
1164                 GOTO(out, rc = bak_obj ? PTR_ERR(bak_obj) : -ENOENT);
1165
1166         if (!dt_object_exists(bak_obj))
1167                 GOTO(out, rc = -ENOENT);
1168
1169         parent_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
1170                                                 parent_fid, NULL));
1171         if (IS_ERR_OR_NULL(parent_obj))
1172                 GOTO(out, rc = parent_obj ? PTR_ERR(parent_obj) : -ENOENT);
1173
1174         LASSERT(dt_object_exists(parent_obj));
1175
1176         if (unlikely(!dt_try_as_dir(env, parent_obj, true)))
1177                 GOTO(out, rc = -ENOTDIR);
1178
1179         rc = dt_attr_get(env, tgt_obj, la);
1180         if (rc)
1181                 GOTO(out, rc);
1182
1183         rc = dt_record_read(env, bak_obj, &lbuf, &pos);
1184         if (rc)
1185                 GOTO(out, rc);
1186
1187         if (le32_to_cpu(header->libh_magic) != INDEX_BACKUP_MAGIC_V1)
1188                 GOTO(out, rc = -EINVAL);
1189
1190         fid_le_to_cpu(&tfid, &header->libh_owner);
1191         if (unlikely(!lu_fid_eq(tgt_fid, &tfid)))
1192                 GOTO(out, rc = -EINVAL);
1193
1194         keysize = le32_to_cpu(header->libh_keysize);
1195         recsize = le32_to_cpu(header->libh_recsize);
1196         pairsize = keysize + recsize;
1197
1198         memset(feat, 0, sizeof(*feat));
1199         feat->dif_flags = DT_IND_UPDATE;
1200         feat->dif_keysize_min = feat->dif_keysize_max = keysize;
1201         feat->dif_recsize_min = feat->dif_recsize_max = recsize;
1202         feat->dif_ptrsize = 4;
1203
1204         /* T1: remove old name entry and destroy old index. */
1205         th = dt_trans_create(env, dev);
1206         if (IS_ERR(th))
1207                 GOTO(out, rc = PTR_ERR(th));
1208
1209         rc = dt_declare_delete(env, parent_obj,
1210                                (const struct dt_key *)name, th);
1211         if (rc)
1212                 GOTO(stop, rc);
1213
1214         rc = dt_declare_ref_del(env, tgt_obj, th);
1215         if (rc)
1216                 GOTO(stop, rc);
1217
1218         rc = dt_declare_destroy(env, tgt_obj, th);
1219         if (rc)
1220                 GOTO(stop, rc);
1221
1222         rc = dt_trans_start_local(env, dev, th);
1223         if (rc)
1224                 GOTO(stop, rc);
1225
1226         rc = dt_delete(env, parent_obj, (const struct dt_key *)name, th);
1227         if (rc)
1228                 GOTO(stop, rc);
1229
1230         dt_write_lock(env, tgt_obj, 0);
1231         rc = dt_ref_del(env, tgt_obj, th);
1232         if (rc == 0) {
1233                 if (S_ISDIR(tgt_obj->do_lu.lo_header->loh_attr))
1234                         dt_ref_del(env, tgt_obj, th);
1235                 rc = dt_destroy(env, tgt_obj, th);
1236         }
1237         dt_write_unlock(env, tgt_obj);
1238         dt_trans_stop(env, dev, th);
1239         if (rc)
1240                 GOTO(out, rc);
1241
1242         la->la_valid = LA_MODE | LA_UID | LA_GID;
1243         conf.loc_flags = LOC_F_NEW;
1244         dof->u.dof_idx.di_feat = feat;
1245         dof->dof_type = DFT_INDEX;
1246         ent.rec_type = S_IFREG;
1247         ent.rec_fid = tgt_fid;
1248
1249         /* Drop cache before re-create it. */
1250         dt_object_put_nocache(env, tgt_obj);
1251         tgt_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
1252                                              tgt_fid, &conf));
1253         if (IS_ERR_OR_NULL(tgt_obj))
1254                 GOTO(out, rc = tgt_obj ? PTR_ERR(tgt_obj) : -ENOENT);
1255
1256         LASSERT(!dt_object_exists(tgt_obj));
1257
1258         /* T2: create new index and insert new name entry. */
1259         th = dt_trans_create(env, dev);
1260         if (IS_ERR(th))
1261                 GOTO(out, rc = PTR_ERR(th));
1262
1263         rc = dt_declare_create(env, tgt_obj, la, NULL, dof, th);
1264         if (rc)
1265                 GOTO(stop, rc);
1266
1267         rc = dt_declare_insert(env, parent_obj, (const struct dt_rec *)&ent,
1268                                (const struct dt_key *)name, th);
1269         if (rc)
1270                 GOTO(stop, rc);
1271
1272         rc = dt_trans_start_local(env, dev, th);
1273         if (rc)
1274                 GOTO(stop, rc);
1275
1276         dt_write_lock(env, tgt_obj, 0);
1277         rc = dt_create(env, tgt_obj, la, NULL, dof, th);
1278         dt_write_unlock(env, tgt_obj);
1279         if (rc)
1280                 GOTO(stop, rc);
1281
1282         rc = dt_insert(env, parent_obj, (const struct dt_rec *)&ent,
1283                        (const struct dt_key *)name, th);
1284         dt_trans_stop(env, dev, th);
1285         /* Some index name may has been inserted by OSD
1286          * automatically when create the index object. */
1287         if (unlikely(rc == -EEXIST))
1288                 rc = 0;
1289         if (rc)
1290                 GOTO(out, rc);
1291
1292         /* The new index will register via index_try. */
1293         rc = tgt_obj->do_ops->do_index_try(env, tgt_obj, feat);
1294         if (rc)
1295                 GOTO(out, rc);
1296
1297         registered = true;
1298         count = le32_to_cpu(header->libh_count);
1299         while (!rc && count > 0) {
1300                 int size = pairsize * count;
1301                 int items = count;
1302                 int i;
1303
1304                 if (size > bufsize) {
1305                         items = bufsize / pairsize;
1306                         size = pairsize * items;
1307                 }
1308
1309                 lbuf.lb_buf = buf;
1310                 lbuf.lb_len = size;
1311                 rc = dt_record_read(env, bak_obj, &lbuf, &pos);
1312                 for (i = 0; i < items && !rc; i++) {
1313                         void *key = &buf[i * pairsize];
1314                         void *rec = &buf[i * pairsize + keysize];
1315
1316                         /* Tn: restore the records. */
1317                         th = dt_trans_create(env, dev);
1318                         if (!th)
1319                                 GOTO(out, rc = -ENOMEM);
1320
1321                         rc = dt_declare_insert(env, tgt_obj, rec, key, th);
1322                         if (rc)
1323                                 GOTO(stop, rc);
1324
1325                         rc = dt_trans_start_local(env, dev, th);
1326                         if (rc)
1327                                 GOTO(stop, rc);
1328
1329                         rc = dt_insert(env, tgt_obj, rec, key, th);
1330                         if (unlikely(rc == -EEXIST))
1331                                 rc = 0;
1332
1333                         dt_trans_stop(env, dev, th);
1334                 }
1335
1336                 count -= items;
1337         }
1338
1339         GOTO(out, rc);
1340
1341 stop:
1342         dt_trans_stop(env, dev, th);
1343         if (rc && registered)
1344                 /* Degister the index to avoid overwriting the backup. */
1345                 lustre_index_degister(head, lock, tgt_fid);
1346
1347 out:
1348         if (!IS_ERR_OR_NULL(tgt_obj))
1349                 dt_object_put_nocache(env, tgt_obj);
1350         if (!IS_ERR_OR_NULL(bak_obj))
1351                 dt_object_put_nocache(env, bak_obj);
1352         if (!IS_ERR_OR_NULL(parent_obj))
1353                 dt_object_put_nocache(env, parent_obj);
1354         return rc;
1355 }
1356 EXPORT_SYMBOL(lustre_index_restore);