Whamcloud - gitweb
LU-14989 sec: access to enc file's xattrs
[fs/lustre-release.git] / lustre / obdclass / scrub.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2017, Intel Corporation.
24  */
25 /*
26  * lustre/obdclass/scrub.c
27  *
28  * The OI scrub is used for checking and (re)building Object Index files
29  * that are usually backend special. Here are some general scrub related
30  * functions that can be shared by different backends for OI scrub.
31  *
32  * Author: Fan Yong <fan.yong@intel.com>
33  */
34
35 #define DEBUG_SUBSYSTEM S_LFSCK
36
37 #include <linux/kthread.h>
38 #include <lustre_scrub.h>
39 #include <lustre_lib.h>
40 #include <lustre_fid.h>
41
42 static inline struct dt_device *scrub_obj2dev(struct dt_object *obj)
43 {
44         return container_of_safe(obj->do_lu.lo_dev, struct dt_device,
45                                  dd_lu_dev);
46 }
47
48 static void scrub_file_to_cpu(struct scrub_file *des, struct scrub_file *src)
49 {
50         uuid_copy(&des->sf_uuid, &src->sf_uuid);
51         des->sf_flags   = le64_to_cpu(src->sf_flags);
52         des->sf_magic   = le32_to_cpu(src->sf_magic);
53         des->sf_status  = le16_to_cpu(src->sf_status);
54         des->sf_param   = le16_to_cpu(src->sf_param);
55         des->sf_time_last_complete      =
56                                 le64_to_cpu(src->sf_time_last_complete);
57         des->sf_time_latest_start       =
58                                 le64_to_cpu(src->sf_time_latest_start);
59         des->sf_time_last_checkpoint    =
60                                 le64_to_cpu(src->sf_time_last_checkpoint);
61         des->sf_pos_latest_start        =
62                                 le64_to_cpu(src->sf_pos_latest_start);
63         des->sf_pos_last_checkpoint     =
64                                 le64_to_cpu(src->sf_pos_last_checkpoint);
65         des->sf_pos_first_inconsistent  =
66                                 le64_to_cpu(src->sf_pos_first_inconsistent);
67         des->sf_items_checked           =
68                                 le64_to_cpu(src->sf_items_checked);
69         des->sf_items_updated           =
70                                 le64_to_cpu(src->sf_items_updated);
71         des->sf_items_failed            =
72                                 le64_to_cpu(src->sf_items_failed);
73         des->sf_items_updated_prior     =
74                                 le64_to_cpu(src->sf_items_updated_prior);
75         des->sf_run_time        = le32_to_cpu(src->sf_run_time);
76         des->sf_success_count   = le32_to_cpu(src->sf_success_count);
77         des->sf_oi_count        = le16_to_cpu(src->sf_oi_count);
78         des->sf_internal_flags  = le16_to_cpu(src->sf_internal_flags);
79         memcpy(des->sf_oi_bitmap, src->sf_oi_bitmap, SCRUB_OI_BITMAP_SIZE);
80 }
81
82 static void scrub_file_to_le(struct scrub_file *des, struct scrub_file *src)
83 {
84         uuid_copy(&des->sf_uuid, &src->sf_uuid);
85         des->sf_flags   = cpu_to_le64(src->sf_flags);
86         des->sf_magic   = cpu_to_le32(src->sf_magic);
87         des->sf_status  = cpu_to_le16(src->sf_status);
88         des->sf_param   = cpu_to_le16(src->sf_param);
89         des->sf_time_last_complete      =
90                                 cpu_to_le64(src->sf_time_last_complete);
91         des->sf_time_latest_start       =
92                                 cpu_to_le64(src->sf_time_latest_start);
93         des->sf_time_last_checkpoint    =
94                                 cpu_to_le64(src->sf_time_last_checkpoint);
95         des->sf_pos_latest_start        =
96                                 cpu_to_le64(src->sf_pos_latest_start);
97         des->sf_pos_last_checkpoint     =
98                                 cpu_to_le64(src->sf_pos_last_checkpoint);
99         des->sf_pos_first_inconsistent  =
100                                 cpu_to_le64(src->sf_pos_first_inconsistent);
101         des->sf_items_checked           =
102                                 cpu_to_le64(src->sf_items_checked);
103         des->sf_items_updated           =
104                                 cpu_to_le64(src->sf_items_updated);
105         des->sf_items_failed            =
106                                 cpu_to_le64(src->sf_items_failed);
107         des->sf_items_updated_prior     =
108                                 cpu_to_le64(src->sf_items_updated_prior);
109         des->sf_run_time        = cpu_to_le32(src->sf_run_time);
110         des->sf_success_count   = cpu_to_le32(src->sf_success_count);
111         des->sf_oi_count        = cpu_to_le16(src->sf_oi_count);
112         des->sf_internal_flags  = cpu_to_le16(src->sf_internal_flags);
113         memcpy(des->sf_oi_bitmap, src->sf_oi_bitmap, SCRUB_OI_BITMAP_SIZE);
114 }
115
116 void scrub_file_init(struct lustre_scrub *scrub, uuid_t uuid)
117 {
118         struct scrub_file *sf = &scrub->os_file;
119
120         memset(sf, 0, sizeof(*sf));
121         uuid_copy(&sf->sf_uuid, &uuid);
122         sf->sf_magic = SCRUB_MAGIC_V1;
123         sf->sf_status = SS_INIT;
124 }
125 EXPORT_SYMBOL(scrub_file_init);
126
127 void scrub_file_reset(struct lustre_scrub *scrub, uuid_t uuid, u64 flags)
128 {
129         struct scrub_file *sf = &scrub->os_file;
130
131         CDEBUG(D_LFSCK, "%s: reset OI scrub file, old flags = "
132                "%#llx, add flags = %#llx\n",
133                scrub->os_name, sf->sf_flags, flags);
134
135         uuid_copy(&sf->sf_uuid, &uuid);
136         sf->sf_status = SS_INIT;
137         sf->sf_flags |= flags;
138         sf->sf_flags &= ~SF_AUTO;
139         sf->sf_run_time = 0;
140         sf->sf_time_latest_start = 0;
141         sf->sf_time_last_checkpoint = 0;
142         sf->sf_pos_latest_start = 0;
143         sf->sf_pos_last_checkpoint = 0;
144         sf->sf_pos_first_inconsistent = 0;
145         sf->sf_items_checked = 0;
146         sf->sf_items_updated = 0;
147         sf->sf_items_failed = 0;
148         sf->sf_items_noscrub = 0;
149         sf->sf_items_igif = 0;
150         if (!scrub->os_in_join)
151                 sf->sf_items_updated_prior = 0;
152 }
153 EXPORT_SYMBOL(scrub_file_reset);
154
155 int scrub_file_load(const struct lu_env *env, struct lustre_scrub *scrub)
156 {
157         struct scrub_file *sf = &scrub->os_file;
158         struct lu_buf buf = {
159                 .lb_buf = &scrub->os_file_disk,
160                 .lb_len = sizeof(scrub->os_file_disk)
161         };
162         loff_t pos = 0;
163         int rc;
164
165         rc = dt_read(env, scrub->os_obj, &buf, &pos);
166         /* failure */
167         if (rc < 0) {
168                 CERROR("%s: fail to load scrub file: rc = %d\n",
169                        scrub->os_name, rc);
170                 return rc;
171         }
172
173         /* empty */
174         if (!rc)
175                 return -ENOENT;
176
177         /* corrupted */
178         if (rc < buf.lb_len) {
179                 CDEBUG(D_LFSCK, "%s: fail to load scrub file, "
180                        "expected = %d: rc = %d\n",
181                        scrub->os_name, (int)buf.lb_len, rc);
182                 return -EFAULT;
183         }
184
185         scrub_file_to_cpu(sf, &scrub->os_file_disk);
186         if (sf->sf_magic != SCRUB_MAGIC_V1) {
187                 CDEBUG(D_LFSCK, "%s: invalid scrub magic 0x%x != 0x%x\n",
188                        scrub->os_name, sf->sf_magic, SCRUB_MAGIC_V1);
189                 return -EFAULT;
190         }
191
192         return 0;
193 }
194 EXPORT_SYMBOL(scrub_file_load);
195
196 int scrub_file_store(const struct lu_env *env, struct lustre_scrub *scrub)
197 {
198         struct scrub_file *sf = &scrub->os_file_disk;
199         struct dt_object *obj = scrub->os_obj;
200         struct dt_device *dev = scrub_obj2dev(obj);
201         struct lu_buf buf = {
202                 .lb_buf = sf,
203                 .lb_len = sizeof(*sf)
204         };
205         struct thandle *th;
206         loff_t pos = 0;
207         int rc;
208         ENTRY;
209
210         /* Skip store under rdonly mode. */
211         if (dev->dd_rdonly)
212                 RETURN(0);
213
214         scrub_file_to_le(sf, &scrub->os_file);
215         th = dt_trans_create(env, dev);
216         if (IS_ERR(th))
217                 GOTO(log, rc = PTR_ERR(th));
218
219         rc = dt_declare_record_write(env, obj, &buf, pos, th);
220         if (rc)
221                 GOTO(stop, rc);
222
223         rc = dt_trans_start_local(env, dev, th);
224         if (rc)
225                 GOTO(stop, rc);
226
227         rc = dt_record_write(env, obj, &buf, &pos, th);
228
229         GOTO(stop, rc);
230
231 stop:
232         dt_trans_stop(env, dev, th);
233
234 log:
235         if (rc)
236                 CERROR("%s: store scrub file: rc = %d\n",
237                        scrub->os_name, rc);
238         else
239                 CDEBUG(D_LFSCK, "%s: store scrub file: rc = %d\n",
240                        scrub->os_name, rc);
241
242         scrub->os_time_last_checkpoint = ktime_get_seconds();
243         scrub->os_time_next_checkpoint = scrub->os_time_last_checkpoint +
244                                          SCRUB_CHECKPOINT_INTERVAL;
245         return rc;
246 }
247 EXPORT_SYMBOL(scrub_file_store);
248
249 int scrub_checkpoint(const struct lu_env *env, struct lustre_scrub *scrub)
250 {
251         struct scrub_file *sf = &scrub->os_file;
252         time64_t now = ktime_get_seconds();
253         int rc;
254
255         if (likely(now < scrub->os_time_next_checkpoint ||
256                    scrub->os_new_checked == 0))
257                 return 0;
258
259         CDEBUG(D_LFSCK, "%s: OI scrub checkpoint at pos %llu\n",
260                scrub->os_name, scrub->os_pos_current);
261
262         down_write(&scrub->os_rwsem);
263         sf->sf_items_checked += scrub->os_new_checked;
264         scrub->os_new_checked = 0;
265         sf->sf_pos_last_checkpoint = scrub->os_pos_current;
266         sf->sf_time_last_checkpoint = ktime_get_real_seconds();
267         sf->sf_run_time += now - scrub->os_time_last_checkpoint;
268         rc = scrub_file_store(env, scrub);
269         up_write(&scrub->os_rwsem);
270
271         return rc;
272 }
273 EXPORT_SYMBOL(scrub_checkpoint);
274
275 int scrub_thread_prep(const struct lu_env *env, struct lustre_scrub *scrub,
276                       uuid_t uuid, u64 start)
277 {
278         struct scrub_file *sf = &scrub->os_file;
279         u32 flags = scrub->os_start_flags;
280         bool drop_dryrun = false;
281         int rc;
282
283         ENTRY;
284         CDEBUG(D_LFSCK, "%s: OI scrub prep, flags = 0x%x\n",
285                scrub->os_name, flags);
286
287         down_write(&scrub->os_rwsem);
288         if (flags & SS_SET_FAILOUT)
289                 sf->sf_param |= SP_FAILOUT;
290         else if (flags & SS_CLEAR_FAILOUT)
291                 sf->sf_param &= ~SP_FAILOUT;
292
293         if (flags & SS_SET_DRYRUN) {
294                 sf->sf_param |= SP_DRYRUN;
295         } else if (flags & SS_CLEAR_DRYRUN && sf->sf_param & SP_DRYRUN) {
296                 sf->sf_param &= ~SP_DRYRUN;
297                 drop_dryrun = true;
298         }
299
300         if (flags & SS_RESET)
301                 scrub_file_reset(scrub, uuid, 0);
302
303         spin_lock(&scrub->os_lock);
304         scrub->os_partial_scan = 0;
305         if (flags & SS_AUTO_FULL) {
306                 scrub->os_full_speed = 1;
307                 sf->sf_flags |= SF_AUTO;
308         } else if (flags & SS_AUTO_PARTIAL) {
309                 scrub->os_full_speed = 0;
310                 scrub->os_partial_scan = 1;
311                 sf->sf_flags |= SF_AUTO;
312         } else if (sf->sf_flags & (SF_RECREATED | SF_INCONSISTENT |
313                                    SF_UPGRADE)) {
314                 scrub->os_full_speed = 1;
315         } else {
316                 scrub->os_full_speed = 0;
317         }
318
319         scrub->os_in_prior = 0;
320         scrub->os_waiting = 0;
321         scrub->os_paused = 0;
322         scrub->os_in_join = 0;
323         scrub->os_full_scrub = 0;
324         spin_unlock(&scrub->os_lock);
325         scrub->os_new_checked = 0;
326         if (drop_dryrun && sf->sf_pos_first_inconsistent != 0)
327                 sf->sf_pos_latest_start = sf->sf_pos_first_inconsistent;
328         else if (sf->sf_pos_last_checkpoint != 0)
329                 sf->sf_pos_latest_start = sf->sf_pos_last_checkpoint + 1;
330         else
331                 sf->sf_pos_latest_start = start;
332
333         scrub->os_pos_current = sf->sf_pos_latest_start;
334         sf->sf_status = SS_SCANNING;
335         sf->sf_time_latest_start = ktime_get_real_seconds();
336         sf->sf_time_last_checkpoint = sf->sf_time_latest_start;
337         sf->sf_pos_last_checkpoint = sf->sf_pos_latest_start - 1;
338         rc = scrub_file_store(env, scrub);
339         if (rc == 0) {
340                 spin_lock(&scrub->os_lock);
341                 scrub->os_running = 1;
342                 spin_unlock(&scrub->os_lock);
343                 wake_up_var(scrub);
344         }
345         up_write(&scrub->os_rwsem);
346
347         RETURN(rc);
348 }
349 EXPORT_SYMBOL(scrub_thread_prep);
350
351 int scrub_thread_post(const struct lu_env *env, struct lustre_scrub *scrub,
352                       int result)
353 {
354         struct scrub_file *sf = &scrub->os_file;
355         int rc;
356         ENTRY;
357
358         CDEBUG(D_LFSCK, "%s: OI scrub post with result = %d\n",
359                scrub->os_name, result);
360
361         down_write(&scrub->os_rwsem);
362         spin_lock(&scrub->os_lock);
363         scrub->os_running = 0;
364         spin_unlock(&scrub->os_lock);
365         if (scrub->os_new_checked > 0) {
366                 sf->sf_items_checked += scrub->os_new_checked;
367                 scrub->os_new_checked = 0;
368                 sf->sf_pos_last_checkpoint = scrub->os_pos_current;
369         }
370         sf->sf_time_last_checkpoint = ktime_get_real_seconds();
371         if (result > 0) {
372                 sf->sf_status = SS_COMPLETED;
373                 if (!(sf->sf_param & SP_DRYRUN)) {
374                         memset(sf->sf_oi_bitmap, 0, SCRUB_OI_BITMAP_SIZE);
375                         sf->sf_flags &= ~(SF_RECREATED | SF_INCONSISTENT |
376                                           SF_UPGRADE | SF_AUTO);
377                 }
378                 sf->sf_time_last_complete = sf->sf_time_last_checkpoint;
379                 sf->sf_success_count++;
380         } else if (result == 0) {
381                 if (scrub->os_paused)
382                         sf->sf_status = SS_PAUSED;
383                 else
384                         sf->sf_status = SS_STOPPED;
385         } else {
386                 sf->sf_status = SS_FAILED;
387         }
388         sf->sf_run_time += ktime_get_seconds() -
389                            scrub->os_time_last_checkpoint;
390
391         rc = scrub_file_store(env, scrub);
392         up_write(&scrub->os_rwsem);
393
394         RETURN(rc < 0 ? rc : result);
395 }
396 EXPORT_SYMBOL(scrub_thread_post);
397
398 int scrub_start(int (*threadfn)(void *data), struct lustre_scrub *scrub,
399                 void *data, __u32 flags)
400 {
401         struct task_struct *task;
402         int rc;
403         ENTRY;
404
405         if (scrub->os_task)
406                 RETURN(-EALREADY);
407
408         if (scrub->os_file.sf_status == SS_COMPLETED) {
409                 if (!(flags & SS_SET_FAILOUT))
410                         flags |= SS_CLEAR_FAILOUT;
411
412                 if (!(flags & SS_SET_DRYRUN))
413                         flags |= SS_CLEAR_DRYRUN;
414
415                 flags |= SS_RESET;
416         }
417
418         task = kthread_create(threadfn, data, "OI_scrub");
419         if (IS_ERR(task)) {
420                 rc = PTR_ERR(task);
421                 CERROR("%s: cannot start iteration thread: rc = %d\n",
422                        scrub->os_name, rc);
423                 RETURN(rc);
424         }
425         spin_lock(&scrub->os_lock);
426         if (scrub->os_task) {
427                 /* Lost a race */
428                 spin_unlock(&scrub->os_lock);
429                 kthread_stop(task);
430                 RETURN(-EALREADY);
431         }
432         scrub->os_start_flags = flags;
433         scrub->os_task = task;
434         wake_up_process(task);
435         spin_unlock(&scrub->os_lock);
436         wait_var_event(scrub, scrub->os_running || !scrub->os_task);
437
438         RETURN(0);
439 }
440 EXPORT_SYMBOL(scrub_start);
441
442 void scrub_stop(struct lustre_scrub *scrub)
443 {
444         struct task_struct *task;
445
446         spin_lock(&scrub->os_lock);
447         scrub->os_running = 0;
448         spin_unlock(&scrub->os_lock);
449         task = xchg(&scrub->os_task, NULL);
450         if (task)
451                 kthread_stop(task);
452 }
453 EXPORT_SYMBOL(scrub_stop);
454
/*
 * Printable scrub status names, indexed by sf_status (see scrub_dump()).
 * The table is NULL-terminated.
 */
const char *const scrub_status_names[] = {
	"init", "scanning", "completed", "failed",
	"stopped", "paused", "crashed",
	NULL
};
465
/*
 * Printable names for the sf_flags bits; entry i names bit i as decoded
 * by scrub_bits_dump().  The table is NULL-terminated.
 */
const char *const scrub_flags_names[] = {
	"recreated", "inconsistent", "auto", "upgrade",
	NULL
};
473
/*
 * Printable names for the sf_param bits; entry i names bit i as decoded
 * by scrub_bits_dump().  The table is NULL-terminated.
 */
const char *const scrub_param_names[] = {
	"failout", "dryrun",
	NULL
};
479
480 static void scrub_bits_dump(struct seq_file *m, int bits,
481                             const char *const names[],
482                             const char *prefix)
483 {
484         int flag;
485         int i;
486
487         seq_printf(m, "%s:%c", prefix, bits != 0 ? ' ' : '\n');
488
489         for (i = 0, flag = 1; bits != 0; i++, flag = BIT(i)) {
490                 if (flag & bits) {
491                         bits &= ~flag;
492                         seq_printf(m, "%s%c", names[i],
493                                    bits != 0 ? ',' : '\n');
494                 }
495         }
496 }
497
498 static void scrub_time_dump(struct seq_file *m, time64_t time,
499                             const char *prefix)
500 {
501         if (time != 0)
502                 seq_printf(m, "%s: %llu seconds\n", prefix,
503                            ktime_get_real_seconds() - time);
504         else
505                 seq_printf(m, "%s: N/A\n", prefix);
506 }
507
508 static void scrub_pos_dump(struct seq_file *m, __u64 pos, const char *prefix)
509 {
510         if (pos != 0)
511                 seq_printf(m, "%s: %llu\n", prefix, pos);
512         else
513                 seq_printf(m, "%s: N/A\n", prefix);
514 }
515
516 void scrub_dump(struct seq_file *m, struct lustre_scrub *scrub)
517 {
518         struct scrub_file *sf = &scrub->os_file;
519         u64 checked;
520         s64 speed;
521
522         down_read(&scrub->os_rwsem);
523         seq_printf(m, "name: OI_scrub\n"
524                    "magic: 0x%x\n"
525                    "oi_files: %d\n"
526                    "status: %s\n",
527                    sf->sf_magic, (int)sf->sf_oi_count,
528                    scrub_status_names[sf->sf_status]);
529
530         scrub_bits_dump(m, sf->sf_flags, scrub_flags_names, "flags");
531
532         scrub_bits_dump(m, sf->sf_param, scrub_param_names, "param");
533
534         scrub_time_dump(m, sf->sf_time_last_complete,
535                         "time_since_last_completed");
536
537         scrub_time_dump(m, sf->sf_time_latest_start,
538                         "time_since_latest_start");
539
540         scrub_time_dump(m, sf->sf_time_last_checkpoint,
541                         "time_since_last_checkpoint");
542
543         scrub_pos_dump(m, sf->sf_pos_latest_start,
544                         "latest_start_position");
545
546         scrub_pos_dump(m, sf->sf_pos_last_checkpoint,
547                         "last_checkpoint_position");
548
549         scrub_pos_dump(m, sf->sf_pos_first_inconsistent,
550                         "first_failure_position");
551
552         checked = sf->sf_items_checked + scrub->os_new_checked;
553         seq_printf(m, "checked: %llu\n"
554                    "%s: %llu\n"
555                    "failed: %llu\n"
556                    "prior_%s: %llu\n"
557                    "noscrub: %llu\n"
558                    "igif: %llu\n"
559                    "success_count: %u\n",
560                    checked,
561                    sf->sf_param & SP_DRYRUN ? "inconsistent" : "updated",
562                    sf->sf_items_updated, sf->sf_items_failed,
563                    sf->sf_param & SP_DRYRUN ? "inconsistent" : "updated",
564                    sf->sf_items_updated_prior, sf->sf_items_noscrub,
565                    sf->sf_items_igif, sf->sf_success_count);
566
567         speed = checked;
568         if (scrub->os_running) {
569                 s64 new_checked = scrub->os_new_checked;
570                 time64_t duration;
571                 time64_t rtime;
572
573                 /* Since the time resolution is in seconds for new system
574                  * or small devices it ismore likely that duration will be
575                  * zero which will lead to inaccurate results.
576                  */
577                 duration = ktime_get_seconds() -
578                            scrub->os_time_last_checkpoint;
579                 if (duration != 0)
580                         new_checked = div_s64(new_checked, duration);
581
582                 rtime = sf->sf_run_time + duration;
583                 if (rtime != 0)
584                         speed = div_s64(speed, rtime);
585
586                 seq_printf(m, "run_time: %lld seconds\n"
587                            "average_speed: %lld objects/sec\n"
588                            "real_time_speed: %lld objects/sec\n"
589                            "current_position: %llu\n"
590                            "scrub_in_prior: %s\n"
591                            "scrub_full_speed: %s\n"
592                            "partial_scan: %s\n",
593                            rtime, speed, new_checked,
594                            scrub->os_pos_current,
595                            scrub->os_in_prior ? "yes" : "no",
596                            scrub->os_full_speed ? "yes" : "no",
597                            scrub->os_partial_scan ? "yes" : "no");
598         } else {
599                 if (sf->sf_run_time != 0)
600                         speed = div_s64(speed, sf->sf_run_time);
601                 seq_printf(m, "run_time: %d seconds\n"
602                            "average_speed: %lld objects/sec\n"
603                            "real_time_speed: N/A\n"
604                            "current_position: N/A\n",
605                            sf->sf_run_time, speed);
606         }
607
608         up_read(&scrub->os_rwsem);
609 }
610 EXPORT_SYMBOL(scrub_dump);
611
612 int lustre_liru_new(struct list_head *head, const struct lu_fid *pfid,
613                     const struct lu_fid *cfid, __u64 child,
614                     const char *name, int namelen)
615 {
616         struct lustre_index_restore_unit *liru;
617         int len = sizeof(*liru) + namelen + 1;
618
619         OBD_ALLOC(liru, len);
620         if (!liru)
621                 return -ENOMEM;
622
623         INIT_LIST_HEAD(&liru->liru_link);
624         liru->liru_pfid = *pfid;
625         liru->liru_cfid = *cfid;
626         liru->liru_clid = child;
627         liru->liru_len = len;
628         memcpy(liru->liru_name, name, namelen);
629         liru->liru_name[namelen] = 0;
630         list_add_tail(&liru->liru_link, head);
631
632         return 0;
633 }
634 EXPORT_SYMBOL(lustre_liru_new);
635
636 int lustre_index_register(struct dt_device *dev, const char *devname,
637                           struct list_head *head, spinlock_t *lock, int *guard,
638                           const struct lu_fid *fid,
639                           __u32 keysize, __u32 recsize)
640 {
641         struct lustre_index_backup_unit *libu, *pos;
642         int rc = 0;
643         ENTRY;
644
645         if (dev->dd_rdonly || *guard)
646                 RETURN(1);
647
648         OBD_ALLOC_PTR(libu);
649         if (!libu)
650                 RETURN(-ENOMEM);
651
652         INIT_LIST_HEAD(&libu->libu_link);
653         libu->libu_keysize = keysize;
654         libu->libu_recsize = recsize;
655         libu->libu_fid = *fid;
656
657         spin_lock(lock);
658         if (unlikely(*guard)) {
659                 spin_unlock(lock);
660                 OBD_FREE_PTR(libu);
661
662                 RETURN(1);
663         }
664
665         list_for_each_entry_reverse(pos, head, libu_link) {
666                 rc = lu_fid_cmp(&pos->libu_fid, fid);
667                 if (rc < 0) {
668                         list_add(&libu->libu_link, &pos->libu_link);
669                         spin_unlock(lock);
670
671                         RETURN(0);
672                 }
673
674                 if (!rc) {
675                         /* Registered already. But the former registered one
676                          * has different keysize/recsize. It may because that
677                          * the former values are from disk and corrupted, then
678                          * replace it with new values. */
679                         if (unlikely(keysize != pos->libu_keysize ||
680                                      recsize != pos->libu_recsize)) {
681                                 CWARN("%s: the index "DFID" has registered "
682                                       "with %u/%u, may be invalid, replace "
683                                       "with %u/%u\n",
684                                       devname, PFID(fid), pos->libu_keysize,
685                                       pos->libu_recsize, keysize, recsize);
686
687                                 pos->libu_keysize = keysize;
688                                 pos->libu_recsize = recsize;
689                         } else {
690                                 rc = 1;
691                         }
692
693                         spin_unlock(lock);
694                         OBD_FREE_PTR(libu);
695
696                         RETURN(rc);
697                 }
698         }
699
700         list_add(&libu->libu_link, head);
701         spin_unlock(lock);
702
703         RETURN(0);
704 }
705 EXPORT_SYMBOL(lustre_index_register);
706
707 static void lustre_index_degister(struct list_head *head, spinlock_t *lock,
708                                   const struct lu_fid *fid)
709 {
710         struct lustre_index_backup_unit *libu;
711         int rc = -ENOENT;
712
713         spin_lock(lock);
714         list_for_each_entry_reverse(libu, head, libu_link) {
715                 rc = lu_fid_cmp(&libu->libu_fid, fid);
716                 /* NOT registered. */
717                 if (rc < 0)
718                         break;
719
720                 if (!rc) {
721                         list_del(&libu->libu_link);
722                         break;
723                 }
724         }
725         spin_unlock(lock);
726
727         if (!rc)
728                 OBD_FREE_PTR(libu);
729 }
730
731 static void
732 lustre_index_backup_make_header(struct lustre_index_backup_header *header,
733                                 __u32 keysize, __u32 recsize,
734                                 const struct lu_fid *fid, __u32 count)
735 {
736         memset(header, 0, sizeof(*header));
737         header->libh_magic = cpu_to_le32(INDEX_BACKUP_MAGIC_V1);
738         header->libh_count = cpu_to_le32(count);
739         header->libh_keysize = cpu_to_le32(keysize);
740         header->libh_recsize = cpu_to_le32(recsize);
741         fid_cpu_to_le(&header->libh_owner, fid);
742 }
743
744 static int lustre_index_backup_body(const struct lu_env *env,
745                                     struct dt_object *obj, loff_t *pos,
746                                     void *buf, int bufsize)
747 {
748         struct dt_device *dev = lu2dt_dev(obj->do_lu.lo_dev);
749         struct thandle *th;
750         struct lu_buf lbuf = {
751                 .lb_buf = buf,
752                 .lb_len = bufsize
753         };
754         int rc;
755         ENTRY;
756
757         th = dt_trans_create(env, dev);
758         if (IS_ERR(th))
759                 RETURN(PTR_ERR(th));
760
761         rc = dt_declare_record_write(env, obj, &lbuf, *pos, th);
762         if (rc)
763                 GOTO(stop, rc);
764
765         rc = dt_trans_start_local(env, dev, th);
766         if (rc)
767                 GOTO(stop, rc);
768
769         rc = dt_record_write(env, obj, &lbuf, pos, th);
770
771         GOTO(stop, rc);
772
773 stop:
774         dt_trans_stop(env, dev, th);
775         return rc;
776 }
777
778 static int lustre_index_backup_header(const struct lu_env *env,
779                                       struct dt_object *obj,
780                                       const struct lu_fid *tgt_fid,
781                                       __u32 keysize, __u32 recsize,
782                                       void *buf, int bufsize, int count)
783 {
784         struct dt_device *dev = lu2dt_dev(obj->do_lu.lo_dev);
785         struct lustre_index_backup_header *header = buf;
786         struct lu_attr *la = buf;
787         struct thandle *th;
788         struct lu_buf lbuf = {
789                 .lb_buf = header,
790                 .lb_len = sizeof(*header)
791         };
792         loff_t size = sizeof(*header) + (keysize + recsize) * count;
793         loff_t pos = 0;
794         int rc;
795         bool punch = false;
796         ENTRY;
797
798         LASSERT(sizeof(*la) <= bufsize);
799         LASSERT(sizeof(*header) <= bufsize);
800
801         rc = dt_attr_get(env, obj, la);
802         if (rc)
803                 RETURN(rc);
804
805         if (la->la_size > size)
806                 punch = true;
807
808         lustre_index_backup_make_header(header, keysize, recsize,
809                                         tgt_fid, count);
810         th = dt_trans_create(env, dev);
811         if (IS_ERR(th))
812                 RETURN(PTR_ERR(th));
813
814         rc = dt_declare_record_write(env, obj, &lbuf, pos, th);
815         if (rc)
816                 GOTO(stop, rc);
817
818         if (punch) {
819                 rc = dt_declare_punch(env, obj, size, OBD_OBJECT_EOF, th);
820                 if (rc)
821                         GOTO(stop, rc);
822         }
823
824         rc = dt_trans_start_local(env, dev, th);
825         if (rc)
826                 GOTO(stop, rc);
827
828         rc = dt_record_write(env, obj, &lbuf, &pos, th);
829         if (!rc && punch)
830                 rc = dt_punch(env, obj, size, OBD_OBJECT_EOF, th);
831
832         GOTO(stop, rc);
833
834 stop:
835         dt_trans_stop(env, dev, th);
836         return rc;
837 }
838
839 static int lustre_index_update_lma(const struct lu_env *env,
840                                    struct dt_object *obj,
841                                    void *buf, int bufsize)
842 {
843         struct dt_device *dev = lu2dt_dev(obj->do_lu.lo_dev);
844         struct lustre_mdt_attrs *lma = buf;
845         struct lu_buf lbuf = {
846                 .lb_buf = lma,
847                 .lb_len = sizeof(struct lustre_ost_attrs)
848         };
849         struct thandle *th;
850         int fl = LU_XATTR_REPLACE;
851         int rc;
852         ENTRY;
853
854         LASSERT(bufsize >= lbuf.lb_len);
855
856         rc = dt_xattr_get(env, obj, &lbuf, XATTR_NAME_LMA);
857         if (unlikely(rc == -ENODATA)) {
858                 fl = LU_XATTR_CREATE;
859                 lustre_lma_init(lma, lu_object_fid(&obj->do_lu),
860                                 LMAC_IDX_BACKUP, 0);
861                 rc = sizeof(*lma);
862         } else if (rc < sizeof(*lma)) {
863                 RETURN(rc < 0 ? rc : -EFAULT);
864         } else {
865                 lustre_lma_swab(lma);
866                 if (lma->lma_compat & LMAC_IDX_BACKUP)
867                         RETURN(0);
868
869                 lma->lma_compat |= LMAC_IDX_BACKUP;
870         }
871
872         lustre_lma_swab(lma);
873         lbuf.lb_len = rc;
874         th = dt_trans_create(env, dev);
875         if (IS_ERR(th))
876                 RETURN(rc);
877
878         rc = dt_declare_xattr_set(env, obj, &lbuf, XATTR_NAME_LMA, fl, th);
879         if (rc)
880                 GOTO(stop, rc);
881
882         rc = dt_trans_start_local(env, dev, th);
883         if (rc)
884                 GOTO(stop, rc);
885
886         rc = dt_xattr_set(env, obj, &lbuf, XATTR_NAME_LMA, fl, th);
887
888         GOTO(stop, rc);
889
890 stop:
891         dt_trans_stop(env, dev, th);
892         return rc;
893 }
894
895 static int lustre_index_backup_one(const struct lu_env *env,
896                                    struct local_oid_storage *los,
897                                    struct dt_object *parent,
898                                    struct lustre_index_backup_unit *libu,
899                                    char *buf, int bufsize)
900 {
901         struct dt_device *dev = scrub_obj2dev(parent);
902         struct dt_object *tgt_obj = NULL;
903         struct dt_object *bak_obj = NULL;
904         const struct dt_it_ops *iops;
905         struct dt_it *di;
906         loff_t pos = sizeof(struct lustre_index_backup_header);
907         int count = 0;
908         int size = 0;
909         int rc;
910         ENTRY;
911
912         tgt_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
913                                              &libu->libu_fid, NULL));
914         if (IS_ERR_OR_NULL(tgt_obj))
915                 GOTO(out, rc = tgt_obj ? PTR_ERR(tgt_obj) : -ENOENT);
916
917         if (!dt_object_exists(tgt_obj))
918                 GOTO(out, rc = 0);
919
920         if (!tgt_obj->do_index_ops) {
921                 struct dt_index_features feat;
922
923                 feat.dif_flags = DT_IND_UPDATE;
924                 feat.dif_keysize_min = libu->libu_keysize;
925                 feat.dif_keysize_max = libu->libu_keysize;
926                 feat.dif_recsize_min = libu->libu_recsize;
927                 feat.dif_recsize_max = libu->libu_recsize;
928                 feat.dif_ptrsize = 4;
929                 rc = tgt_obj->do_ops->do_index_try(env, tgt_obj, &feat);
930                 if (rc)
931                         GOTO(out, rc);
932         }
933
934         lustre_fid2lbx(buf, &libu->libu_fid, bufsize);
935         bak_obj = local_file_find_or_create(env, los, parent, buf,
936                                             S_IFREG | S_IRUGO | S_IWUSR);
937         if (IS_ERR_OR_NULL(bak_obj))
938                 GOTO(out, rc = bak_obj ? PTR_ERR(bak_obj) : -ENOENT);
939
940         iops = &tgt_obj->do_index_ops->dio_it;
941         di = iops->init(env, tgt_obj, 0);
942         if (IS_ERR(di))
943                 GOTO(out, rc = PTR_ERR(di));
944
945         rc = iops->load(env, di, 0);
946         if (!rc)
947                 rc = iops->next(env, di);
948         else if (rc > 0)
949                 rc = 0;
950
951         while (!rc) {
952                 void *key;
953                 void *rec;
954
955                 key = iops->key(env, di);
956                 memcpy(&buf[size], key, libu->libu_keysize);
957                 size += libu->libu_keysize;
958                 rec = &buf[size];
959                 rc = iops->rec(env, di, rec, 0);
960                 if (rc)
961                         GOTO(fini, rc);
962
963                 size += libu->libu_recsize;
964                 count++;
965                 if (size + libu->libu_keysize + libu->libu_recsize > bufsize) {
966                         rc = lustre_index_backup_body(env, bak_obj, &pos,
967                                                       buf, size);
968                         if (rc)
969                                 GOTO(fini, rc);
970
971                         size = 0;
972                 }
973
974                 rc = iops->next(env, di);
975         }
976
977         if (rc >= 0 && size > 0)
978                 rc = lustre_index_backup_body(env, bak_obj, &pos, buf, size);
979
980         if (rc < 0)
981                 GOTO(fini, rc);
982
983         rc = lustre_index_backup_header(env, bak_obj, &libu->libu_fid,
984                                         libu->libu_keysize, libu->libu_recsize,
985                                         buf, bufsize, count);
986         if (!rc)
987                 rc = lustre_index_update_lma(env, tgt_obj, buf, bufsize);
988
989         if (!rc && OBD_FAIL_CHECK(OBD_FAIL_OSD_INDEX_CRASH)) {
990                 LASSERT(bufsize >= 512);
991
992                 pos = 0;
993                 memset(buf, 0, 512);
994                 lustre_index_backup_body(env, tgt_obj, &pos, buf, 512);
995         }
996
997         GOTO(fini, rc);
998
999 fini:
1000         iops->fini(env, di);
1001 out:
1002         if (!IS_ERR_OR_NULL(tgt_obj))
1003                 dt_object_put_nocache(env, tgt_obj);
1004         if (!IS_ERR_OR_NULL(bak_obj))
1005                 dt_object_put_nocache(env, bak_obj);
1006         return rc;
1007 }
1008
1009 void lustre_index_backup(const struct lu_env *env, struct dt_device *dev,
1010                          const char *devname, struct list_head *head,
1011                          spinlock_t *lock, int *guard, bool backup)
1012 {
1013         struct lustre_index_backup_unit *libu;
1014         struct local_oid_storage *los = NULL;
1015         struct dt_object *parent = NULL;
1016         char *buf = NULL;
1017         struct lu_fid fid;
1018         int rc;
1019         ENTRY;
1020
1021         if (dev->dd_rdonly || *guard)
1022                 RETURN_EXIT;
1023
1024         spin_lock(lock);
1025         *guard = 1;
1026         spin_unlock(lock);
1027
1028         if (list_empty(head))
1029                 RETURN_EXIT;
1030
1031         /* Handle kinds of failures during mount process. */
1032         if (!dev->dd_lu_dev.ld_site || !dev->dd_lu_dev.ld_site->ls_top_dev)
1033                 backup = false;
1034
1035         if (backup) {
1036                 OBD_ALLOC_LARGE(buf, INDEX_BACKUP_BUFSIZE);
1037                 if (!buf) {
1038                         backup = false;
1039                         goto scan;
1040                 }
1041
1042                 lu_local_obj_fid(&fid, INDEX_BACKUP_OID);
1043                 parent = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
1044                                                     &fid, NULL));
1045                 if (IS_ERR_OR_NULL(parent)) {
1046                         CERROR("%s: failed to locate backup dir: rc = %ld\n",
1047                                devname, parent ? PTR_ERR(parent) : -ENOENT);
1048                         backup = false;
1049                         goto scan;
1050                 }
1051
1052                 lu_local_name_obj_fid(&fid, 1);
1053                 rc = local_oid_storage_init(env, dev, &fid, &los);
1054                 if (rc) {
1055                         CERROR("%s: failed to init local storage: rc = %d\n",
1056                                devname, rc);
1057                         backup = false;
1058                 }
1059         }
1060
1061 scan:
1062         spin_lock(lock);
1063         while (!list_empty(head)) {
1064                 libu = list_entry(head->next,
1065                                   struct lustre_index_backup_unit, libu_link);
1066                 list_del_init(&libu->libu_link);
1067                 spin_unlock(lock);
1068
1069                 if (backup) {
1070                         rc = lustre_index_backup_one(env, los, parent, libu,
1071                                                      buf, INDEX_BACKUP_BUFSIZE);
1072                         CDEBUG(D_WARNING, "%s: backup index "DFID": rc = %d\n",
1073                                devname, PFID(&libu->libu_fid), rc);
1074                 }
1075
1076                 OBD_FREE_PTR(libu);
1077                 spin_lock(lock);
1078         }
1079         spin_unlock(lock);
1080
1081         if (los)
1082                 local_oid_storage_fini(env, los);
1083         if (parent)
1084                 dt_object_put_nocache(env, parent);
1085         if (buf)
1086                 OBD_FREE_LARGE(buf, INDEX_BACKUP_BUFSIZE);
1087
1088         EXIT;
1089 }
1090 EXPORT_SYMBOL(lustre_index_backup);
1091
1092 int lustre_index_restore(const struct lu_env *env, struct dt_device *dev,
1093                          const struct lu_fid *parent_fid,
1094                          const struct lu_fid *tgt_fid,
1095                          const struct lu_fid *bak_fid, const char *name,
1096                          struct list_head *head, spinlock_t *lock,
1097                          char *buf, int bufsize)
1098 {
1099         struct dt_object *parent_obj = NULL;
1100         struct dt_object *tgt_obj = NULL;
1101         struct dt_object *bak_obj = NULL;
1102         struct lustre_index_backup_header *header;
1103         struct dt_index_features *feat;
1104         struct dt_object_format *dof;
1105         struct lu_attr *la;
1106         struct thandle *th;
1107         struct lu_object_conf conf;
1108         struct dt_insert_rec ent;
1109         struct lu_buf lbuf;
1110         struct lu_fid tfid;
1111         loff_t pos = 0;
1112         __u32 keysize;
1113         __u32 recsize;
1114         __u32 pairsize;
1115         int count;
1116         int rc;
1117         bool registered = false;
1118         ENTRY;
1119
1120         LASSERT(bufsize >= sizeof(*la) + sizeof(*dof) +
1121                 sizeof(*feat) + sizeof(*header));
1122
1123         memset(buf, 0, bufsize);
1124         la = (struct lu_attr *)buf;
1125         dof = (void *)la + sizeof(*la);
1126         feat = (void *)dof + sizeof(*dof);
1127         header = (void *)feat + sizeof(*feat);
1128         lbuf.lb_buf = header;
1129         lbuf.lb_len = sizeof(*header);
1130
1131         tgt_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
1132                                              tgt_fid, NULL));
1133         if (IS_ERR_OR_NULL(tgt_obj))
1134                 GOTO(out, rc = tgt_obj ? PTR_ERR(tgt_obj) : -ENOENT);
1135
1136         bak_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
1137                                              bak_fid, NULL));
1138         if (IS_ERR_OR_NULL(bak_obj))
1139                 GOTO(out, rc = bak_obj ? PTR_ERR(bak_obj) : -ENOENT);
1140
1141         if (!dt_object_exists(bak_obj))
1142                 GOTO(out, rc = -ENOENT);
1143
1144         parent_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
1145                                                 parent_fid, NULL));
1146         if (IS_ERR_OR_NULL(parent_obj))
1147                 GOTO(out, rc = parent_obj ? PTR_ERR(parent_obj) : -ENOENT);
1148
1149         LASSERT(dt_object_exists(parent_obj));
1150
1151         if (unlikely(!dt_try_as_dir(env, parent_obj)))
1152                 GOTO(out, rc = -ENOTDIR);
1153
1154         rc = dt_attr_get(env, tgt_obj, la);
1155         if (rc)
1156                 GOTO(out, rc);
1157
1158         rc = dt_record_read(env, bak_obj, &lbuf, &pos);
1159         if (rc)
1160                 GOTO(out, rc);
1161
1162         if (le32_to_cpu(header->libh_magic) != INDEX_BACKUP_MAGIC_V1)
1163                 GOTO(out, rc = -EINVAL);
1164
1165         fid_le_to_cpu(&tfid, &header->libh_owner);
1166         if (unlikely(!lu_fid_eq(tgt_fid, &tfid)))
1167                 GOTO(out, rc = -EINVAL);
1168
1169         keysize = le32_to_cpu(header->libh_keysize);
1170         recsize = le32_to_cpu(header->libh_recsize);
1171         pairsize = keysize + recsize;
1172
1173         memset(feat, 0, sizeof(*feat));
1174         feat->dif_flags = DT_IND_UPDATE;
1175         feat->dif_keysize_min = feat->dif_keysize_max = keysize;
1176         feat->dif_recsize_min = feat->dif_recsize_max = recsize;
1177         feat->dif_ptrsize = 4;
1178
1179         /* T1: remove old name entry and destroy old index. */
1180         th = dt_trans_create(env, dev);
1181         if (IS_ERR(th))
1182                 GOTO(out, rc = PTR_ERR(th));
1183
1184         rc = dt_declare_delete(env, parent_obj,
1185                                (const struct dt_key *)name, th);
1186         if (rc)
1187                 GOTO(stop, rc);
1188
1189         rc = dt_declare_ref_del(env, tgt_obj, th);
1190         if (rc)
1191                 GOTO(stop, rc);
1192
1193         rc = dt_declare_destroy(env, tgt_obj, th);
1194         if (rc)
1195                 GOTO(stop, rc);
1196
1197         rc = dt_trans_start_local(env, dev, th);
1198         if (rc)
1199                 GOTO(stop, rc);
1200
1201         rc = dt_delete(env, parent_obj, (const struct dt_key *)name, th);
1202         if (rc)
1203                 GOTO(stop, rc);
1204
1205         dt_write_lock(env, tgt_obj, 0);
1206         rc = dt_ref_del(env, tgt_obj, th);
1207         if (rc == 0) {
1208                 if (S_ISDIR(tgt_obj->do_lu.lo_header->loh_attr))
1209                         dt_ref_del(env, tgt_obj, th);
1210                 rc = dt_destroy(env, tgt_obj, th);
1211         }
1212         dt_write_unlock(env, tgt_obj);
1213         dt_trans_stop(env, dev, th);
1214         if (rc)
1215                 GOTO(out, rc);
1216
1217         la->la_valid = LA_MODE | LA_UID | LA_GID;
1218         conf.loc_flags = LOC_F_NEW;
1219         dof->u.dof_idx.di_feat = feat;
1220         dof->dof_type = DFT_INDEX;
1221         ent.rec_type = S_IFREG;
1222         ent.rec_fid = tgt_fid;
1223
1224         /* Drop cache before re-create it. */
1225         dt_object_put_nocache(env, tgt_obj);
1226         tgt_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
1227                                              tgt_fid, &conf));
1228         if (IS_ERR_OR_NULL(tgt_obj))
1229                 GOTO(out, rc = tgt_obj ? PTR_ERR(tgt_obj) : -ENOENT);
1230
1231         LASSERT(!dt_object_exists(tgt_obj));
1232
1233         /* T2: create new index and insert new name entry. */
1234         th = dt_trans_create(env, dev);
1235         if (IS_ERR(th))
1236                 GOTO(out, rc = PTR_ERR(th));
1237
1238         rc = dt_declare_create(env, tgt_obj, la, NULL, dof, th);
1239         if (rc)
1240                 GOTO(stop, rc);
1241
1242         rc = dt_declare_insert(env, parent_obj, (const struct dt_rec *)&ent,
1243                                (const struct dt_key *)name, th);
1244         if (rc)
1245                 GOTO(stop, rc);
1246
1247         rc = dt_trans_start_local(env, dev, th);
1248         if (rc)
1249                 GOTO(stop, rc);
1250
1251         dt_write_lock(env, tgt_obj, 0);
1252         rc = dt_create(env, tgt_obj, la, NULL, dof, th);
1253         dt_write_unlock(env, tgt_obj);
1254         if (rc)
1255                 GOTO(stop, rc);
1256
1257         rc = dt_insert(env, parent_obj, (const struct dt_rec *)&ent,
1258                        (const struct dt_key *)name, th);
1259         dt_trans_stop(env, dev, th);
1260         /* Some index name may has been inserted by OSD
1261          * automatically when create the index object. */
1262         if (unlikely(rc == -EEXIST))
1263                 rc = 0;
1264         if (rc)
1265                 GOTO(out, rc);
1266
1267         /* The new index will register via index_try. */
1268         rc = tgt_obj->do_ops->do_index_try(env, tgt_obj, feat);
1269         if (rc)
1270                 GOTO(out, rc);
1271
1272         registered = true;
1273         count = le32_to_cpu(header->libh_count);
1274         while (!rc && count > 0) {
1275                 int size = pairsize * count;
1276                 int items = count;
1277                 int i;
1278
1279                 if (size > bufsize) {
1280                         items = bufsize / pairsize;
1281                         size = pairsize * items;
1282                 }
1283
1284                 lbuf.lb_buf = buf;
1285                 lbuf.lb_len = size;
1286                 rc = dt_record_read(env, bak_obj, &lbuf, &pos);
1287                 for (i = 0; i < items && !rc; i++) {
1288                         void *key = &buf[i * pairsize];
1289                         void *rec = &buf[i * pairsize + keysize];
1290
1291                         /* Tn: restore the records. */
1292                         th = dt_trans_create(env, dev);
1293                         if (!th)
1294                                 GOTO(out, rc = -ENOMEM);
1295
1296                         rc = dt_declare_insert(env, tgt_obj, rec, key, th);
1297                         if (rc)
1298                                 GOTO(stop, rc);
1299
1300                         rc = dt_trans_start_local(env, dev, th);
1301                         if (rc)
1302                                 GOTO(stop, rc);
1303
1304                         rc = dt_insert(env, tgt_obj, rec, key, th);
1305                         if (unlikely(rc == -EEXIST))
1306                                 rc = 0;
1307
1308                         dt_trans_stop(env, dev, th);
1309                 }
1310
1311                 count -= items;
1312         }
1313
1314         GOTO(out, rc);
1315
1316 stop:
1317         dt_trans_stop(env, dev, th);
1318         if (rc && registered)
1319                 /* Degister the index to avoid overwriting the backup. */
1320                 lustre_index_degister(head, lock, tgt_fid);
1321
1322 out:
1323         if (!IS_ERR_OR_NULL(tgt_obj))
1324                 dt_object_put_nocache(env, tgt_obj);
1325         if (!IS_ERR_OR_NULL(bak_obj))
1326                 dt_object_put_nocache(env, bak_obj);
1327         if (!IS_ERR_OR_NULL(parent_obj))
1328                 dt_object_put_nocache(env, parent_obj);
1329         return rc;
1330 }
1331 EXPORT_SYMBOL(lustre_index_restore);