Whamcloud - gitweb
LU-17402 kernel: RHEL 8.10 client and server support
[fs/lustre-release.git] / lustre / obdclass / scrub.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2017, Intel Corporation.
24  */
25 /*
26  * lustre/obdclass/scrub.c
27  *
28  * The OI scrub is used for checking and (re)building Object Index files
29  * that are usually backend special. Here are some general scrub related
30  * functions that can be shared by different backends for OI scrub.
31  *
32  * Author: Fan Yong <fan.yong@intel.com>
33  */
34
35 #define DEBUG_SUBSYSTEM S_LFSCK
36
37 #include <linux/kthread.h>
38 #include <lustre_scrub.h>
39 #include <lustre_lib.h>
40 #include <lustre_fid.h>
41
42 static inline struct dt_device *scrub_obj2dev(struct dt_object *obj)
43 {
44         return container_of_safe(obj->do_lu.lo_dev, struct dt_device,
45                                  dd_lu_dev);
46 }
47
48 static void scrub_file_to_cpu(struct scrub_file *des, struct scrub_file *src)
49 {
50         guid_copy(&des->sf_uuid, &src->sf_uuid);
51         des->sf_flags   = le64_to_cpu(src->sf_flags);
52         des->sf_magic   = le32_to_cpu(src->sf_magic);
53         des->sf_status  = le16_to_cpu(src->sf_status);
54         des->sf_param   = le16_to_cpu(src->sf_param);
55         des->sf_time_last_complete      =
56                                 le64_to_cpu(src->sf_time_last_complete);
57         des->sf_time_latest_start       =
58                                 le64_to_cpu(src->sf_time_latest_start);
59         des->sf_time_last_checkpoint    =
60                                 le64_to_cpu(src->sf_time_last_checkpoint);
61         des->sf_pos_latest_start        =
62                                 le64_to_cpu(src->sf_pos_latest_start);
63         des->sf_pos_last_checkpoint     =
64                                 le64_to_cpu(src->sf_pos_last_checkpoint);
65         des->sf_pos_first_inconsistent  =
66                                 le64_to_cpu(src->sf_pos_first_inconsistent);
67         des->sf_items_checked           =
68                                 le64_to_cpu(src->sf_items_checked);
69         des->sf_items_updated           =
70                                 le64_to_cpu(src->sf_items_updated);
71         des->sf_items_failed            =
72                                 le64_to_cpu(src->sf_items_failed);
73         des->sf_items_updated_prior     =
74                                 le64_to_cpu(src->sf_items_updated_prior);
75         des->sf_run_time        = le32_to_cpu(src->sf_run_time);
76         des->sf_success_count   = le32_to_cpu(src->sf_success_count);
77         des->sf_oi_count        = le16_to_cpu(src->sf_oi_count);
78         des->sf_internal_flags  = le16_to_cpu(src->sf_internal_flags);
79         memcpy(des->sf_oi_bitmap, src->sf_oi_bitmap, SCRUB_OI_BITMAP_SIZE);
80 }
81
82 static void scrub_file_to_le(struct scrub_file *des, struct scrub_file *src)
83 {
84         guid_copy(&des->sf_uuid, &src->sf_uuid);
85         des->sf_flags   = cpu_to_le64(src->sf_flags);
86         des->sf_magic   = cpu_to_le32(src->sf_magic);
87         des->sf_status  = cpu_to_le16(src->sf_status);
88         des->sf_param   = cpu_to_le16(src->sf_param);
89         des->sf_time_last_complete      =
90                                 cpu_to_le64(src->sf_time_last_complete);
91         des->sf_time_latest_start       =
92                                 cpu_to_le64(src->sf_time_latest_start);
93         des->sf_time_last_checkpoint    =
94                                 cpu_to_le64(src->sf_time_last_checkpoint);
95         des->sf_pos_latest_start        =
96                                 cpu_to_le64(src->sf_pos_latest_start);
97         des->sf_pos_last_checkpoint     =
98                                 cpu_to_le64(src->sf_pos_last_checkpoint);
99         des->sf_pos_first_inconsistent  =
100                                 cpu_to_le64(src->sf_pos_first_inconsistent);
101         des->sf_items_checked           =
102                                 cpu_to_le64(src->sf_items_checked);
103         des->sf_items_updated           =
104                                 cpu_to_le64(src->sf_items_updated);
105         des->sf_items_failed            =
106                                 cpu_to_le64(src->sf_items_failed);
107         des->sf_items_updated_prior     =
108                                 cpu_to_le64(src->sf_items_updated_prior);
109         des->sf_run_time        = cpu_to_le32(src->sf_run_time);
110         des->sf_success_count   = cpu_to_le32(src->sf_success_count);
111         des->sf_oi_count        = cpu_to_le16(src->sf_oi_count);
112         des->sf_internal_flags  = cpu_to_le16(src->sf_internal_flags);
113         memcpy(des->sf_oi_bitmap, src->sf_oi_bitmap, SCRUB_OI_BITMAP_SIZE);
114 }
115
116 void scrub_file_init(struct lustre_scrub *scrub, guid_t uuid)
117 {
118         struct scrub_file *sf = &scrub->os_file;
119
120         memset(sf, 0, sizeof(*sf));
121         guid_copy(&sf->sf_uuid, &uuid);
122         sf->sf_magic = SCRUB_MAGIC_V2;
123         sf->sf_status = SS_INIT;
124 }
125 EXPORT_SYMBOL(scrub_file_init);
126
127 void scrub_file_reset(struct lustre_scrub *scrub, guid_t uuid, u64 flags)
128 {
129         struct scrub_file *sf = &scrub->os_file;
130         ENTRY;
131
132         CDEBUG(D_LFSCK, "%s: reset OI scrub file, old flags = "
133                "%#llx, add flags = %#llx\n",
134                scrub->os_name, sf->sf_flags, flags);
135
136         guid_copy(&sf->sf_uuid, &uuid);
137         sf->sf_magic = SCRUB_MAGIC_V2;
138         sf->sf_status = SS_INIT;
139         sf->sf_flags |= flags;
140         sf->sf_flags &= ~SF_AUTO;
141         sf->sf_run_time = 0;
142         sf->sf_time_latest_start = 0;
143         sf->sf_time_last_checkpoint = 0;
144         sf->sf_pos_latest_start = 0;
145         sf->sf_pos_last_checkpoint = 0;
146         sf->sf_pos_first_inconsistent = 0;
147         sf->sf_items_checked = 0;
148         sf->sf_items_updated = 0;
149         sf->sf_items_failed = 0;
150         sf->sf_items_noscrub = 0;
151         sf->sf_items_igif = 0;
152         if (!scrub->os_in_join)
153                 sf->sf_items_updated_prior = 0;
154         EXIT;
155 }
156 EXPORT_SYMBOL(scrub_file_reset);
157
158 int scrub_file_load(const struct lu_env *env, struct lustre_scrub *scrub)
159 {
160         struct scrub_file *sf = &scrub->os_file;
161         struct lu_buf buf = {
162                 .lb_buf = &scrub->os_file_disk,
163                 .lb_len = sizeof(scrub->os_file_disk)
164         };
165         loff_t pos = 0;
166         int rc;
167
168         rc = dt_read(env, scrub->os_obj, &buf, &pos);
169         /* failure */
170         if (rc < 0) {
171                 CERROR("%s: fail to load scrub file: rc = %d\n",
172                        scrub->os_name, rc);
173                 return rc;
174         }
175
176         /* empty */
177         if (!rc)
178                 return -ENOENT;
179
180         /* corrupted */
181         if (rc < buf.lb_len) {
182                 CDEBUG(D_LFSCK, "%s: fail to load scrub file, "
183                        "expected = %d: rc = %d\n",
184                        scrub->os_name, (int)buf.lb_len, rc);
185                 return -EFAULT;
186         }
187
188         scrub_file_to_cpu(sf, &scrub->os_file_disk);
189         if (sf->sf_magic == SCRUB_MAGIC_V1) {
190                 CWARN("%s: reset scrub OI count for format change (LU-16655)\n",
191                       scrub->os_name);
192                 sf->sf_oi_count = 0;
193         } else if (sf->sf_magic != SCRUB_MAGIC_V2) {
194                 CDEBUG(D_LFSCK, "%s: invalid scrub magic %#x, should be %#x\n",
195                        scrub->os_name, sf->sf_magic, SCRUB_MAGIC_V2);
196                 return -EFAULT;
197         }
198
199         return 0;
200 }
201 EXPORT_SYMBOL(scrub_file_load);
202
203 int scrub_file_store(const struct lu_env *env, struct lustre_scrub *scrub)
204 {
205         struct scrub_file *sf = &scrub->os_file_disk;
206         struct dt_object *obj = scrub->os_obj;
207         struct dt_device *dev = scrub_obj2dev(obj);
208         struct lu_buf buf = {
209                 .lb_buf = sf,
210                 .lb_len = sizeof(*sf)
211         };
212         struct thandle *th;
213         loff_t pos = 0;
214         int rc;
215         ENTRY;
216
217         /* Skip store under rdonly mode. */
218         if (dev->dd_rdonly)
219                 RETURN(0);
220
221         scrub_file_to_le(sf, &scrub->os_file);
222         th = dt_trans_create(env, dev);
223         if (IS_ERR(th))
224                 GOTO(log, rc = PTR_ERR(th));
225
226         rc = dt_declare_record_write(env, obj, &buf, pos, th);
227         if (rc)
228                 GOTO(stop, rc);
229
230         rc = dt_trans_start_local(env, dev, th);
231         if (rc)
232                 GOTO(stop, rc);
233
234         rc = dt_record_write(env, obj, &buf, &pos, th);
235
236         GOTO(stop, rc);
237
238 stop:
239         dt_trans_stop(env, dev, th);
240
241 log:
242         if (rc)
243                 CERROR("%s: store scrub file: rc = %d\n",
244                        scrub->os_name, rc);
245         else
246                 CDEBUG(D_LFSCK, "%s: store scrub file: rc = %d\n",
247                        scrub->os_name, rc);
248
249         scrub->os_time_last_checkpoint = ktime_get_seconds();
250         scrub->os_time_next_checkpoint = scrub->os_time_last_checkpoint +
251                                          SCRUB_CHECKPOINT_INTERVAL;
252         return rc;
253 }
254 EXPORT_SYMBOL(scrub_file_store);
255
256 bool scrub_needs_check(struct lustre_scrub *scrub, const struct lu_fid *fid,
257                        u64 index)
258 {
259         bool check = true;
260
261         if (!fid_is_norm(fid) && !fid_is_igif(fid))
262                 check = false;
263         else if (scrub->os_running && scrub->os_pos_current > index)
264                 check = false;
265         else if (scrub->os_auto_scrub_interval == AS_NEVER)
266                 check = false;
267         else if (ktime_get_real_seconds() <
268                  scrub->os_file.sf_time_last_complete +
269                  scrub->os_auto_scrub_interval)
270                 check = false;
271
272         return check;
273 }
274 EXPORT_SYMBOL(scrub_needs_check);
275
276 int scrub_checkpoint(const struct lu_env *env, struct lustre_scrub *scrub)
277 {
278         struct scrub_file *sf = &scrub->os_file;
279         time64_t now = ktime_get_seconds();
280         int rc;
281
282         if (likely(now < scrub->os_time_next_checkpoint ||
283                    scrub->os_new_checked == 0))
284                 return 0;
285
286         CDEBUG(D_LFSCK, "%s: OI scrub checkpoint at pos %llu\n",
287                scrub->os_name, scrub->os_pos_current);
288
289         down_write(&scrub->os_rwsem);
290         sf->sf_items_checked += scrub->os_new_checked;
291         scrub->os_new_checked = 0;
292         sf->sf_pos_last_checkpoint = scrub->os_pos_current;
293         sf->sf_time_last_checkpoint = ktime_get_real_seconds();
294         sf->sf_run_time += now - scrub->os_time_last_checkpoint;
295         rc = scrub_file_store(env, scrub);
296         up_write(&scrub->os_rwsem);
297
298         return rc;
299 }
300 EXPORT_SYMBOL(scrub_checkpoint);
301
302 int scrub_thread_prep(const struct lu_env *env, struct lustre_scrub *scrub,
303                       guid_t uuid, u64 start)
304 {
305         struct scrub_file *sf = &scrub->os_file;
306         u32 flags = scrub->os_start_flags;
307         bool drop_dryrun = false;
308         int rc;
309
310         ENTRY;
311         CDEBUG(D_LFSCK, "%s: OI scrub prep, flags = 0x%x\n",
312                scrub->os_name, flags);
313
314         down_write(&scrub->os_rwsem);
315         if (flags & SS_SET_FAILOUT)
316                 sf->sf_param |= SP_FAILOUT;
317         else if (flags & SS_CLEAR_FAILOUT)
318                 sf->sf_param &= ~SP_FAILOUT;
319
320         if (flags & SS_SET_DRYRUN) {
321                 sf->sf_param |= SP_DRYRUN;
322         } else if (flags & SS_CLEAR_DRYRUN && sf->sf_param & SP_DRYRUN) {
323                 sf->sf_param &= ~SP_DRYRUN;
324                 drop_dryrun = true;
325         }
326
327         if (flags & SS_RESET)
328                 scrub_file_reset(scrub, uuid, 0);
329
330         spin_lock(&scrub->os_lock);
331         scrub->os_partial_scan = 0;
332         if (flags & SS_AUTO_FULL) {
333                 scrub->os_full_speed = 1;
334                 sf->sf_flags |= SF_AUTO;
335         } else if (flags & SS_AUTO_PARTIAL) {
336                 scrub->os_full_speed = 0;
337                 scrub->os_partial_scan = 1;
338                 sf->sf_flags |= SF_AUTO;
339         } else if (sf->sf_flags & (SF_RECREATED | SF_INCONSISTENT |
340                                    SF_UPGRADE)) {
341                 scrub->os_full_speed = 1;
342         } else {
343                 scrub->os_full_speed = 0;
344         }
345
346         scrub->os_in_prior = 0;
347         scrub->os_waiting = 0;
348         scrub->os_paused = 0;
349         scrub->os_in_join = 0;
350         scrub->os_full_scrub = 0;
351         spin_unlock(&scrub->os_lock);
352         scrub->os_new_checked = 0;
353         if (drop_dryrun && sf->sf_pos_first_inconsistent != 0)
354                 sf->sf_pos_latest_start = sf->sf_pos_first_inconsistent;
355         else if (sf->sf_pos_last_checkpoint != 0)
356                 sf->sf_pos_latest_start = sf->sf_pos_last_checkpoint + 1;
357         else
358                 sf->sf_pos_latest_start = start;
359
360         scrub->os_pos_current = sf->sf_pos_latest_start;
361         sf->sf_status = SS_SCANNING;
362         sf->sf_time_latest_start = ktime_get_real_seconds();
363         sf->sf_time_last_checkpoint = sf->sf_time_latest_start;
364         sf->sf_pos_last_checkpoint = sf->sf_pos_latest_start - 1;
365         rc = scrub_file_store(env, scrub);
366         if (rc == 0) {
367                 spin_lock(&scrub->os_lock);
368                 scrub->os_running = 1;
369                 spin_unlock(&scrub->os_lock);
370                 wake_up_var(scrub);
371         }
372         up_write(&scrub->os_rwsem);
373
374         RETURN(rc);
375 }
376 EXPORT_SYMBOL(scrub_thread_prep);
377
378 int scrub_thread_post(const struct lu_env *env, struct lustre_scrub *scrub,
379                       int result)
380 {
381         struct scrub_file *sf = &scrub->os_file;
382         int rc;
383         ENTRY;
384
385         CDEBUG(D_LFSCK, "%s: OI scrub post with result = %d\n",
386                scrub->os_name, result);
387
388         down_write(&scrub->os_rwsem);
389         spin_lock(&scrub->os_lock);
390         scrub->os_running = 0;
391         spin_unlock(&scrub->os_lock);
392         if (scrub->os_new_checked > 0) {
393                 sf->sf_items_checked += scrub->os_new_checked;
394                 scrub->os_new_checked = 0;
395                 sf->sf_pos_last_checkpoint = scrub->os_pos_current;
396         }
397         sf->sf_time_last_checkpoint = ktime_get_real_seconds();
398         if (result > 0) {
399                 sf->sf_status = SS_COMPLETED;
400                 if (!(sf->sf_param & SP_DRYRUN)) {
401                         memset(sf->sf_oi_bitmap, 0, SCRUB_OI_BITMAP_SIZE);
402                         sf->sf_flags &= ~(SF_RECREATED | SF_INCONSISTENT |
403                                           SF_UPGRADE | SF_AUTO);
404                 }
405                 sf->sf_time_last_complete = sf->sf_time_last_checkpoint;
406                 sf->sf_success_count++;
407         } else if (result == 0) {
408                 if (scrub->os_paused)
409                         sf->sf_status = SS_PAUSED;
410                 else
411                         sf->sf_status = SS_STOPPED;
412         } else {
413                 sf->sf_status = SS_FAILED;
414         }
415         sf->sf_run_time += ktime_get_seconds() -
416                            scrub->os_time_last_checkpoint;
417
418         rc = scrub_file_store(env, scrub);
419         up_write(&scrub->os_rwsem);
420
421         RETURN(rc < 0 ? rc : result);
422 }
423 EXPORT_SYMBOL(scrub_thread_post);
424
425 int scrub_start(int (*threadfn)(void *data), struct lustre_scrub *scrub,
426                 void *data, __u32 flags)
427 {
428         struct task_struct *task;
429         int rc;
430         ENTRY;
431
432         if (scrub->os_task)
433                 RETURN(-EALREADY);
434
435         if (scrub->os_file.sf_status == SS_COMPLETED) {
436                 if (!(flags & SS_SET_FAILOUT))
437                         flags |= SS_CLEAR_FAILOUT;
438
439                 if (!(flags & SS_SET_DRYRUN))
440                         flags |= SS_CLEAR_DRYRUN;
441
442                 flags |= SS_RESET;
443         }
444
445         task = kthread_create(threadfn, data, "OI_scrub");
446         if (IS_ERR(task)) {
447                 rc = PTR_ERR(task);
448                 CERROR("%s: cannot start iteration thread: rc = %d\n",
449                        scrub->os_name, rc);
450                 RETURN(rc);
451         }
452         spin_lock(&scrub->os_lock);
453         if (scrub->os_task) {
454                 /* Lost a race */
455                 spin_unlock(&scrub->os_lock);
456                 kthread_stop(task);
457                 RETURN(-EALREADY);
458         }
459         scrub->os_start_flags = flags;
460         scrub->os_task = task;
461         wake_up_process(task);
462         spin_unlock(&scrub->os_lock);
463         wait_var_event(scrub, scrub->os_running || !scrub->os_task);
464
465         RETURN(0);
466 }
467 EXPORT_SYMBOL(scrub_start);
468
469 void scrub_stop(struct lustre_scrub *scrub)
470 {
471         struct task_struct *task;
472
473         spin_lock(&scrub->os_lock);
474         scrub->os_running = 0;
475         spin_unlock(&scrub->os_lock);
476         task = xchg(&scrub->os_task, NULL);
477         if (task)
478                 kthread_stop(task);
479 }
480 EXPORT_SYMBOL(scrub_stop);
481
/*
 * NULL-terminated name tables used when dumping the scrub state.
 * scrub_status_names is indexed by the sf_status value; the other two
 * are indexed by bit number of sf_flags and sf_param respectively.
 */
const char *const scrub_status_names[] = {
	"init", "scanning", "completed", "failed",
	"stopped", "paused", "crashed",
	NULL
};

const char *const scrub_flags_names[] = {
	"recreated", "inconsistent", "auto", "upgrade",
	NULL
};

const char *const scrub_param_names[] = {
	"failout", "dryrun",
	NULL
};
506
/*
 * Print "<prefix>: name1,name2,...\n" for every bit set in @bits, taking
 * the name of bit i from @names[i].  An empty @bits prints "<prefix>:\n".
 *
 * @names must be NULL-terminated.  Bits that have no name in the table
 * (e.g. from a corrupted or newer on-disk value) are printed once as a
 * trailing hex mask instead of indexing past the end of the table.
 */
static void scrub_bits_dump(struct seq_file *m, int bits,
			    const char *const names[],
			    const char *prefix)
{
	int flag;
	int i;

	seq_printf(m, "%s:%c", prefix, bits != 0 ? ' ' : '\n');

	/* stop at the NULL sentinel rather than reading beyond the table */
	for (i = 0, flag = 1; bits != 0 && names[i] != NULL;
	     i++, flag = BIT(i)) {
		if (flag & bits) {
			bits &= ~flag;
			seq_printf(m, "%s%c", names[i],
				   bits != 0 ? ',' : '\n');
		}
	}

	/* unnamed leftovers: report them and terminate the line */
	if (bits != 0)
		seq_printf(m, "%#x\n", bits);
}
524
525 static void scrub_time_dump(struct seq_file *m, time64_t time,
526                             const char *prefix)
527 {
528         if (time != 0)
529                 seq_printf(m, "%s: %llu seconds\n", prefix,
530                            ktime_get_real_seconds() - time);
531         else
532                 seq_printf(m, "%s: N/A\n", prefix);
533 }
534
535 static void scrub_pos_dump(struct seq_file *m, __u64 pos, const char *prefix)
536 {
537         if (pos != 0)
538                 seq_printf(m, "%s: %llu\n", prefix, pos);
539         else
540                 seq_printf(m, "%s: N/A\n", prefix);
541 }
542
543 void scrub_dump(struct seq_file *m, struct lustre_scrub *scrub)
544 {
545         struct scrub_file *sf = &scrub->os_file;
546         u64 checked;
547         s64 speed;
548
549         down_read(&scrub->os_rwsem);
550         seq_printf(m, "name: OI_scrub\n"
551                    "magic: 0x%x\n"
552                    "oi_files: %d\n"
553                    "status: %s\n",
554                    sf->sf_magic, (int)sf->sf_oi_count,
555                    scrub_status_names[sf->sf_status]);
556
557         scrub_bits_dump(m, sf->sf_flags, scrub_flags_names, "flags");
558
559         scrub_bits_dump(m, sf->sf_param, scrub_param_names, "param");
560
561         scrub_time_dump(m, sf->sf_time_last_complete,
562                         "time_since_last_completed");
563
564         scrub_time_dump(m, sf->sf_time_latest_start,
565                         "time_since_latest_start");
566
567         scrub_time_dump(m, sf->sf_time_last_checkpoint,
568                         "time_since_last_checkpoint");
569
570         scrub_pos_dump(m, sf->sf_pos_latest_start,
571                         "latest_start_position");
572
573         scrub_pos_dump(m, sf->sf_pos_last_checkpoint,
574                         "last_checkpoint_position");
575
576         scrub_pos_dump(m, sf->sf_pos_first_inconsistent,
577                         "first_failure_position");
578
579         checked = sf->sf_items_checked + scrub->os_new_checked;
580         seq_printf(m, "checked: %llu\n"
581                    "%s: %llu\n"
582                    "failed: %llu\n"
583                    "prior_%s: %llu\n"
584                    "noscrub: %llu\n"
585                    "igif: %llu\n"
586                    "success_count: %u\n",
587                    checked,
588                    sf->sf_param & SP_DRYRUN ? "inconsistent" : "updated",
589                    sf->sf_items_updated, sf->sf_items_failed,
590                    sf->sf_param & SP_DRYRUN ? "inconsistent" : "updated",
591                    sf->sf_items_updated_prior, sf->sf_items_noscrub,
592                    sf->sf_items_igif, sf->sf_success_count);
593
594         speed = checked;
595         if (scrub->os_running) {
596                 s64 new_checked = scrub->os_new_checked;
597                 time64_t duration;
598                 time64_t rtime;
599
600                 /* Since the time resolution is in seconds for new system
601                  * or small devices it ismore likely that duration will be
602                  * zero which will lead to inaccurate results.
603                  */
604                 duration = ktime_get_seconds() -
605                            scrub->os_time_last_checkpoint;
606                 if (duration != 0)
607                         new_checked = div_s64(new_checked, duration);
608
609                 rtime = sf->sf_run_time + duration;
610                 if (rtime != 0)
611                         speed = div_s64(speed, rtime);
612
613                 seq_printf(m, "run_time: %lld seconds\n"
614                            "average_speed: %lld objects/sec\n"
615                            "real_time_speed: %lld objects/sec\n"
616                            "current_position: %llu\n"
617                            "scrub_in_prior: %s\n"
618                            "scrub_full_speed: %s\n"
619                            "partial_scan: %s\n",
620                            rtime, speed, new_checked,
621                            scrub->os_pos_current,
622                            scrub->os_in_prior ? "yes" : "no",
623                            scrub->os_full_speed ? "yes" : "no",
624                            scrub->os_partial_scan ? "yes" : "no");
625         } else {
626                 if (sf->sf_run_time != 0)
627                         speed = div_s64(speed, sf->sf_run_time);
628                 seq_printf(m, "run_time: %d seconds\n"
629                            "average_speed: %lld objects/sec\n"
630                            "real_time_speed: N/A\n"
631                            "current_position: N/A\n",
632                            sf->sf_run_time, speed);
633         }
634
635         up_read(&scrub->os_rwsem);
636 }
637 EXPORT_SYMBOL(scrub_dump);
638
639 int lustre_liru_new(struct list_head *head, const struct lu_fid *pfid,
640                     const struct lu_fid *cfid, __u64 child,
641                     const char *name, int namelen)
642 {
643         struct lustre_index_restore_unit *liru;
644         int len = sizeof(*liru) + namelen + 1;
645
646         OBD_ALLOC(liru, len);
647         if (!liru)
648                 return -ENOMEM;
649
650         INIT_LIST_HEAD(&liru->liru_link);
651         liru->liru_pfid = *pfid;
652         liru->liru_cfid = *cfid;
653         liru->liru_clid = child;
654         liru->liru_len = len;
655         memcpy(liru->liru_name, name, namelen);
656         liru->liru_name[namelen] = 0;
657         list_add_tail(&liru->liru_link, head);
658
659         return 0;
660 }
661 EXPORT_SYMBOL(lustre_liru_new);
662
663 int lustre_index_register(struct dt_device *dev, const char *devname,
664                           struct list_head *head, spinlock_t *lock, int *guard,
665                           const struct lu_fid *fid,
666                           __u32 keysize, __u32 recsize)
667 {
668         struct lustre_index_backup_unit *libu, *pos;
669         int rc = 0;
670         ENTRY;
671
672         if (dev->dd_rdonly || *guard)
673                 RETURN(1);
674
675         OBD_ALLOC_PTR(libu);
676         if (!libu)
677                 RETURN(-ENOMEM);
678
679         INIT_LIST_HEAD(&libu->libu_link);
680         libu->libu_keysize = keysize;
681         libu->libu_recsize = recsize;
682         libu->libu_fid = *fid;
683
684         spin_lock(lock);
685         if (unlikely(*guard)) {
686                 spin_unlock(lock);
687                 OBD_FREE_PTR(libu);
688
689                 RETURN(1);
690         }
691
692         list_for_each_entry_reverse(pos, head, libu_link) {
693                 rc = lu_fid_cmp(&pos->libu_fid, fid);
694                 if (rc < 0) {
695                         list_add(&libu->libu_link, &pos->libu_link);
696                         spin_unlock(lock);
697
698                         RETURN(0);
699                 }
700
701                 if (!rc) {
702                         /* Registered already. But the former registered one
703                          * has different keysize/recsize. It may because that
704                          * the former values are from disk and corrupted, then
705                          * replace it with new values. */
706                         if (unlikely(keysize != pos->libu_keysize ||
707                                      recsize != pos->libu_recsize)) {
708                                 CWARN("%s: the index "DFID" has registered "
709                                       "with %u/%u, may be invalid, replace "
710                                       "with %u/%u\n",
711                                       devname, PFID(fid), pos->libu_keysize,
712                                       pos->libu_recsize, keysize, recsize);
713
714                                 pos->libu_keysize = keysize;
715                                 pos->libu_recsize = recsize;
716                         } else {
717                                 rc = 1;
718                         }
719
720                         spin_unlock(lock);
721                         OBD_FREE_PTR(libu);
722
723                         RETURN(rc);
724                 }
725         }
726
727         list_add(&libu->libu_link, head);
728         spin_unlock(lock);
729
730         RETURN(0);
731 }
732 EXPORT_SYMBOL(lustre_index_register);
733
734 static void lustre_index_degister(struct list_head *head, spinlock_t *lock,
735                                   const struct lu_fid *fid)
736 {
737         struct lustre_index_backup_unit *libu;
738         int rc = -ENOENT;
739
740         spin_lock(lock);
741         list_for_each_entry_reverse(libu, head, libu_link) {
742                 rc = lu_fid_cmp(&libu->libu_fid, fid);
743                 /* NOT registered. */
744                 if (rc < 0)
745                         break;
746
747                 if (!rc) {
748                         list_del(&libu->libu_link);
749                         break;
750                 }
751         }
752         spin_unlock(lock);
753
754         if (!rc)
755                 OBD_FREE_PTR(libu);
756 }
757
758 static void
759 lustre_index_backup_make_header(struct lustre_index_backup_header *header,
760                                 __u32 keysize, __u32 recsize,
761                                 const struct lu_fid *fid, __u32 count)
762 {
763         memset(header, 0, sizeof(*header));
764         header->libh_magic = cpu_to_le32(INDEX_BACKUP_MAGIC_V1);
765         header->libh_count = cpu_to_le32(count);
766         header->libh_keysize = cpu_to_le32(keysize);
767         header->libh_recsize = cpu_to_le32(recsize);
768         fid_cpu_to_le(&header->libh_owner, fid);
769 }
770
771 static int lustre_index_backup_body(const struct lu_env *env,
772                                     struct dt_object *obj, loff_t *pos,
773                                     void *buf, int bufsize)
774 {
775         struct dt_device *dev = lu2dt_dev(obj->do_lu.lo_dev);
776         struct thandle *th;
777         struct lu_buf lbuf = {
778                 .lb_buf = buf,
779                 .lb_len = bufsize
780         };
781         int rc;
782         ENTRY;
783
784         th = dt_trans_create(env, dev);
785         if (IS_ERR(th))
786                 RETURN(PTR_ERR(th));
787
788         rc = dt_declare_record_write(env, obj, &lbuf, *pos, th);
789         if (rc)
790                 GOTO(stop, rc);
791
792         rc = dt_trans_start_local(env, dev, th);
793         if (rc)
794                 GOTO(stop, rc);
795
796         rc = dt_record_write(env, obj, &lbuf, pos, th);
797
798         GOTO(stop, rc);
799
800 stop:
801         dt_trans_stop(env, dev, th);
802         return rc;
803 }
804
805 static int lustre_index_backup_header(const struct lu_env *env,
806                                       struct dt_object *obj,
807                                       const struct lu_fid *tgt_fid,
808                                       __u32 keysize, __u32 recsize,
809                                       void *buf, int bufsize, int count)
810 {
811         struct dt_device *dev = lu2dt_dev(obj->do_lu.lo_dev);
812         struct lustre_index_backup_header *header = buf;
813         struct lu_attr *la = buf;
814         struct thandle *th;
815         struct lu_buf lbuf = {
816                 .lb_buf = header,
817                 .lb_len = sizeof(*header)
818         };
819         loff_t size = sizeof(*header) + (keysize + recsize) * count;
820         loff_t pos = 0;
821         int rc;
822         bool punch = false;
823         ENTRY;
824
825         LASSERT(sizeof(*la) <= bufsize);
826         LASSERT(sizeof(*header) <= bufsize);
827
828         rc = dt_attr_get(env, obj, la);
829         if (rc)
830                 RETURN(rc);
831
832         if (la->la_size > size)
833                 punch = true;
834
835         lustre_index_backup_make_header(header, keysize, recsize,
836                                         tgt_fid, count);
837         th = dt_trans_create(env, dev);
838         if (IS_ERR(th))
839                 RETURN(PTR_ERR(th));
840
841         rc = dt_declare_record_write(env, obj, &lbuf, pos, th);
842         if (rc)
843                 GOTO(stop, rc);
844
845         if (punch) {
846                 rc = dt_declare_punch(env, obj, size, OBD_OBJECT_EOF, th);
847                 if (rc)
848                         GOTO(stop, rc);
849         }
850
851         rc = dt_trans_start_local(env, dev, th);
852         if (rc)
853                 GOTO(stop, rc);
854
855         rc = dt_record_write(env, obj, &lbuf, &pos, th);
856         if (!rc && punch)
857                 rc = dt_punch(env, obj, size, OBD_OBJECT_EOF, th);
858
859         GOTO(stop, rc);
860
861 stop:
862         dt_trans_stop(env, dev, th);
863         return rc;
864 }
865
866 static int lustre_index_update_lma(const struct lu_env *env,
867                                    struct dt_object *obj,
868                                    void *buf, int bufsize)
869 {
870         struct dt_device *dev = lu2dt_dev(obj->do_lu.lo_dev);
871         struct lustre_mdt_attrs *lma = buf;
872         struct lu_buf lbuf = {
873                 .lb_buf = lma,
874                 .lb_len = sizeof(struct lustre_ost_attrs)
875         };
876         struct thandle *th;
877         int fl = LU_XATTR_REPLACE;
878         int rc;
879         ENTRY;
880
881         LASSERT(bufsize >= lbuf.lb_len);
882
883         rc = dt_xattr_get(env, obj, &lbuf, XATTR_NAME_LMA);
884         if (unlikely(rc == -ENODATA)) {
885                 fl = LU_XATTR_CREATE;
886                 lustre_lma_init(lma, lu_object_fid(&obj->do_lu),
887                                 LMAC_IDX_BACKUP, 0);
888                 rc = sizeof(*lma);
889         } else if (rc < sizeof(*lma)) {
890                 RETURN(rc < 0 ? rc : -EFAULT);
891         } else {
892                 lustre_lma_swab(lma);
893                 if (lma->lma_compat & LMAC_IDX_BACKUP)
894                         RETURN(0);
895
896                 lma->lma_compat |= LMAC_IDX_BACKUP;
897         }
898
899         lustre_lma_swab(lma);
900         lbuf.lb_len = rc;
901         th = dt_trans_create(env, dev);
902         if (IS_ERR(th))
903                 RETURN(rc);
904
905         rc = dt_declare_xattr_set(env, obj, &lbuf, XATTR_NAME_LMA, fl, th);
906         if (rc)
907                 GOTO(stop, rc);
908
909         rc = dt_trans_start_local(env, dev, th);
910         if (rc)
911                 GOTO(stop, rc);
912
913         rc = dt_xattr_set(env, obj, &lbuf, XATTR_NAME_LMA, fl, th);
914
915         GOTO(stop, rc);
916
917 stop:
918         dt_trans_stop(env, dev, th);
919         return rc;
920 }
921
922 static int lustre_index_backup_one(const struct lu_env *env,
923                                    struct local_oid_storage *los,
924                                    struct dt_object *parent,
925                                    struct lustre_index_backup_unit *libu,
926                                    char *buf, int bufsize)
927 {
928         struct dt_device *dev = scrub_obj2dev(parent);
929         struct dt_object *tgt_obj = NULL;
930         struct dt_object *bak_obj = NULL;
931         const struct dt_it_ops *iops;
932         struct dt_it *di;
933         loff_t pos = sizeof(struct lustre_index_backup_header);
934         int count = 0;
935         int size = 0;
936         int rc;
937         ENTRY;
938
939         tgt_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
940                                              &libu->libu_fid, NULL));
941         if (IS_ERR_OR_NULL(tgt_obj))
942                 GOTO(out, rc = tgt_obj ? PTR_ERR(tgt_obj) : -ENOENT);
943
944         if (!dt_object_exists(tgt_obj))
945                 GOTO(out, rc = 0);
946
947         if (!tgt_obj->do_index_ops) {
948                 struct dt_index_features feat;
949
950                 feat.dif_flags = DT_IND_UPDATE;
951                 feat.dif_keysize_min = libu->libu_keysize;
952                 feat.dif_keysize_max = libu->libu_keysize;
953                 feat.dif_recsize_min = libu->libu_recsize;
954                 feat.dif_recsize_max = libu->libu_recsize;
955                 feat.dif_ptrsize = 4;
956                 rc = tgt_obj->do_ops->do_index_try(env, tgt_obj, &feat);
957                 if (rc)
958                         GOTO(out, rc);
959         }
960
961         lustre_fid2lbx(buf, &libu->libu_fid, bufsize);
962         bak_obj = local_file_find_or_create(env, los, parent, buf,
963                                             S_IFREG | S_IRUGO | S_IWUSR);
964         if (IS_ERR_OR_NULL(bak_obj))
965                 GOTO(out, rc = bak_obj ? PTR_ERR(bak_obj) : -ENOENT);
966
967         iops = &tgt_obj->do_index_ops->dio_it;
968         di = iops->init(env, tgt_obj, 0);
969         if (IS_ERR(di))
970                 GOTO(out, rc = PTR_ERR(di));
971
972         rc = iops->load(env, di, 0);
973         if (!rc)
974                 rc = iops->next(env, di);
975         else if (rc > 0)
976                 rc = 0;
977
978         while (!rc) {
979                 void *key;
980                 void *rec;
981
982                 key = iops->key(env, di);
983                 memcpy(&buf[size], key, libu->libu_keysize);
984                 size += libu->libu_keysize;
985                 rec = &buf[size];
986                 rc = iops->rec(env, di, rec, 0);
987                 if (rc)
988                         GOTO(fini, rc);
989
990                 size += libu->libu_recsize;
991                 count++;
992                 if (size + libu->libu_keysize + libu->libu_recsize > bufsize) {
993                         rc = lustre_index_backup_body(env, bak_obj, &pos,
994                                                       buf, size);
995                         if (rc)
996                                 GOTO(fini, rc);
997
998                         size = 0;
999                 }
1000
1001                 rc = iops->next(env, di);
1002         }
1003
1004         if (rc >= 0 && size > 0)
1005                 rc = lustre_index_backup_body(env, bak_obj, &pos, buf, size);
1006
1007         if (rc < 0)
1008                 GOTO(fini, rc);
1009
1010         rc = lustre_index_backup_header(env, bak_obj, &libu->libu_fid,
1011                                         libu->libu_keysize, libu->libu_recsize,
1012                                         buf, bufsize, count);
1013         if (!rc)
1014                 rc = lustre_index_update_lma(env, tgt_obj, buf, bufsize);
1015
1016         if (!rc && CFS_FAIL_CHECK(OBD_FAIL_OSD_INDEX_CRASH)) {
1017                 LASSERT(bufsize >= 512);
1018
1019                 pos = 0;
1020                 memset(buf, 0, 512);
1021                 lustre_index_backup_body(env, tgt_obj, &pos, buf, 512);
1022         }
1023
1024         GOTO(fini, rc);
1025
1026 fini:
1027         iops->fini(env, di);
1028 out:
1029         if (!IS_ERR_OR_NULL(tgt_obj))
1030                 dt_object_put_nocache(env, tgt_obj);
1031         if (!IS_ERR_OR_NULL(bak_obj))
1032                 dt_object_put_nocache(env, bak_obj);
1033         return rc;
1034 }
1035
1036 void lustre_index_backup(const struct lu_env *env, struct dt_device *dev,
1037                          const char *devname, struct list_head *head,
1038                          spinlock_t *lock, int *guard, bool backup)
1039 {
1040         struct lustre_index_backup_unit *libu;
1041         struct local_oid_storage *los = NULL;
1042         struct dt_object *parent = NULL;
1043         char *buf = NULL;
1044         struct lu_fid fid;
1045         int rc;
1046         ENTRY;
1047
1048         if (dev->dd_rdonly || *guard)
1049                 RETURN_EXIT;
1050
1051         spin_lock(lock);
1052         *guard = 1;
1053         spin_unlock(lock);
1054
1055         if (list_empty(head))
1056                 RETURN_EXIT;
1057
1058         /* Handle kinds of failures during mount process. */
1059         if (!dev->dd_lu_dev.ld_site || !dev->dd_lu_dev.ld_site->ls_top_dev)
1060                 backup = false;
1061
1062         if (backup) {
1063                 OBD_ALLOC_LARGE(buf, INDEX_BACKUP_BUFSIZE);
1064                 if (!buf) {
1065                         backup = false;
1066                         goto scan;
1067                 }
1068
1069                 lu_local_obj_fid(&fid, INDEX_BACKUP_OID);
1070                 parent = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
1071                                                     &fid, NULL));
1072                 if (IS_ERR_OR_NULL(parent)) {
1073                         CERROR("%s: failed to locate backup dir: rc = %ld\n",
1074                                devname, parent ? PTR_ERR(parent) : -ENOENT);
1075                         backup = false;
1076                         goto scan;
1077                 }
1078
1079                 lu_local_name_obj_fid(&fid, 1);
1080                 rc = local_oid_storage_init(env, dev, &fid, &los);
1081                 if (rc) {
1082                         CERROR("%s: failed to init local storage: rc = %d\n",
1083                                devname, rc);
1084                         backup = false;
1085                 }
1086         }
1087
1088 scan:
1089         spin_lock(lock);
1090         while (!list_empty(head)) {
1091                 libu = list_first_entry(head,
1092                                         struct lustre_index_backup_unit,
1093                                         libu_link);
1094                 list_del_init(&libu->libu_link);
1095                 spin_unlock(lock);
1096
1097                 if (backup) {
1098                         rc = lustre_index_backup_one(env, los, parent, libu,
1099                                                      buf, INDEX_BACKUP_BUFSIZE);
1100                         CDEBUG(D_WARNING, "%s: backup index "DFID": rc = %d\n",
1101                                devname, PFID(&libu->libu_fid), rc);
1102                 }
1103
1104                 OBD_FREE_PTR(libu);
1105                 spin_lock(lock);
1106         }
1107         spin_unlock(lock);
1108
1109         if (los)
1110                 local_oid_storage_fini(env, los);
1111         if (parent)
1112                 dt_object_put_nocache(env, parent);
1113         if (buf)
1114                 OBD_FREE_LARGE(buf, INDEX_BACKUP_BUFSIZE);
1115
1116         EXIT;
1117 }
1118 EXPORT_SYMBOL(lustre_index_backup);
1119
1120 int lustre_index_restore(const struct lu_env *env, struct dt_device *dev,
1121                          const struct lu_fid *parent_fid,
1122                          const struct lu_fid *tgt_fid,
1123                          const struct lu_fid *bak_fid, const char *name,
1124                          struct list_head *head, spinlock_t *lock,
1125                          char *buf, int bufsize)
1126 {
1127         struct dt_object *parent_obj = NULL;
1128         struct dt_object *tgt_obj = NULL;
1129         struct dt_object *bak_obj = NULL;
1130         struct lustre_index_backup_header *header;
1131         struct dt_index_features *feat;
1132         struct dt_object_format *dof;
1133         struct lu_attr *la;
1134         struct thandle *th;
1135         struct lu_object_conf conf;
1136         struct dt_insert_rec ent;
1137         struct lu_buf lbuf;
1138         struct lu_fid tfid;
1139         loff_t pos = 0;
1140         __u32 keysize;
1141         __u32 recsize;
1142         __u32 pairsize;
1143         int count;
1144         int rc;
1145         bool registered = false;
1146         ENTRY;
1147
1148         LASSERT(bufsize >= sizeof(*la) + sizeof(*dof) +
1149                 sizeof(*feat) + sizeof(*header));
1150
1151         memset(buf, 0, bufsize);
1152         la = (struct lu_attr *)buf;
1153         dof = (void *)la + sizeof(*la);
1154         feat = (void *)dof + sizeof(*dof);
1155         header = (void *)feat + sizeof(*feat);
1156         lbuf.lb_buf = header;
1157         lbuf.lb_len = sizeof(*header);
1158
1159         tgt_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
1160                                              tgt_fid, NULL));
1161         if (IS_ERR_OR_NULL(tgt_obj))
1162                 GOTO(out, rc = tgt_obj ? PTR_ERR(tgt_obj) : -ENOENT);
1163
1164         bak_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
1165                                              bak_fid, NULL));
1166         if (IS_ERR_OR_NULL(bak_obj))
1167                 GOTO(out, rc = bak_obj ? PTR_ERR(bak_obj) : -ENOENT);
1168
1169         if (!dt_object_exists(bak_obj))
1170                 GOTO(out, rc = -ENOENT);
1171
1172         parent_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
1173                                                 parent_fid, NULL));
1174         if (IS_ERR_OR_NULL(parent_obj))
1175                 GOTO(out, rc = parent_obj ? PTR_ERR(parent_obj) : -ENOENT);
1176
1177         LASSERT(dt_object_exists(parent_obj));
1178
1179         if (unlikely(!dt_try_as_dir(env, parent_obj, true)))
1180                 GOTO(out, rc = -ENOTDIR);
1181
1182         rc = dt_attr_get(env, tgt_obj, la);
1183         if (rc)
1184                 GOTO(out, rc);
1185
1186         rc = dt_record_read(env, bak_obj, &lbuf, &pos);
1187         if (rc)
1188                 GOTO(out, rc);
1189
1190         if (le32_to_cpu(header->libh_magic) != INDEX_BACKUP_MAGIC_V1)
1191                 GOTO(out, rc = -EINVAL);
1192
1193         fid_le_to_cpu(&tfid, &header->libh_owner);
1194         if (unlikely(!lu_fid_eq(tgt_fid, &tfid)))
1195                 GOTO(out, rc = -EINVAL);
1196
1197         keysize = le32_to_cpu(header->libh_keysize);
1198         recsize = le32_to_cpu(header->libh_recsize);
1199         pairsize = keysize + recsize;
1200
1201         memset(feat, 0, sizeof(*feat));
1202         feat->dif_flags = DT_IND_UPDATE;
1203         feat->dif_keysize_min = feat->dif_keysize_max = keysize;
1204         feat->dif_recsize_min = feat->dif_recsize_max = recsize;
1205         feat->dif_ptrsize = 4;
1206
1207         /* T1: remove old name entry and destroy old index. */
1208         th = dt_trans_create(env, dev);
1209         if (IS_ERR(th))
1210                 GOTO(out, rc = PTR_ERR(th));
1211
1212         rc = dt_declare_delete(env, parent_obj,
1213                                (const struct dt_key *)name, th);
1214         if (rc)
1215                 GOTO(stop, rc);
1216
1217         rc = dt_declare_ref_del(env, tgt_obj, th);
1218         if (rc)
1219                 GOTO(stop, rc);
1220
1221         rc = dt_declare_destroy(env, tgt_obj, th);
1222         if (rc)
1223                 GOTO(stop, rc);
1224
1225         rc = dt_trans_start_local(env, dev, th);
1226         if (rc)
1227                 GOTO(stop, rc);
1228
1229         rc = dt_delete(env, parent_obj, (const struct dt_key *)name, th);
1230         if (rc)
1231                 GOTO(stop, rc);
1232
1233         dt_write_lock(env, tgt_obj, 0);
1234         rc = dt_ref_del(env, tgt_obj, th);
1235         if (rc == 0) {
1236                 if (S_ISDIR(tgt_obj->do_lu.lo_header->loh_attr))
1237                         dt_ref_del(env, tgt_obj, th);
1238                 rc = dt_destroy(env, tgt_obj, th);
1239         }
1240         dt_write_unlock(env, tgt_obj);
1241         dt_trans_stop(env, dev, th);
1242         if (rc)
1243                 GOTO(out, rc);
1244
1245         la->la_valid = LA_MODE | LA_UID | LA_GID;
1246         conf.loc_flags = LOC_F_NEW;
1247         dof->u.dof_idx.di_feat = feat;
1248         dof->dof_type = DFT_INDEX;
1249         ent.rec_type = S_IFREG;
1250         ent.rec_fid = tgt_fid;
1251
1252         /* Drop cache before re-create it. */
1253         dt_object_put_nocache(env, tgt_obj);
1254         tgt_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
1255                                              tgt_fid, &conf));
1256         if (IS_ERR_OR_NULL(tgt_obj))
1257                 GOTO(out, rc = tgt_obj ? PTR_ERR(tgt_obj) : -ENOENT);
1258
1259         LASSERT(!dt_object_exists(tgt_obj));
1260
1261         /* T2: create new index and insert new name entry. */
1262         th = dt_trans_create(env, dev);
1263         if (IS_ERR(th))
1264                 GOTO(out, rc = PTR_ERR(th));
1265
1266         rc = dt_declare_create(env, tgt_obj, la, NULL, dof, th);
1267         if (rc)
1268                 GOTO(stop, rc);
1269
1270         rc = dt_declare_insert(env, parent_obj, (const struct dt_rec *)&ent,
1271                                (const struct dt_key *)name, th);
1272         if (rc)
1273                 GOTO(stop, rc);
1274
1275         rc = dt_trans_start_local(env, dev, th);
1276         if (rc)
1277                 GOTO(stop, rc);
1278
1279         dt_write_lock(env, tgt_obj, 0);
1280         rc = dt_create(env, tgt_obj, la, NULL, dof, th);
1281         dt_write_unlock(env, tgt_obj);
1282         if (rc)
1283                 GOTO(stop, rc);
1284
1285         rc = dt_insert(env, parent_obj, (const struct dt_rec *)&ent,
1286                        (const struct dt_key *)name, th);
1287         dt_trans_stop(env, dev, th);
1288         /* Some index name may has been inserted by OSD
1289          * automatically when create the index object. */
1290         if (unlikely(rc == -EEXIST))
1291                 rc = 0;
1292         if (rc)
1293                 GOTO(out, rc);
1294
1295         /* The new index will register via index_try. */
1296         rc = tgt_obj->do_ops->do_index_try(env, tgt_obj, feat);
1297         if (rc)
1298                 GOTO(out, rc);
1299
1300         registered = true;
1301         count = le32_to_cpu(header->libh_count);
1302         while (!rc && count > 0) {
1303                 int size = pairsize * count;
1304                 int items = count;
1305                 int i;
1306
1307                 if (size > bufsize) {
1308                         items = bufsize / pairsize;
1309                         size = pairsize * items;
1310                 }
1311
1312                 lbuf.lb_buf = buf;
1313                 lbuf.lb_len = size;
1314                 rc = dt_record_read(env, bak_obj, &lbuf, &pos);
1315                 for (i = 0; i < items && !rc; i++) {
1316                         void *key = &buf[i * pairsize];
1317                         void *rec = &buf[i * pairsize + keysize];
1318
1319                         /* Tn: restore the records. */
1320                         th = dt_trans_create(env, dev);
1321                         if (!th)
1322                                 GOTO(out, rc = -ENOMEM);
1323
1324                         rc = dt_declare_insert(env, tgt_obj, rec, key, th);
1325                         if (rc)
1326                                 GOTO(stop, rc);
1327
1328                         rc = dt_trans_start_local(env, dev, th);
1329                         if (rc)
1330                                 GOTO(stop, rc);
1331
1332                         rc = dt_insert(env, tgt_obj, rec, key, th);
1333                         if (unlikely(rc == -EEXIST))
1334                                 rc = 0;
1335
1336                         dt_trans_stop(env, dev, th);
1337                 }
1338
1339                 count -= items;
1340         }
1341
1342         GOTO(out, rc);
1343
1344 stop:
1345         dt_trans_stop(env, dev, th);
1346         if (rc && registered)
1347                 /* Degister the index to avoid overwriting the backup. */
1348                 lustre_index_degister(head, lock, tgt_fid);
1349
1350 out:
1351         if (!IS_ERR_OR_NULL(tgt_obj))
1352                 dt_object_put_nocache(env, tgt_obj);
1353         if (!IS_ERR_OR_NULL(bak_obj))
1354                 dt_object_put_nocache(env, bak_obj);
1355         if (!IS_ERR_OR_NULL(parent_obj))
1356                 dt_object_put_nocache(env, parent_obj);
1357         return rc;
1358 }
1359 EXPORT_SYMBOL(lustre_index_restore);