Whamcloud - gitweb
LU-14927 scrub: create shared scrub_needs_check() function.
[fs/lustre-release.git] / lustre / obdclass / scrub.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2017, Intel Corporation.
24  */
25 /*
26  * lustre/obdclass/scrub.c
27  *
28  * The OI scrub is used for checking and (re)building Object Index files
29  * that are usually backend special. Here are some general scrub related
30  * functions that can be shared by different backends for OI scrub.
31  *
32  * Author: Fan Yong <fan.yong@intel.com>
33  */
34
35 #define DEBUG_SUBSYSTEM S_LFSCK
36
37 #include <linux/kthread.h>
38 #include <lustre_scrub.h>
39 #include <lustre_lib.h>
40 #include <lustre_fid.h>
41
42 static inline struct dt_device *scrub_obj2dev(struct dt_object *obj)
43 {
44         return container_of_safe(obj->do_lu.lo_dev, struct dt_device,
45                                  dd_lu_dev);
46 }
47
48 static void scrub_file_to_cpu(struct scrub_file *des, struct scrub_file *src)
49 {
50         uuid_copy(&des->sf_uuid, &src->sf_uuid);
51         des->sf_flags   = le64_to_cpu(src->sf_flags);
52         des->sf_magic   = le32_to_cpu(src->sf_magic);
53         des->sf_status  = le16_to_cpu(src->sf_status);
54         des->sf_param   = le16_to_cpu(src->sf_param);
55         des->sf_time_last_complete      =
56                                 le64_to_cpu(src->sf_time_last_complete);
57         des->sf_time_latest_start       =
58                                 le64_to_cpu(src->sf_time_latest_start);
59         des->sf_time_last_checkpoint    =
60                                 le64_to_cpu(src->sf_time_last_checkpoint);
61         des->sf_pos_latest_start        =
62                                 le64_to_cpu(src->sf_pos_latest_start);
63         des->sf_pos_last_checkpoint     =
64                                 le64_to_cpu(src->sf_pos_last_checkpoint);
65         des->sf_pos_first_inconsistent  =
66                                 le64_to_cpu(src->sf_pos_first_inconsistent);
67         des->sf_items_checked           =
68                                 le64_to_cpu(src->sf_items_checked);
69         des->sf_items_updated           =
70                                 le64_to_cpu(src->sf_items_updated);
71         des->sf_items_failed            =
72                                 le64_to_cpu(src->sf_items_failed);
73         des->sf_items_updated_prior     =
74                                 le64_to_cpu(src->sf_items_updated_prior);
75         des->sf_run_time        = le32_to_cpu(src->sf_run_time);
76         des->sf_success_count   = le32_to_cpu(src->sf_success_count);
77         des->sf_oi_count        = le16_to_cpu(src->sf_oi_count);
78         des->sf_internal_flags  = le16_to_cpu(src->sf_internal_flags);
79         memcpy(des->sf_oi_bitmap, src->sf_oi_bitmap, SCRUB_OI_BITMAP_SIZE);
80 }
81
82 static void scrub_file_to_le(struct scrub_file *des, struct scrub_file *src)
83 {
84         uuid_copy(&des->sf_uuid, &src->sf_uuid);
85         des->sf_flags   = cpu_to_le64(src->sf_flags);
86         des->sf_magic   = cpu_to_le32(src->sf_magic);
87         des->sf_status  = cpu_to_le16(src->sf_status);
88         des->sf_param   = cpu_to_le16(src->sf_param);
89         des->sf_time_last_complete      =
90                                 cpu_to_le64(src->sf_time_last_complete);
91         des->sf_time_latest_start       =
92                                 cpu_to_le64(src->sf_time_latest_start);
93         des->sf_time_last_checkpoint    =
94                                 cpu_to_le64(src->sf_time_last_checkpoint);
95         des->sf_pos_latest_start        =
96                                 cpu_to_le64(src->sf_pos_latest_start);
97         des->sf_pos_last_checkpoint     =
98                                 cpu_to_le64(src->sf_pos_last_checkpoint);
99         des->sf_pos_first_inconsistent  =
100                                 cpu_to_le64(src->sf_pos_first_inconsistent);
101         des->sf_items_checked           =
102                                 cpu_to_le64(src->sf_items_checked);
103         des->sf_items_updated           =
104                                 cpu_to_le64(src->sf_items_updated);
105         des->sf_items_failed            =
106                                 cpu_to_le64(src->sf_items_failed);
107         des->sf_items_updated_prior     =
108                                 cpu_to_le64(src->sf_items_updated_prior);
109         des->sf_run_time        = cpu_to_le32(src->sf_run_time);
110         des->sf_success_count   = cpu_to_le32(src->sf_success_count);
111         des->sf_oi_count        = cpu_to_le16(src->sf_oi_count);
112         des->sf_internal_flags  = cpu_to_le16(src->sf_internal_flags);
113         memcpy(des->sf_oi_bitmap, src->sf_oi_bitmap, SCRUB_OI_BITMAP_SIZE);
114 }
115
116 void scrub_file_init(struct lustre_scrub *scrub, uuid_t uuid)
117 {
118         struct scrub_file *sf = &scrub->os_file;
119
120         memset(sf, 0, sizeof(*sf));
121         uuid_copy(&sf->sf_uuid, &uuid);
122         sf->sf_magic = SCRUB_MAGIC_V1;
123         sf->sf_status = SS_INIT;
124 }
125 EXPORT_SYMBOL(scrub_file_init);
126
127 void scrub_file_reset(struct lustre_scrub *scrub, uuid_t uuid, u64 flags)
128 {
129         struct scrub_file *sf = &scrub->os_file;
130
131         CDEBUG(D_LFSCK, "%s: reset OI scrub file, old flags = "
132                "%#llx, add flags = %#llx\n",
133                scrub->os_name, sf->sf_flags, flags);
134
135         uuid_copy(&sf->sf_uuid, &uuid);
136         sf->sf_status = SS_INIT;
137         sf->sf_flags |= flags;
138         sf->sf_flags &= ~SF_AUTO;
139         sf->sf_run_time = 0;
140         sf->sf_time_latest_start = 0;
141         sf->sf_time_last_checkpoint = 0;
142         sf->sf_pos_latest_start = 0;
143         sf->sf_pos_last_checkpoint = 0;
144         sf->sf_pos_first_inconsistent = 0;
145         sf->sf_items_checked = 0;
146         sf->sf_items_updated = 0;
147         sf->sf_items_failed = 0;
148         sf->sf_items_noscrub = 0;
149         sf->sf_items_igif = 0;
150         if (!scrub->os_in_join)
151                 sf->sf_items_updated_prior = 0;
152 }
153 EXPORT_SYMBOL(scrub_file_reset);
154
155 int scrub_file_load(const struct lu_env *env, struct lustre_scrub *scrub)
156 {
157         struct scrub_file *sf = &scrub->os_file;
158         struct lu_buf buf = {
159                 .lb_buf = &scrub->os_file_disk,
160                 .lb_len = sizeof(scrub->os_file_disk)
161         };
162         loff_t pos = 0;
163         int rc;
164
165         rc = dt_read(env, scrub->os_obj, &buf, &pos);
166         /* failure */
167         if (rc < 0) {
168                 CERROR("%s: fail to load scrub file: rc = %d\n",
169                        scrub->os_name, rc);
170                 return rc;
171         }
172
173         /* empty */
174         if (!rc)
175                 return -ENOENT;
176
177         /* corrupted */
178         if (rc < buf.lb_len) {
179                 CDEBUG(D_LFSCK, "%s: fail to load scrub file, "
180                        "expected = %d: rc = %d\n",
181                        scrub->os_name, (int)buf.lb_len, rc);
182                 return -EFAULT;
183         }
184
185         scrub_file_to_cpu(sf, &scrub->os_file_disk);
186         if (sf->sf_magic != SCRUB_MAGIC_V1) {
187                 CDEBUG(D_LFSCK, "%s: invalid scrub magic 0x%x != 0x%x\n",
188                        scrub->os_name, sf->sf_magic, SCRUB_MAGIC_V1);
189                 return -EFAULT;
190         }
191
192         return 0;
193 }
194 EXPORT_SYMBOL(scrub_file_load);
195
196 int scrub_file_store(const struct lu_env *env, struct lustre_scrub *scrub)
197 {
198         struct scrub_file *sf = &scrub->os_file_disk;
199         struct dt_object *obj = scrub->os_obj;
200         struct dt_device *dev = scrub_obj2dev(obj);
201         struct lu_buf buf = {
202                 .lb_buf = sf,
203                 .lb_len = sizeof(*sf)
204         };
205         struct thandle *th;
206         loff_t pos = 0;
207         int rc;
208         ENTRY;
209
210         /* Skip store under rdonly mode. */
211         if (dev->dd_rdonly)
212                 RETURN(0);
213
214         scrub_file_to_le(sf, &scrub->os_file);
215         th = dt_trans_create(env, dev);
216         if (IS_ERR(th))
217                 GOTO(log, rc = PTR_ERR(th));
218
219         rc = dt_declare_record_write(env, obj, &buf, pos, th);
220         if (rc)
221                 GOTO(stop, rc);
222
223         rc = dt_trans_start_local(env, dev, th);
224         if (rc)
225                 GOTO(stop, rc);
226
227         rc = dt_record_write(env, obj, &buf, &pos, th);
228
229         GOTO(stop, rc);
230
231 stop:
232         dt_trans_stop(env, dev, th);
233
234 log:
235         if (rc)
236                 CERROR("%s: store scrub file: rc = %d\n",
237                        scrub->os_name, rc);
238         else
239                 CDEBUG(D_LFSCK, "%s: store scrub file: rc = %d\n",
240                        scrub->os_name, rc);
241
242         scrub->os_time_last_checkpoint = ktime_get_seconds();
243         scrub->os_time_next_checkpoint = scrub->os_time_last_checkpoint +
244                                          SCRUB_CHECKPOINT_INTERVAL;
245         return rc;
246 }
247 EXPORT_SYMBOL(scrub_file_store);
248
249 bool scrub_needs_check(struct lustre_scrub *scrub, const struct lu_fid *fid,
250                        u64 index)
251 {
252         bool check = true;
253
254         if (!fid_is_norm(fid) && !fid_is_igif(fid))
255                 check = false;
256         else if (scrub->os_running && scrub->os_pos_current > index)
257                 check = false;
258         else if (scrub->os_auto_scrub_interval == AS_NEVER)
259                 check = false;
260         else if (ktime_get_real_seconds() <
261                  scrub->os_file.sf_time_last_complete +
262                  scrub->os_auto_scrub_interval)
263                 check = false;
264
265         return check;
266 }
267 EXPORT_SYMBOL(scrub_needs_check);
268
269 int scrub_checkpoint(const struct lu_env *env, struct lustre_scrub *scrub)
270 {
271         struct scrub_file *sf = &scrub->os_file;
272         time64_t now = ktime_get_seconds();
273         int rc;
274
275         if (likely(now < scrub->os_time_next_checkpoint ||
276                    scrub->os_new_checked == 0))
277                 return 0;
278
279         CDEBUG(D_LFSCK, "%s: OI scrub checkpoint at pos %llu\n",
280                scrub->os_name, scrub->os_pos_current);
281
282         down_write(&scrub->os_rwsem);
283         sf->sf_items_checked += scrub->os_new_checked;
284         scrub->os_new_checked = 0;
285         sf->sf_pos_last_checkpoint = scrub->os_pos_current;
286         sf->sf_time_last_checkpoint = ktime_get_real_seconds();
287         sf->sf_run_time += now - scrub->os_time_last_checkpoint;
288         rc = scrub_file_store(env, scrub);
289         up_write(&scrub->os_rwsem);
290
291         return rc;
292 }
293 EXPORT_SYMBOL(scrub_checkpoint);
294
295 int scrub_thread_prep(const struct lu_env *env, struct lustre_scrub *scrub,
296                       uuid_t uuid, u64 start)
297 {
298         struct scrub_file *sf = &scrub->os_file;
299         u32 flags = scrub->os_start_flags;
300         bool drop_dryrun = false;
301         int rc;
302
303         ENTRY;
304         CDEBUG(D_LFSCK, "%s: OI scrub prep, flags = 0x%x\n",
305                scrub->os_name, flags);
306
307         down_write(&scrub->os_rwsem);
308         if (flags & SS_SET_FAILOUT)
309                 sf->sf_param |= SP_FAILOUT;
310         else if (flags & SS_CLEAR_FAILOUT)
311                 sf->sf_param &= ~SP_FAILOUT;
312
313         if (flags & SS_SET_DRYRUN) {
314                 sf->sf_param |= SP_DRYRUN;
315         } else if (flags & SS_CLEAR_DRYRUN && sf->sf_param & SP_DRYRUN) {
316                 sf->sf_param &= ~SP_DRYRUN;
317                 drop_dryrun = true;
318         }
319
320         if (flags & SS_RESET)
321                 scrub_file_reset(scrub, uuid, 0);
322
323         spin_lock(&scrub->os_lock);
324         scrub->os_partial_scan = 0;
325         if (flags & SS_AUTO_FULL) {
326                 scrub->os_full_speed = 1;
327                 sf->sf_flags |= SF_AUTO;
328         } else if (flags & SS_AUTO_PARTIAL) {
329                 scrub->os_full_speed = 0;
330                 scrub->os_partial_scan = 1;
331                 sf->sf_flags |= SF_AUTO;
332         } else if (sf->sf_flags & (SF_RECREATED | SF_INCONSISTENT |
333                                    SF_UPGRADE)) {
334                 scrub->os_full_speed = 1;
335         } else {
336                 scrub->os_full_speed = 0;
337         }
338
339         scrub->os_in_prior = 0;
340         scrub->os_waiting = 0;
341         scrub->os_paused = 0;
342         scrub->os_in_join = 0;
343         scrub->os_full_scrub = 0;
344         spin_unlock(&scrub->os_lock);
345         scrub->os_new_checked = 0;
346         if (drop_dryrun && sf->sf_pos_first_inconsistent != 0)
347                 sf->sf_pos_latest_start = sf->sf_pos_first_inconsistent;
348         else if (sf->sf_pos_last_checkpoint != 0)
349                 sf->sf_pos_latest_start = sf->sf_pos_last_checkpoint + 1;
350         else
351                 sf->sf_pos_latest_start = start;
352
353         scrub->os_pos_current = sf->sf_pos_latest_start;
354         sf->sf_status = SS_SCANNING;
355         sf->sf_time_latest_start = ktime_get_real_seconds();
356         sf->sf_time_last_checkpoint = sf->sf_time_latest_start;
357         sf->sf_pos_last_checkpoint = sf->sf_pos_latest_start - 1;
358         rc = scrub_file_store(env, scrub);
359         if (rc == 0) {
360                 spin_lock(&scrub->os_lock);
361                 scrub->os_running = 1;
362                 spin_unlock(&scrub->os_lock);
363                 wake_up_var(scrub);
364         }
365         up_write(&scrub->os_rwsem);
366
367         RETURN(rc);
368 }
369 EXPORT_SYMBOL(scrub_thread_prep);
370
371 int scrub_thread_post(const struct lu_env *env, struct lustre_scrub *scrub,
372                       int result)
373 {
374         struct scrub_file *sf = &scrub->os_file;
375         int rc;
376         ENTRY;
377
378         CDEBUG(D_LFSCK, "%s: OI scrub post with result = %d\n",
379                scrub->os_name, result);
380
381         down_write(&scrub->os_rwsem);
382         spin_lock(&scrub->os_lock);
383         scrub->os_running = 0;
384         spin_unlock(&scrub->os_lock);
385         if (scrub->os_new_checked > 0) {
386                 sf->sf_items_checked += scrub->os_new_checked;
387                 scrub->os_new_checked = 0;
388                 sf->sf_pos_last_checkpoint = scrub->os_pos_current;
389         }
390         sf->sf_time_last_checkpoint = ktime_get_real_seconds();
391         if (result > 0) {
392                 sf->sf_status = SS_COMPLETED;
393                 if (!(sf->sf_param & SP_DRYRUN)) {
394                         memset(sf->sf_oi_bitmap, 0, SCRUB_OI_BITMAP_SIZE);
395                         sf->sf_flags &= ~(SF_RECREATED | SF_INCONSISTENT |
396                                           SF_UPGRADE | SF_AUTO);
397                 }
398                 sf->sf_time_last_complete = sf->sf_time_last_checkpoint;
399                 sf->sf_success_count++;
400         } else if (result == 0) {
401                 if (scrub->os_paused)
402                         sf->sf_status = SS_PAUSED;
403                 else
404                         sf->sf_status = SS_STOPPED;
405         } else {
406                 sf->sf_status = SS_FAILED;
407         }
408         sf->sf_run_time += ktime_get_seconds() -
409                            scrub->os_time_last_checkpoint;
410
411         rc = scrub_file_store(env, scrub);
412         up_write(&scrub->os_rwsem);
413
414         RETURN(rc < 0 ? rc : result);
415 }
416 EXPORT_SYMBOL(scrub_thread_post);
417
418 int scrub_start(int (*threadfn)(void *data), struct lustre_scrub *scrub,
419                 void *data, __u32 flags)
420 {
421         struct task_struct *task;
422         int rc;
423         ENTRY;
424
425         if (scrub->os_task)
426                 RETURN(-EALREADY);
427
428         if (scrub->os_file.sf_status == SS_COMPLETED) {
429                 if (!(flags & SS_SET_FAILOUT))
430                         flags |= SS_CLEAR_FAILOUT;
431
432                 if (!(flags & SS_SET_DRYRUN))
433                         flags |= SS_CLEAR_DRYRUN;
434
435                 flags |= SS_RESET;
436         }
437
438         task = kthread_create(threadfn, data, "OI_scrub");
439         if (IS_ERR(task)) {
440                 rc = PTR_ERR(task);
441                 CERROR("%s: cannot start iteration thread: rc = %d\n",
442                        scrub->os_name, rc);
443                 RETURN(rc);
444         }
445         spin_lock(&scrub->os_lock);
446         if (scrub->os_task) {
447                 /* Lost a race */
448                 spin_unlock(&scrub->os_lock);
449                 kthread_stop(task);
450                 RETURN(-EALREADY);
451         }
452         scrub->os_start_flags = flags;
453         scrub->os_task = task;
454         wake_up_process(task);
455         spin_unlock(&scrub->os_lock);
456         wait_var_event(scrub, scrub->os_running || !scrub->os_task);
457
458         RETURN(0);
459 }
460 EXPORT_SYMBOL(scrub_start);
461
462 void scrub_stop(struct lustre_scrub *scrub)
463 {
464         struct task_struct *task;
465
466         spin_lock(&scrub->os_lock);
467         scrub->os_running = 0;
468         spin_unlock(&scrub->os_lock);
469         task = xchg(&scrub->os_task, NULL);
470         if (task)
471                 kthread_stop(task);
472 }
473 EXPORT_SYMBOL(scrub_stop);
474
/*
 * Printable names used by scrub_dump(). All three tables are
 * NULL-terminated.
 */

/* Indexed by the sf_status value of the scrub file. */
const char *const scrub_status_names[] = {
	"init",		/* 0 */
	"scanning",	/* 1 */
	"completed",	/* 2 */
	"failed",	/* 3 */
	"stopped",	/* 4 */
	"paused",	/* 5 */
	"crashed",	/* 6 */
	NULL
};

/* Indexed by bit number within sf_flags. */
const char *const scrub_flags_names[] = {
	"recreated",	/* bit 0 */
	"inconsistent",	/* bit 1 */
	"auto",		/* bit 2 */
	"upgrade",	/* bit 3 */
	NULL
};

/* Indexed by bit number within sf_param. */
const char *const scrub_param_names[] = {
	"failout",	/* bit 0 */
	"dryrun",	/* bit 1 */
	NULL
};
499
/*
 * Print "<prefix>: name1,name2,...\n" for the bits set in \a bits.
 *
 * \param[in] m		seq_file to print into
 * \param[in] bits	bit mask to decode
 * \param[in] names	NULL-terminated table, names[i] names bit i
 * \param[in] prefix	label printed in front of the list
 *
 * With no bit set only "<prefix>:\n" is printed. The walk stops at the
 * NULL terminator of \a names; bits that have no name (e.g. a corrupted
 * or newer on-disk value) are printed in hex as "unknown(0x...)" rather
 * than indexing past the end of the table.
 */
static void scrub_bits_dump(struct seq_file *m, int bits,
			    const char *const names[],
			    const char *prefix)
{
	int flag;
	int i;

	seq_printf(m, "%s:%c", prefix, bits != 0 ? ' ' : '\n');

	for (i = 0; bits != 0 && names[i] != NULL; i++) {
		flag = BIT(i);
		if (flag & bits) {
			bits &= ~flag;
			seq_printf(m, "%s%c", names[i],
				   bits != 0 ? ',' : '\n');
		}
	}

	/* Leftover bits have no name; still terminate the line. */
	if (bits != 0)
		seq_printf(m, "unknown(%#x)\n", bits);
}
517
518 static void scrub_time_dump(struct seq_file *m, time64_t time,
519                             const char *prefix)
520 {
521         if (time != 0)
522                 seq_printf(m, "%s: %llu seconds\n", prefix,
523                            ktime_get_real_seconds() - time);
524         else
525                 seq_printf(m, "%s: N/A\n", prefix);
526 }
527
528 static void scrub_pos_dump(struct seq_file *m, __u64 pos, const char *prefix)
529 {
530         if (pos != 0)
531                 seq_printf(m, "%s: %llu\n", prefix, pos);
532         else
533                 seq_printf(m, "%s: N/A\n", prefix);
534 }
535
536 void scrub_dump(struct seq_file *m, struct lustre_scrub *scrub)
537 {
538         struct scrub_file *sf = &scrub->os_file;
539         u64 checked;
540         s64 speed;
541
542         down_read(&scrub->os_rwsem);
543         seq_printf(m, "name: OI_scrub\n"
544                    "magic: 0x%x\n"
545                    "oi_files: %d\n"
546                    "status: %s\n",
547                    sf->sf_magic, (int)sf->sf_oi_count,
548                    scrub_status_names[sf->sf_status]);
549
550         scrub_bits_dump(m, sf->sf_flags, scrub_flags_names, "flags");
551
552         scrub_bits_dump(m, sf->sf_param, scrub_param_names, "param");
553
554         scrub_time_dump(m, sf->sf_time_last_complete,
555                         "time_since_last_completed");
556
557         scrub_time_dump(m, sf->sf_time_latest_start,
558                         "time_since_latest_start");
559
560         scrub_time_dump(m, sf->sf_time_last_checkpoint,
561                         "time_since_last_checkpoint");
562
563         scrub_pos_dump(m, sf->sf_pos_latest_start,
564                         "latest_start_position");
565
566         scrub_pos_dump(m, sf->sf_pos_last_checkpoint,
567                         "last_checkpoint_position");
568
569         scrub_pos_dump(m, sf->sf_pos_first_inconsistent,
570                         "first_failure_position");
571
572         checked = sf->sf_items_checked + scrub->os_new_checked;
573         seq_printf(m, "checked: %llu\n"
574                    "%s: %llu\n"
575                    "failed: %llu\n"
576                    "prior_%s: %llu\n"
577                    "noscrub: %llu\n"
578                    "igif: %llu\n"
579                    "success_count: %u\n",
580                    checked,
581                    sf->sf_param & SP_DRYRUN ? "inconsistent" : "updated",
582                    sf->sf_items_updated, sf->sf_items_failed,
583                    sf->sf_param & SP_DRYRUN ? "inconsistent" : "updated",
584                    sf->sf_items_updated_prior, sf->sf_items_noscrub,
585                    sf->sf_items_igif, sf->sf_success_count);
586
587         speed = checked;
588         if (scrub->os_running) {
589                 s64 new_checked = scrub->os_new_checked;
590                 time64_t duration;
591                 time64_t rtime;
592
593                 /* Since the time resolution is in seconds for new system
594                  * or small devices it ismore likely that duration will be
595                  * zero which will lead to inaccurate results.
596                  */
597                 duration = ktime_get_seconds() -
598                            scrub->os_time_last_checkpoint;
599                 if (duration != 0)
600                         new_checked = div_s64(new_checked, duration);
601
602                 rtime = sf->sf_run_time + duration;
603                 if (rtime != 0)
604                         speed = div_s64(speed, rtime);
605
606                 seq_printf(m, "run_time: %lld seconds\n"
607                            "average_speed: %lld objects/sec\n"
608                            "real_time_speed: %lld objects/sec\n"
609                            "current_position: %llu\n"
610                            "scrub_in_prior: %s\n"
611                            "scrub_full_speed: %s\n"
612                            "partial_scan: %s\n",
613                            rtime, speed, new_checked,
614                            scrub->os_pos_current,
615                            scrub->os_in_prior ? "yes" : "no",
616                            scrub->os_full_speed ? "yes" : "no",
617                            scrub->os_partial_scan ? "yes" : "no");
618         } else {
619                 if (sf->sf_run_time != 0)
620                         speed = div_s64(speed, sf->sf_run_time);
621                 seq_printf(m, "run_time: %d seconds\n"
622                            "average_speed: %lld objects/sec\n"
623                            "real_time_speed: N/A\n"
624                            "current_position: N/A\n",
625                            sf->sf_run_time, speed);
626         }
627
628         up_read(&scrub->os_rwsem);
629 }
630 EXPORT_SYMBOL(scrub_dump);
631
632 int lustre_liru_new(struct list_head *head, const struct lu_fid *pfid,
633                     const struct lu_fid *cfid, __u64 child,
634                     const char *name, int namelen)
635 {
636         struct lustre_index_restore_unit *liru;
637         int len = sizeof(*liru) + namelen + 1;
638
639         OBD_ALLOC(liru, len);
640         if (!liru)
641                 return -ENOMEM;
642
643         INIT_LIST_HEAD(&liru->liru_link);
644         liru->liru_pfid = *pfid;
645         liru->liru_cfid = *cfid;
646         liru->liru_clid = child;
647         liru->liru_len = len;
648         memcpy(liru->liru_name, name, namelen);
649         liru->liru_name[namelen] = 0;
650         list_add_tail(&liru->liru_link, head);
651
652         return 0;
653 }
654 EXPORT_SYMBOL(lustre_liru_new);
655
656 int lustre_index_register(struct dt_device *dev, const char *devname,
657                           struct list_head *head, spinlock_t *lock, int *guard,
658                           const struct lu_fid *fid,
659                           __u32 keysize, __u32 recsize)
660 {
661         struct lustre_index_backup_unit *libu, *pos;
662         int rc = 0;
663         ENTRY;
664
665         if (dev->dd_rdonly || *guard)
666                 RETURN(1);
667
668         OBD_ALLOC_PTR(libu);
669         if (!libu)
670                 RETURN(-ENOMEM);
671
672         INIT_LIST_HEAD(&libu->libu_link);
673         libu->libu_keysize = keysize;
674         libu->libu_recsize = recsize;
675         libu->libu_fid = *fid;
676
677         spin_lock(lock);
678         if (unlikely(*guard)) {
679                 spin_unlock(lock);
680                 OBD_FREE_PTR(libu);
681
682                 RETURN(1);
683         }
684
685         list_for_each_entry_reverse(pos, head, libu_link) {
686                 rc = lu_fid_cmp(&pos->libu_fid, fid);
687                 if (rc < 0) {
688                         list_add(&libu->libu_link, &pos->libu_link);
689                         spin_unlock(lock);
690
691                         RETURN(0);
692                 }
693
694                 if (!rc) {
695                         /* Registered already. But the former registered one
696                          * has different keysize/recsize. It may because that
697                          * the former values are from disk and corrupted, then
698                          * replace it with new values. */
699                         if (unlikely(keysize != pos->libu_keysize ||
700                                      recsize != pos->libu_recsize)) {
701                                 CWARN("%s: the index "DFID" has registered "
702                                       "with %u/%u, may be invalid, replace "
703                                       "with %u/%u\n",
704                                       devname, PFID(fid), pos->libu_keysize,
705                                       pos->libu_recsize, keysize, recsize);
706
707                                 pos->libu_keysize = keysize;
708                                 pos->libu_recsize = recsize;
709                         } else {
710                                 rc = 1;
711                         }
712
713                         spin_unlock(lock);
714                         OBD_FREE_PTR(libu);
715
716                         RETURN(rc);
717                 }
718         }
719
720         list_add(&libu->libu_link, head);
721         spin_unlock(lock);
722
723         RETURN(0);
724 }
725 EXPORT_SYMBOL(lustre_index_register);
726
727 static void lustre_index_degister(struct list_head *head, spinlock_t *lock,
728                                   const struct lu_fid *fid)
729 {
730         struct lustre_index_backup_unit *libu;
731         int rc = -ENOENT;
732
733         spin_lock(lock);
734         list_for_each_entry_reverse(libu, head, libu_link) {
735                 rc = lu_fid_cmp(&libu->libu_fid, fid);
736                 /* NOT registered. */
737                 if (rc < 0)
738                         break;
739
740                 if (!rc) {
741                         list_del(&libu->libu_link);
742                         break;
743                 }
744         }
745         spin_unlock(lock);
746
747         if (!rc)
748                 OBD_FREE_PTR(libu);
749 }
750
751 static void
752 lustre_index_backup_make_header(struct lustre_index_backup_header *header,
753                                 __u32 keysize, __u32 recsize,
754                                 const struct lu_fid *fid, __u32 count)
755 {
756         memset(header, 0, sizeof(*header));
757         header->libh_magic = cpu_to_le32(INDEX_BACKUP_MAGIC_V1);
758         header->libh_count = cpu_to_le32(count);
759         header->libh_keysize = cpu_to_le32(keysize);
760         header->libh_recsize = cpu_to_le32(recsize);
761         fid_cpu_to_le(&header->libh_owner, fid);
762 }
763
764 static int lustre_index_backup_body(const struct lu_env *env,
765                                     struct dt_object *obj, loff_t *pos,
766                                     void *buf, int bufsize)
767 {
768         struct dt_device *dev = lu2dt_dev(obj->do_lu.lo_dev);
769         struct thandle *th;
770         struct lu_buf lbuf = {
771                 .lb_buf = buf,
772                 .lb_len = bufsize
773         };
774         int rc;
775         ENTRY;
776
777         th = dt_trans_create(env, dev);
778         if (IS_ERR(th))
779                 RETURN(PTR_ERR(th));
780
781         rc = dt_declare_record_write(env, obj, &lbuf, *pos, th);
782         if (rc)
783                 GOTO(stop, rc);
784
785         rc = dt_trans_start_local(env, dev, th);
786         if (rc)
787                 GOTO(stop, rc);
788
789         rc = dt_record_write(env, obj, &lbuf, pos, th);
790
791         GOTO(stop, rc);
792
793 stop:
794         dt_trans_stop(env, dev, th);
795         return rc;
796 }
797
798 static int lustre_index_backup_header(const struct lu_env *env,
799                                       struct dt_object *obj,
800                                       const struct lu_fid *tgt_fid,
801                                       __u32 keysize, __u32 recsize,
802                                       void *buf, int bufsize, int count)
803 {
804         struct dt_device *dev = lu2dt_dev(obj->do_lu.lo_dev);
805         struct lustre_index_backup_header *header = buf;
806         struct lu_attr *la = buf;
807         struct thandle *th;
808         struct lu_buf lbuf = {
809                 .lb_buf = header,
810                 .lb_len = sizeof(*header)
811         };
812         loff_t size = sizeof(*header) + (keysize + recsize) * count;
813         loff_t pos = 0;
814         int rc;
815         bool punch = false;
816         ENTRY;
817
818         LASSERT(sizeof(*la) <= bufsize);
819         LASSERT(sizeof(*header) <= bufsize);
820
821         rc = dt_attr_get(env, obj, la);
822         if (rc)
823                 RETURN(rc);
824
825         if (la->la_size > size)
826                 punch = true;
827
828         lustre_index_backup_make_header(header, keysize, recsize,
829                                         tgt_fid, count);
830         th = dt_trans_create(env, dev);
831         if (IS_ERR(th))
832                 RETURN(PTR_ERR(th));
833
834         rc = dt_declare_record_write(env, obj, &lbuf, pos, th);
835         if (rc)
836                 GOTO(stop, rc);
837
838         if (punch) {
839                 rc = dt_declare_punch(env, obj, size, OBD_OBJECT_EOF, th);
840                 if (rc)
841                         GOTO(stop, rc);
842         }
843
844         rc = dt_trans_start_local(env, dev, th);
845         if (rc)
846                 GOTO(stop, rc);
847
848         rc = dt_record_write(env, obj, &lbuf, &pos, th);
849         if (!rc && punch)
850                 rc = dt_punch(env, obj, size, OBD_OBJECT_EOF, th);
851
852         GOTO(stop, rc);
853
854 stop:
855         dt_trans_stop(env, dev, th);
856         return rc;
857 }
858
859 static int lustre_index_update_lma(const struct lu_env *env,
860                                    struct dt_object *obj,
861                                    void *buf, int bufsize)
862 {
863         struct dt_device *dev = lu2dt_dev(obj->do_lu.lo_dev);
864         struct lustre_mdt_attrs *lma = buf;
865         struct lu_buf lbuf = {
866                 .lb_buf = lma,
867                 .lb_len = sizeof(struct lustre_ost_attrs)
868         };
869         struct thandle *th;
870         int fl = LU_XATTR_REPLACE;
871         int rc;
872         ENTRY;
873
874         LASSERT(bufsize >= lbuf.lb_len);
875
876         rc = dt_xattr_get(env, obj, &lbuf, XATTR_NAME_LMA);
877         if (unlikely(rc == -ENODATA)) {
878                 fl = LU_XATTR_CREATE;
879                 lustre_lma_init(lma, lu_object_fid(&obj->do_lu),
880                                 LMAC_IDX_BACKUP, 0);
881                 rc = sizeof(*lma);
882         } else if (rc < sizeof(*lma)) {
883                 RETURN(rc < 0 ? rc : -EFAULT);
884         } else {
885                 lustre_lma_swab(lma);
886                 if (lma->lma_compat & LMAC_IDX_BACKUP)
887                         RETURN(0);
888
889                 lma->lma_compat |= LMAC_IDX_BACKUP;
890         }
891
892         lustre_lma_swab(lma);
893         lbuf.lb_len = rc;
894         th = dt_trans_create(env, dev);
895         if (IS_ERR(th))
896                 RETURN(rc);
897
898         rc = dt_declare_xattr_set(env, obj, &lbuf, XATTR_NAME_LMA, fl, th);
899         if (rc)
900                 GOTO(stop, rc);
901
902         rc = dt_trans_start_local(env, dev, th);
903         if (rc)
904                 GOTO(stop, rc);
905
906         rc = dt_xattr_set(env, obj, &lbuf, XATTR_NAME_LMA, fl, th);
907
908         GOTO(stop, rc);
909
910 stop:
911         dt_trans_stop(env, dev, th);
912         return rc;
913 }
914
915 static int lustre_index_backup_one(const struct lu_env *env,
916                                    struct local_oid_storage *los,
917                                    struct dt_object *parent,
918                                    struct lustre_index_backup_unit *libu,
919                                    char *buf, int bufsize)
920 {
921         struct dt_device *dev = scrub_obj2dev(parent);
922         struct dt_object *tgt_obj = NULL;
923         struct dt_object *bak_obj = NULL;
924         const struct dt_it_ops *iops;
925         struct dt_it *di;
926         loff_t pos = sizeof(struct lustre_index_backup_header);
927         int count = 0;
928         int size = 0;
929         int rc;
930         ENTRY;
931
932         tgt_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
933                                              &libu->libu_fid, NULL));
934         if (IS_ERR_OR_NULL(tgt_obj))
935                 GOTO(out, rc = tgt_obj ? PTR_ERR(tgt_obj) : -ENOENT);
936
937         if (!dt_object_exists(tgt_obj))
938                 GOTO(out, rc = 0);
939
940         if (!tgt_obj->do_index_ops) {
941                 struct dt_index_features feat;
942
943                 feat.dif_flags = DT_IND_UPDATE;
944                 feat.dif_keysize_min = libu->libu_keysize;
945                 feat.dif_keysize_max = libu->libu_keysize;
946                 feat.dif_recsize_min = libu->libu_recsize;
947                 feat.dif_recsize_max = libu->libu_recsize;
948                 feat.dif_ptrsize = 4;
949                 rc = tgt_obj->do_ops->do_index_try(env, tgt_obj, &feat);
950                 if (rc)
951                         GOTO(out, rc);
952         }
953
954         lustre_fid2lbx(buf, &libu->libu_fid, bufsize);
955         bak_obj = local_file_find_or_create(env, los, parent, buf,
956                                             S_IFREG | S_IRUGO | S_IWUSR);
957         if (IS_ERR_OR_NULL(bak_obj))
958                 GOTO(out, rc = bak_obj ? PTR_ERR(bak_obj) : -ENOENT);
959
960         iops = &tgt_obj->do_index_ops->dio_it;
961         di = iops->init(env, tgt_obj, 0);
962         if (IS_ERR(di))
963                 GOTO(out, rc = PTR_ERR(di));
964
965         rc = iops->load(env, di, 0);
966         if (!rc)
967                 rc = iops->next(env, di);
968         else if (rc > 0)
969                 rc = 0;
970
971         while (!rc) {
972                 void *key;
973                 void *rec;
974
975                 key = iops->key(env, di);
976                 memcpy(&buf[size], key, libu->libu_keysize);
977                 size += libu->libu_keysize;
978                 rec = &buf[size];
979                 rc = iops->rec(env, di, rec, 0);
980                 if (rc)
981                         GOTO(fini, rc);
982
983                 size += libu->libu_recsize;
984                 count++;
985                 if (size + libu->libu_keysize + libu->libu_recsize > bufsize) {
986                         rc = lustre_index_backup_body(env, bak_obj, &pos,
987                                                       buf, size);
988                         if (rc)
989                                 GOTO(fini, rc);
990
991                         size = 0;
992                 }
993
994                 rc = iops->next(env, di);
995         }
996
997         if (rc >= 0 && size > 0)
998                 rc = lustre_index_backup_body(env, bak_obj, &pos, buf, size);
999
1000         if (rc < 0)
1001                 GOTO(fini, rc);
1002
1003         rc = lustre_index_backup_header(env, bak_obj, &libu->libu_fid,
1004                                         libu->libu_keysize, libu->libu_recsize,
1005                                         buf, bufsize, count);
1006         if (!rc)
1007                 rc = lustre_index_update_lma(env, tgt_obj, buf, bufsize);
1008
1009         if (!rc && OBD_FAIL_CHECK(OBD_FAIL_OSD_INDEX_CRASH)) {
1010                 LASSERT(bufsize >= 512);
1011
1012                 pos = 0;
1013                 memset(buf, 0, 512);
1014                 lustre_index_backup_body(env, tgt_obj, &pos, buf, 512);
1015         }
1016
1017         GOTO(fini, rc);
1018
1019 fini:
1020         iops->fini(env, di);
1021 out:
1022         if (!IS_ERR_OR_NULL(tgt_obj))
1023                 dt_object_put_nocache(env, tgt_obj);
1024         if (!IS_ERR_OR_NULL(bak_obj))
1025                 dt_object_put_nocache(env, bak_obj);
1026         return rc;
1027 }
1028
1029 void lustre_index_backup(const struct lu_env *env, struct dt_device *dev,
1030                          const char *devname, struct list_head *head,
1031                          spinlock_t *lock, int *guard, bool backup)
1032 {
1033         struct lustre_index_backup_unit *libu;
1034         struct local_oid_storage *los = NULL;
1035         struct dt_object *parent = NULL;
1036         char *buf = NULL;
1037         struct lu_fid fid;
1038         int rc;
1039         ENTRY;
1040
1041         if (dev->dd_rdonly || *guard)
1042                 RETURN_EXIT;
1043
1044         spin_lock(lock);
1045         *guard = 1;
1046         spin_unlock(lock);
1047
1048         if (list_empty(head))
1049                 RETURN_EXIT;
1050
1051         /* Handle kinds of failures during mount process. */
1052         if (!dev->dd_lu_dev.ld_site || !dev->dd_lu_dev.ld_site->ls_top_dev)
1053                 backup = false;
1054
1055         if (backup) {
1056                 OBD_ALLOC_LARGE(buf, INDEX_BACKUP_BUFSIZE);
1057                 if (!buf) {
1058                         backup = false;
1059                         goto scan;
1060                 }
1061
1062                 lu_local_obj_fid(&fid, INDEX_BACKUP_OID);
1063                 parent = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
1064                                                     &fid, NULL));
1065                 if (IS_ERR_OR_NULL(parent)) {
1066                         CERROR("%s: failed to locate backup dir: rc = %ld\n",
1067                                devname, parent ? PTR_ERR(parent) : -ENOENT);
1068                         backup = false;
1069                         goto scan;
1070                 }
1071
1072                 lu_local_name_obj_fid(&fid, 1);
1073                 rc = local_oid_storage_init(env, dev, &fid, &los);
1074                 if (rc) {
1075                         CERROR("%s: failed to init local storage: rc = %d\n",
1076                                devname, rc);
1077                         backup = false;
1078                 }
1079         }
1080
1081 scan:
1082         spin_lock(lock);
1083         while (!list_empty(head)) {
1084                 libu = list_entry(head->next,
1085                                   struct lustre_index_backup_unit, libu_link);
1086                 list_del_init(&libu->libu_link);
1087                 spin_unlock(lock);
1088
1089                 if (backup) {
1090                         rc = lustre_index_backup_one(env, los, parent, libu,
1091                                                      buf, INDEX_BACKUP_BUFSIZE);
1092                         CDEBUG(D_WARNING, "%s: backup index "DFID": rc = %d\n",
1093                                devname, PFID(&libu->libu_fid), rc);
1094                 }
1095
1096                 OBD_FREE_PTR(libu);
1097                 spin_lock(lock);
1098         }
1099         spin_unlock(lock);
1100
1101         if (los)
1102                 local_oid_storage_fini(env, los);
1103         if (parent)
1104                 dt_object_put_nocache(env, parent);
1105         if (buf)
1106                 OBD_FREE_LARGE(buf, INDEX_BACKUP_BUFSIZE);
1107
1108         EXIT;
1109 }
1110 EXPORT_SYMBOL(lustre_index_backup);
1111
1112 int lustre_index_restore(const struct lu_env *env, struct dt_device *dev,
1113                          const struct lu_fid *parent_fid,
1114                          const struct lu_fid *tgt_fid,
1115                          const struct lu_fid *bak_fid, const char *name,
1116                          struct list_head *head, spinlock_t *lock,
1117                          char *buf, int bufsize)
1118 {
1119         struct dt_object *parent_obj = NULL;
1120         struct dt_object *tgt_obj = NULL;
1121         struct dt_object *bak_obj = NULL;
1122         struct lustre_index_backup_header *header;
1123         struct dt_index_features *feat;
1124         struct dt_object_format *dof;
1125         struct lu_attr *la;
1126         struct thandle *th;
1127         struct lu_object_conf conf;
1128         struct dt_insert_rec ent;
1129         struct lu_buf lbuf;
1130         struct lu_fid tfid;
1131         loff_t pos = 0;
1132         __u32 keysize;
1133         __u32 recsize;
1134         __u32 pairsize;
1135         int count;
1136         int rc;
1137         bool registered = false;
1138         ENTRY;
1139
1140         LASSERT(bufsize >= sizeof(*la) + sizeof(*dof) +
1141                 sizeof(*feat) + sizeof(*header));
1142
1143         memset(buf, 0, bufsize);
1144         la = (struct lu_attr *)buf;
1145         dof = (void *)la + sizeof(*la);
1146         feat = (void *)dof + sizeof(*dof);
1147         header = (void *)feat + sizeof(*feat);
1148         lbuf.lb_buf = header;
1149         lbuf.lb_len = sizeof(*header);
1150
1151         tgt_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
1152                                              tgt_fid, NULL));
1153         if (IS_ERR_OR_NULL(tgt_obj))
1154                 GOTO(out, rc = tgt_obj ? PTR_ERR(tgt_obj) : -ENOENT);
1155
1156         bak_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
1157                                              bak_fid, NULL));
1158         if (IS_ERR_OR_NULL(bak_obj))
1159                 GOTO(out, rc = bak_obj ? PTR_ERR(bak_obj) : -ENOENT);
1160
1161         if (!dt_object_exists(bak_obj))
1162                 GOTO(out, rc = -ENOENT);
1163
1164         parent_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
1165                                                 parent_fid, NULL));
1166         if (IS_ERR_OR_NULL(parent_obj))
1167                 GOTO(out, rc = parent_obj ? PTR_ERR(parent_obj) : -ENOENT);
1168
1169         LASSERT(dt_object_exists(parent_obj));
1170
1171         if (unlikely(!dt_try_as_dir(env, parent_obj)))
1172                 GOTO(out, rc = -ENOTDIR);
1173
1174         rc = dt_attr_get(env, tgt_obj, la);
1175         if (rc)
1176                 GOTO(out, rc);
1177
1178         rc = dt_record_read(env, bak_obj, &lbuf, &pos);
1179         if (rc)
1180                 GOTO(out, rc);
1181
1182         if (le32_to_cpu(header->libh_magic) != INDEX_BACKUP_MAGIC_V1)
1183                 GOTO(out, rc = -EINVAL);
1184
1185         fid_le_to_cpu(&tfid, &header->libh_owner);
1186         if (unlikely(!lu_fid_eq(tgt_fid, &tfid)))
1187                 GOTO(out, rc = -EINVAL);
1188
1189         keysize = le32_to_cpu(header->libh_keysize);
1190         recsize = le32_to_cpu(header->libh_recsize);
1191         pairsize = keysize + recsize;
1192
1193         memset(feat, 0, sizeof(*feat));
1194         feat->dif_flags = DT_IND_UPDATE;
1195         feat->dif_keysize_min = feat->dif_keysize_max = keysize;
1196         feat->dif_recsize_min = feat->dif_recsize_max = recsize;
1197         feat->dif_ptrsize = 4;
1198
1199         /* T1: remove old name entry and destroy old index. */
1200         th = dt_trans_create(env, dev);
1201         if (IS_ERR(th))
1202                 GOTO(out, rc = PTR_ERR(th));
1203
1204         rc = dt_declare_delete(env, parent_obj,
1205                                (const struct dt_key *)name, th);
1206         if (rc)
1207                 GOTO(stop, rc);
1208
1209         rc = dt_declare_ref_del(env, tgt_obj, th);
1210         if (rc)
1211                 GOTO(stop, rc);
1212
1213         rc = dt_declare_destroy(env, tgt_obj, th);
1214         if (rc)
1215                 GOTO(stop, rc);
1216
1217         rc = dt_trans_start_local(env, dev, th);
1218         if (rc)
1219                 GOTO(stop, rc);
1220
1221         rc = dt_delete(env, parent_obj, (const struct dt_key *)name, th);
1222         if (rc)
1223                 GOTO(stop, rc);
1224
1225         dt_write_lock(env, tgt_obj, 0);
1226         rc = dt_ref_del(env, tgt_obj, th);
1227         if (rc == 0) {
1228                 if (S_ISDIR(tgt_obj->do_lu.lo_header->loh_attr))
1229                         dt_ref_del(env, tgt_obj, th);
1230                 rc = dt_destroy(env, tgt_obj, th);
1231         }
1232         dt_write_unlock(env, tgt_obj);
1233         dt_trans_stop(env, dev, th);
1234         if (rc)
1235                 GOTO(out, rc);
1236
1237         la->la_valid = LA_MODE | LA_UID | LA_GID;
1238         conf.loc_flags = LOC_F_NEW;
1239         dof->u.dof_idx.di_feat = feat;
1240         dof->dof_type = DFT_INDEX;
1241         ent.rec_type = S_IFREG;
1242         ent.rec_fid = tgt_fid;
1243
1244         /* Drop cache before re-create it. */
1245         dt_object_put_nocache(env, tgt_obj);
1246         tgt_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
1247                                              tgt_fid, &conf));
1248         if (IS_ERR_OR_NULL(tgt_obj))
1249                 GOTO(out, rc = tgt_obj ? PTR_ERR(tgt_obj) : -ENOENT);
1250
1251         LASSERT(!dt_object_exists(tgt_obj));
1252
1253         /* T2: create new index and insert new name entry. */
1254         th = dt_trans_create(env, dev);
1255         if (IS_ERR(th))
1256                 GOTO(out, rc = PTR_ERR(th));
1257
1258         rc = dt_declare_create(env, tgt_obj, la, NULL, dof, th);
1259         if (rc)
1260                 GOTO(stop, rc);
1261
1262         rc = dt_declare_insert(env, parent_obj, (const struct dt_rec *)&ent,
1263                                (const struct dt_key *)name, th);
1264         if (rc)
1265                 GOTO(stop, rc);
1266
1267         rc = dt_trans_start_local(env, dev, th);
1268         if (rc)
1269                 GOTO(stop, rc);
1270
1271         dt_write_lock(env, tgt_obj, 0);
1272         rc = dt_create(env, tgt_obj, la, NULL, dof, th);
1273         dt_write_unlock(env, tgt_obj);
1274         if (rc)
1275                 GOTO(stop, rc);
1276
1277         rc = dt_insert(env, parent_obj, (const struct dt_rec *)&ent,
1278                        (const struct dt_key *)name, th);
1279         dt_trans_stop(env, dev, th);
1280         /* Some index name may has been inserted by OSD
1281          * automatically when create the index object. */
1282         if (unlikely(rc == -EEXIST))
1283                 rc = 0;
1284         if (rc)
1285                 GOTO(out, rc);
1286
1287         /* The new index will register via index_try. */
1288         rc = tgt_obj->do_ops->do_index_try(env, tgt_obj, feat);
1289         if (rc)
1290                 GOTO(out, rc);
1291
1292         registered = true;
1293         count = le32_to_cpu(header->libh_count);
1294         while (!rc && count > 0) {
1295                 int size = pairsize * count;
1296                 int items = count;
1297                 int i;
1298
1299                 if (size > bufsize) {
1300                         items = bufsize / pairsize;
1301                         size = pairsize * items;
1302                 }
1303
1304                 lbuf.lb_buf = buf;
1305                 lbuf.lb_len = size;
1306                 rc = dt_record_read(env, bak_obj, &lbuf, &pos);
1307                 for (i = 0; i < items && !rc; i++) {
1308                         void *key = &buf[i * pairsize];
1309                         void *rec = &buf[i * pairsize + keysize];
1310
1311                         /* Tn: restore the records. */
1312                         th = dt_trans_create(env, dev);
1313                         if (!th)
1314                                 GOTO(out, rc = -ENOMEM);
1315
1316                         rc = dt_declare_insert(env, tgt_obj, rec, key, th);
1317                         if (rc)
1318                                 GOTO(stop, rc);
1319
1320                         rc = dt_trans_start_local(env, dev, th);
1321                         if (rc)
1322                                 GOTO(stop, rc);
1323
1324                         rc = dt_insert(env, tgt_obj, rec, key, th);
1325                         if (unlikely(rc == -EEXIST))
1326                                 rc = 0;
1327
1328                         dt_trans_stop(env, dev, th);
1329                 }
1330
1331                 count -= items;
1332         }
1333
1334         GOTO(out, rc);
1335
1336 stop:
1337         dt_trans_stop(env, dev, th);
1338         if (rc && registered)
1339                 /* Degister the index to avoid overwriting the backup. */
1340                 lustre_index_degister(head, lock, tgt_fid);
1341
1342 out:
1343         if (!IS_ERR_OR_NULL(tgt_obj))
1344                 dt_object_put_nocache(env, tgt_obj);
1345         if (!IS_ERR_OR_NULL(bak_obj))
1346                 dt_object_put_nocache(env, bak_obj);
1347         if (!IS_ERR_OR_NULL(parent_obj))
1348                 dt_object_put_nocache(env, parent_obj);
1349         return rc;
1350 }
1351 EXPORT_SYMBOL(lustre_index_restore);