Whamcloud - gitweb
LU-12780 osd: use native kthreads for scrub.
[fs/lustre-release.git] / lustre / obdclass / scrub.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2017, Intel Corporation.
24  */
25 /*
26  * lustre/obdclass/scrub.c
27  *
28  * The OI scrub is used for checking and (re)building Object Index files
29  * that are usually backend special. Here are some general scrub related
30  * functions that can be shared by different backends for OI scrub.
31  *
32  * Author: Fan Yong <fan.yong@intel.com>
33  */
34
35 #define DEBUG_SUBSYSTEM S_LFSCK
36
37 #include <linux/kthread.h>
38 #include <lustre_scrub.h>
39 #include <lustre_lib.h>
40 #include <lustre_fid.h>
41
42 static inline struct dt_device *scrub_obj2dev(struct dt_object *obj)
43 {
44         return container_of_safe(obj->do_lu.lo_dev, struct dt_device,
45                                  dd_lu_dev);
46 }
47
48 static void scrub_file_to_cpu(struct scrub_file *des, struct scrub_file *src)
49 {
50         uuid_copy(&des->sf_uuid, &src->sf_uuid);
51         des->sf_flags   = le64_to_cpu(src->sf_flags);
52         des->sf_magic   = le32_to_cpu(src->sf_magic);
53         des->sf_status  = le16_to_cpu(src->sf_status);
54         des->sf_param   = le16_to_cpu(src->sf_param);
55         des->sf_time_last_complete      =
56                                 le64_to_cpu(src->sf_time_last_complete);
57         des->sf_time_latest_start       =
58                                 le64_to_cpu(src->sf_time_latest_start);
59         des->sf_time_last_checkpoint    =
60                                 le64_to_cpu(src->sf_time_last_checkpoint);
61         des->sf_pos_latest_start        =
62                                 le64_to_cpu(src->sf_pos_latest_start);
63         des->sf_pos_last_checkpoint     =
64                                 le64_to_cpu(src->sf_pos_last_checkpoint);
65         des->sf_pos_first_inconsistent  =
66                                 le64_to_cpu(src->sf_pos_first_inconsistent);
67         des->sf_items_checked           =
68                                 le64_to_cpu(src->sf_items_checked);
69         des->sf_items_updated           =
70                                 le64_to_cpu(src->sf_items_updated);
71         des->sf_items_failed            =
72                                 le64_to_cpu(src->sf_items_failed);
73         des->sf_items_updated_prior     =
74                                 le64_to_cpu(src->sf_items_updated_prior);
75         des->sf_run_time        = le32_to_cpu(src->sf_run_time);
76         des->sf_success_count   = le32_to_cpu(src->sf_success_count);
77         des->sf_oi_count        = le16_to_cpu(src->sf_oi_count);
78         des->sf_internal_flags  = le16_to_cpu(src->sf_internal_flags);
79         memcpy(des->sf_oi_bitmap, src->sf_oi_bitmap, SCRUB_OI_BITMAP_SIZE);
80 }
81
82 static void scrub_file_to_le(struct scrub_file *des, struct scrub_file *src)
83 {
84         uuid_copy(&des->sf_uuid, &src->sf_uuid);
85         des->sf_flags   = cpu_to_le64(src->sf_flags);
86         des->sf_magic   = cpu_to_le32(src->sf_magic);
87         des->sf_status  = cpu_to_le16(src->sf_status);
88         des->sf_param   = cpu_to_le16(src->sf_param);
89         des->sf_time_last_complete      =
90                                 cpu_to_le64(src->sf_time_last_complete);
91         des->sf_time_latest_start       =
92                                 cpu_to_le64(src->sf_time_latest_start);
93         des->sf_time_last_checkpoint    =
94                                 cpu_to_le64(src->sf_time_last_checkpoint);
95         des->sf_pos_latest_start        =
96                                 cpu_to_le64(src->sf_pos_latest_start);
97         des->sf_pos_last_checkpoint     =
98                                 cpu_to_le64(src->sf_pos_last_checkpoint);
99         des->sf_pos_first_inconsistent  =
100                                 cpu_to_le64(src->sf_pos_first_inconsistent);
101         des->sf_items_checked           =
102                                 cpu_to_le64(src->sf_items_checked);
103         des->sf_items_updated           =
104                                 cpu_to_le64(src->sf_items_updated);
105         des->sf_items_failed            =
106                                 cpu_to_le64(src->sf_items_failed);
107         des->sf_items_updated_prior     =
108                                 cpu_to_le64(src->sf_items_updated_prior);
109         des->sf_run_time        = cpu_to_le32(src->sf_run_time);
110         des->sf_success_count   = cpu_to_le32(src->sf_success_count);
111         des->sf_oi_count        = cpu_to_le16(src->sf_oi_count);
112         des->sf_internal_flags  = cpu_to_le16(src->sf_internal_flags);
113         memcpy(des->sf_oi_bitmap, src->sf_oi_bitmap, SCRUB_OI_BITMAP_SIZE);
114 }
115
116 void scrub_file_init(struct lustre_scrub *scrub, uuid_t uuid)
117 {
118         struct scrub_file *sf = &scrub->os_file;
119
120         memset(sf, 0, sizeof(*sf));
121         uuid_copy(&sf->sf_uuid, &uuid);
122         sf->sf_magic = SCRUB_MAGIC_V1;
123         sf->sf_status = SS_INIT;
124 }
125 EXPORT_SYMBOL(scrub_file_init);
126
127 void scrub_file_reset(struct lustre_scrub *scrub, uuid_t uuid, u64 flags)
128 {
129         struct scrub_file *sf = &scrub->os_file;
130
131         CDEBUG(D_LFSCK, "%s: reset OI scrub file, old flags = "
132                "%#llx, add flags = %#llx\n",
133                scrub->os_name, sf->sf_flags, flags);
134
135         uuid_copy(&sf->sf_uuid, &uuid);
136         sf->sf_status = SS_INIT;
137         sf->sf_flags |= flags;
138         sf->sf_flags &= ~SF_AUTO;
139         sf->sf_run_time = 0;
140         sf->sf_time_latest_start = 0;
141         sf->sf_time_last_checkpoint = 0;
142         sf->sf_pos_latest_start = 0;
143         sf->sf_pos_last_checkpoint = 0;
144         sf->sf_pos_first_inconsistent = 0;
145         sf->sf_items_checked = 0;
146         sf->sf_items_updated = 0;
147         sf->sf_items_failed = 0;
148         sf->sf_items_noscrub = 0;
149         sf->sf_items_igif = 0;
150         if (!scrub->os_in_join)
151                 sf->sf_items_updated_prior = 0;
152 }
153 EXPORT_SYMBOL(scrub_file_reset);
154
155 int scrub_file_load(const struct lu_env *env, struct lustre_scrub *scrub)
156 {
157         struct scrub_file *sf = &scrub->os_file;
158         struct lu_buf buf = {
159                 .lb_buf = &scrub->os_file_disk,
160                 .lb_len = sizeof(scrub->os_file_disk)
161         };
162         loff_t pos = 0;
163         int rc;
164
165         rc = dt_read(env, scrub->os_obj, &buf, &pos);
166         /* failure */
167         if (rc < 0) {
168                 CERROR("%s: fail to load scrub file: rc = %d\n",
169                        scrub->os_name, rc);
170                 return rc;
171         }
172
173         /* empty */
174         if (!rc)
175                 return -ENOENT;
176
177         /* corrupted */
178         if (rc < buf.lb_len) {
179                 CDEBUG(D_LFSCK, "%s: fail to load scrub file, "
180                        "expected = %d: rc = %d\n",
181                        scrub->os_name, (int)buf.lb_len, rc);
182                 return -EFAULT;
183         }
184
185         scrub_file_to_cpu(sf, &scrub->os_file_disk);
186         if (sf->sf_magic != SCRUB_MAGIC_V1) {
187                 CDEBUG(D_LFSCK, "%s: invalid scrub magic 0x%x != 0x%x\n",
188                        scrub->os_name, sf->sf_magic, SCRUB_MAGIC_V1);
189                 return -EFAULT;
190         }
191
192         return 0;
193 }
194 EXPORT_SYMBOL(scrub_file_load);
195
196 int scrub_file_store(const struct lu_env *env, struct lustre_scrub *scrub)
197 {
198         struct scrub_file *sf = &scrub->os_file_disk;
199         struct dt_object *obj = scrub->os_obj;
200         struct dt_device *dev = scrub_obj2dev(obj);
201         struct lu_buf buf = {
202                 .lb_buf = sf,
203                 .lb_len = sizeof(*sf)
204         };
205         struct thandle *th;
206         loff_t pos = 0;
207         int rc;
208         ENTRY;
209
210         /* Skip store under rdonly mode. */
211         if (dev->dd_rdonly)
212                 RETURN(0);
213
214         scrub_file_to_le(sf, &scrub->os_file);
215         th = dt_trans_create(env, dev);
216         if (IS_ERR(th))
217                 GOTO(log, rc = PTR_ERR(th));
218
219         rc = dt_declare_record_write(env, obj, &buf, pos, th);
220         if (rc)
221                 GOTO(stop, rc);
222
223         rc = dt_trans_start_local(env, dev, th);
224         if (rc)
225                 GOTO(stop, rc);
226
227         rc = dt_record_write(env, obj, &buf, &pos, th);
228
229         GOTO(stop, rc);
230
231 stop:
232         dt_trans_stop(env, dev, th);
233
234 log:
235         if (rc)
236                 CERROR("%s: store scrub file: rc = %d\n",
237                        scrub->os_name, rc);
238         else
239                 CDEBUG(D_LFSCK, "%s: store scrub file: rc = %d\n",
240                        scrub->os_name, rc);
241
242         scrub->os_time_last_checkpoint = ktime_get_seconds();
243         scrub->os_time_next_checkpoint = scrub->os_time_last_checkpoint +
244                                          SCRUB_CHECKPOINT_INTERVAL;
245         return rc;
246 }
247 EXPORT_SYMBOL(scrub_file_store);
248
249 int scrub_checkpoint(const struct lu_env *env, struct lustre_scrub *scrub)
250 {
251         struct scrub_file *sf = &scrub->os_file;
252         time64_t now = ktime_get_seconds();
253         int rc;
254
255         if (likely(now < scrub->os_time_next_checkpoint ||
256                    scrub->os_new_checked == 0))
257                 return 0;
258
259         CDEBUG(D_LFSCK, "%s: OI scrub checkpoint at pos %llu\n",
260                scrub->os_name, scrub->os_pos_current);
261
262         down_write(&scrub->os_rwsem);
263         sf->sf_items_checked += scrub->os_new_checked;
264         scrub->os_new_checked = 0;
265         sf->sf_pos_last_checkpoint = scrub->os_pos_current;
266         sf->sf_time_last_checkpoint = ktime_get_real_seconds();
267         sf->sf_run_time += now - scrub->os_time_last_checkpoint;
268         rc = scrub_file_store(env, scrub);
269         up_write(&scrub->os_rwsem);
270
271         return rc;
272 }
273 EXPORT_SYMBOL(scrub_checkpoint);
274
275 int scrub_start(int (*threadfn)(void *data), struct lustre_scrub *scrub,
276                 void *data, __u32 flags)
277 {
278         struct task_struct *task;
279         int rc;
280         ENTRY;
281
282         if (scrub->os_task)
283                 RETURN(-EALREADY);
284
285         if (scrub->os_file.sf_status == SS_COMPLETED) {
286                 if (!(flags & SS_SET_FAILOUT))
287                         flags |= SS_CLEAR_FAILOUT;
288
289                 if (!(flags & SS_SET_DRYRUN))
290                         flags |= SS_CLEAR_DRYRUN;
291
292                 flags |= SS_RESET;
293         }
294
295         task = kthread_create(threadfn, data, "OI_scrub");
296         if (IS_ERR(task)) {
297                 rc = PTR_ERR(task);
298                 CERROR("%s: cannot start iteration thread: rc = %d\n",
299                        scrub->os_name, rc);
300                 RETURN(rc);
301         }
302         spin_lock(&scrub->os_lock);
303         if (scrub->os_task) {
304                 /* Lost a race */
305                 spin_unlock(&scrub->os_lock);
306                 kthread_stop(task);
307                 RETURN(-EALREADY);
308         }
309         scrub->os_start_flags = flags;
310         scrub->os_task = task;
311         wake_up_process(task);
312         spin_unlock(&scrub->os_lock);
313         wait_var_event(scrub, scrub->os_running || !scrub->os_task);
314
315         RETURN(0);
316 }
317 EXPORT_SYMBOL(scrub_start);
318
319 void scrub_stop(struct lustre_scrub *scrub)
320 {
321         struct task_struct *task;
322
323         spin_lock(&scrub->os_lock);
324         scrub->os_running = 0;
325         spin_unlock(&scrub->os_lock);
326         task = xchg(&scrub->os_task, NULL);
327         if (task)
328                 kthread_stop(task);
329 }
330 EXPORT_SYMBOL(scrub_stop);
331
/*
 * Printable scrub status names, indexed by sf_status (see scrub_dump()).
 * The table is NULL-terminated.
 */
const char *scrub_status_names[] = {
        "init",
        "scanning",
        "completed",
        "failed",
        "stopped",
        "paused",
        "crashed",
        NULL
};
342
/*
 * Printable names for the sf_flags bits; entry i names bit i (see
 * scrub_bits_dump()). The table is NULL-terminated.
 */
const char *scrub_flags_names[] = {
        "recreated",
        "inconsistent",
        "auto",
        "upgrade",
        NULL
};
350
/*
 * Printable names for the sf_param bits; entry i names bit i (see
 * scrub_bits_dump()). The table is NULL-terminated.
 */
const char *scrub_param_names[] = {
        "failout",
        "dryrun",
        NULL
};
356
357 static void scrub_bits_dump(struct seq_file *m, int bits, const char *names[],
358                             const char *prefix)
359 {
360         int flag;
361         int i;
362
363         seq_printf(m, "%s:%c", prefix, bits != 0 ? ' ' : '\n');
364
365         for (i = 0, flag = 1; bits != 0; i++, flag = BIT(i)) {
366                 if (flag & bits) {
367                         bits &= ~flag;
368                         seq_printf(m, "%s%c", names[i],
369                                    bits != 0 ? ',' : '\n');
370                 }
371         }
372 }
373
374 static void scrub_time_dump(struct seq_file *m, time64_t time,
375                             const char *prefix)
376 {
377         if (time != 0)
378                 seq_printf(m, "%s: %llu seconds\n", prefix,
379                            ktime_get_real_seconds() - time);
380         else
381                 seq_printf(m, "%s: N/A\n", prefix);
382 }
383
384 static void scrub_pos_dump(struct seq_file *m, __u64 pos, const char *prefix)
385 {
386         if (pos != 0)
387                 seq_printf(m, "%s: %llu\n", prefix, pos);
388         else
389                 seq_printf(m, "%s: N/A\n", prefix);
390 }
391
392 void scrub_dump(struct seq_file *m, struct lustre_scrub *scrub)
393 {
394         struct scrub_file *sf = &scrub->os_file;
395         u64 checked;
396         s64 speed;
397
398         down_read(&scrub->os_rwsem);
399         seq_printf(m, "name: OI_scrub\n"
400                    "magic: 0x%x\n"
401                    "oi_files: %d\n"
402                    "status: %s\n",
403                    sf->sf_magic, (int)sf->sf_oi_count,
404                    scrub_status_names[sf->sf_status]);
405
406         scrub_bits_dump(m, sf->sf_flags, scrub_flags_names, "flags");
407
408         scrub_bits_dump(m, sf->sf_param, scrub_param_names, "param");
409
410         scrub_time_dump(m, sf->sf_time_last_complete,
411                         "time_since_last_completed");
412
413         scrub_time_dump(m, sf->sf_time_latest_start,
414                         "time_since_latest_start");
415
416         scrub_time_dump(m, sf->sf_time_last_checkpoint,
417                         "time_since_last_checkpoint");
418
419         scrub_pos_dump(m, sf->sf_pos_latest_start,
420                         "latest_start_position");
421
422         scrub_pos_dump(m, sf->sf_pos_last_checkpoint,
423                         "last_checkpoint_position");
424
425         scrub_pos_dump(m, sf->sf_pos_first_inconsistent,
426                         "first_failure_position");
427
428         checked = sf->sf_items_checked + scrub->os_new_checked;
429         seq_printf(m, "checked: %llu\n"
430                    "%s: %llu\n"
431                    "failed: %llu\n"
432                    "prior_%s: %llu\n"
433                    "noscrub: %llu\n"
434                    "igif: %llu\n"
435                    "success_count: %u\n",
436                    checked,
437                    sf->sf_param & SP_DRYRUN ? "inconsistent" : "updated",
438                    sf->sf_items_updated, sf->sf_items_failed,
439                    sf->sf_param & SP_DRYRUN ? "inconsistent" : "updated",
440                    sf->sf_items_updated_prior, sf->sf_items_noscrub,
441                    sf->sf_items_igif, sf->sf_success_count);
442
443         speed = checked;
444         if (scrub->os_running) {
445                 s64 new_checked = scrub->os_new_checked;
446                 time64_t duration;
447                 time64_t rtime;
448
449                 /* Since the time resolution is in seconds for new system
450                  * or small devices it ismore likely that duration will be
451                  * zero which will lead to inaccurate results.
452                  */
453                 duration = ktime_get_seconds() -
454                            scrub->os_time_last_checkpoint;
455                 if (duration != 0)
456                         new_checked = div_s64(new_checked, duration);
457
458                 rtime = sf->sf_run_time + duration;
459                 if (rtime != 0)
460                         speed = div_s64(speed, rtime);
461
462                 seq_printf(m, "run_time: %lld seconds\n"
463                            "average_speed: %lld objects/sec\n"
464                            "real_time_speed: %lld objects/sec\n"
465                            "current_position: %llu\n"
466                            "scrub_in_prior: %s\n"
467                            "scrub_full_speed: %s\n"
468                            "partial_scan: %s\n",
469                            rtime, speed, new_checked,
470                            scrub->os_pos_current,
471                            scrub->os_in_prior ? "yes" : "no",
472                            scrub->os_full_speed ? "yes" : "no",
473                            scrub->os_partial_scan ? "yes" : "no");
474         } else {
475                 if (sf->sf_run_time != 0)
476                         speed = div_s64(speed, sf->sf_run_time);
477                 seq_printf(m, "run_time: %d seconds\n"
478                            "average_speed: %lld objects/sec\n"
479                            "real_time_speed: N/A\n"
480                            "current_position: N/A\n",
481                            sf->sf_run_time, speed);
482         }
483
484         up_read(&scrub->os_rwsem);
485 }
486 EXPORT_SYMBOL(scrub_dump);
487
488 int lustre_liru_new(struct list_head *head, const struct lu_fid *pfid,
489                     const struct lu_fid *cfid, __u64 child,
490                     const char *name, int namelen)
491 {
492         struct lustre_index_restore_unit *liru;
493         int len = sizeof(*liru) + namelen + 1;
494
495         OBD_ALLOC(liru, len);
496         if (!liru)
497                 return -ENOMEM;
498
499         INIT_LIST_HEAD(&liru->liru_link);
500         liru->liru_pfid = *pfid;
501         liru->liru_cfid = *cfid;
502         liru->liru_clid = child;
503         liru->liru_len = len;
504         memcpy(liru->liru_name, name, namelen);
505         liru->liru_name[namelen] = 0;
506         list_add_tail(&liru->liru_link, head);
507
508         return 0;
509 }
510 EXPORT_SYMBOL(lustre_liru_new);
511
512 int lustre_index_register(struct dt_device *dev, const char *devname,
513                           struct list_head *head, spinlock_t *lock, int *guard,
514                           const struct lu_fid *fid,
515                           __u32 keysize, __u32 recsize)
516 {
517         struct lustre_index_backup_unit *libu, *pos;
518         int rc = 0;
519         ENTRY;
520
521         if (dev->dd_rdonly || *guard)
522                 RETURN(1);
523
524         OBD_ALLOC_PTR(libu);
525         if (!libu)
526                 RETURN(-ENOMEM);
527
528         INIT_LIST_HEAD(&libu->libu_link);
529         libu->libu_keysize = keysize;
530         libu->libu_recsize = recsize;
531         libu->libu_fid = *fid;
532
533         spin_lock(lock);
534         if (unlikely(*guard)) {
535                 spin_unlock(lock);
536                 OBD_FREE_PTR(libu);
537
538                 RETURN(1);
539         }
540
541         list_for_each_entry_reverse(pos, head, libu_link) {
542                 rc = lu_fid_cmp(&pos->libu_fid, fid);
543                 if (rc < 0) {
544                         list_add(&libu->libu_link, &pos->libu_link);
545                         spin_unlock(lock);
546
547                         RETURN(0);
548                 }
549
550                 if (!rc) {
551                         /* Registered already. But the former registered one
552                          * has different keysize/recsize. It may because that
553                          * the former values are from disk and corrupted, then
554                          * replace it with new values. */
555                         if (unlikely(keysize != pos->libu_keysize ||
556                                      recsize != pos->libu_recsize)) {
557                                 CWARN("%s: the index "DFID" has registered "
558                                       "with %u/%u, may be invalid, replace "
559                                       "with %u/%u\n",
560                                       devname, PFID(fid), pos->libu_keysize,
561                                       pos->libu_recsize, keysize, recsize);
562
563                                 pos->libu_keysize = keysize;
564                                 pos->libu_recsize = recsize;
565                         } else {
566                                 rc = 1;
567                         }
568
569                         spin_unlock(lock);
570                         OBD_FREE_PTR(libu);
571
572                         RETURN(rc);
573                 }
574         }
575
576         list_add(&libu->libu_link, head);
577         spin_unlock(lock);
578
579         RETURN(0);
580 }
581 EXPORT_SYMBOL(lustre_index_register);
582
583 static void lustre_index_degister(struct list_head *head, spinlock_t *lock,
584                                   const struct lu_fid *fid)
585 {
586         struct lustre_index_backup_unit *libu;
587         int rc = -ENOENT;
588
589         spin_lock(lock);
590         list_for_each_entry_reverse(libu, head, libu_link) {
591                 rc = lu_fid_cmp(&libu->libu_fid, fid);
592                 /* NOT registered. */
593                 if (rc < 0)
594                         break;
595
596                 if (!rc) {
597                         list_del(&libu->libu_link);
598                         break;
599                 }
600         }
601         spin_unlock(lock);
602
603         if (!rc)
604                 OBD_FREE_PTR(libu);
605 }
606
607 static void
608 lustre_index_backup_make_header(struct lustre_index_backup_header *header,
609                                 __u32 keysize, __u32 recsize,
610                                 const struct lu_fid *fid, __u32 count)
611 {
612         memset(header, 0, sizeof(*header));
613         header->libh_magic = cpu_to_le32(INDEX_BACKUP_MAGIC_V1);
614         header->libh_count = cpu_to_le32(count);
615         header->libh_keysize = cpu_to_le32(keysize);
616         header->libh_recsize = cpu_to_le32(recsize);
617         fid_cpu_to_le(&header->libh_owner, fid);
618 }
619
620 static int lustre_index_backup_body(const struct lu_env *env,
621                                     struct dt_object *obj, loff_t *pos,
622                                     void *buf, int bufsize)
623 {
624         struct dt_device *dev = lu2dt_dev(obj->do_lu.lo_dev);
625         struct thandle *th;
626         struct lu_buf lbuf = {
627                 .lb_buf = buf,
628                 .lb_len = bufsize
629         };
630         int rc;
631         ENTRY;
632
633         th = dt_trans_create(env, dev);
634         if (IS_ERR(th))
635                 RETURN(PTR_ERR(th));
636
637         rc = dt_declare_record_write(env, obj, &lbuf, *pos, th);
638         if (rc)
639                 GOTO(stop, rc);
640
641         rc = dt_trans_start_local(env, dev, th);
642         if (rc)
643                 GOTO(stop, rc);
644
645         rc = dt_record_write(env, obj, &lbuf, pos, th);
646
647         GOTO(stop, rc);
648
649 stop:
650         dt_trans_stop(env, dev, th);
651         return rc;
652 }
653
654 static int lustre_index_backup_header(const struct lu_env *env,
655                                       struct dt_object *obj,
656                                       const struct lu_fid *tgt_fid,
657                                       __u32 keysize, __u32 recsize,
658                                       void *buf, int bufsize, int count)
659 {
660         struct dt_device *dev = lu2dt_dev(obj->do_lu.lo_dev);
661         struct lustre_index_backup_header *header = buf;
662         struct lu_attr *la = buf;
663         struct thandle *th;
664         struct lu_buf lbuf = {
665                 .lb_buf = header,
666                 .lb_len = sizeof(*header)
667         };
668         loff_t size = sizeof(*header) + (keysize + recsize) * count;
669         loff_t pos = 0;
670         int rc;
671         bool punch = false;
672         ENTRY;
673
674         LASSERT(sizeof(*la) <= bufsize);
675         LASSERT(sizeof(*header) <= bufsize);
676
677         rc = dt_attr_get(env, obj, la);
678         if (rc)
679                 RETURN(rc);
680
681         if (la->la_size > size)
682                 punch = true;
683
684         lustre_index_backup_make_header(header, keysize, recsize,
685                                         tgt_fid, count);
686         th = dt_trans_create(env, dev);
687         if (IS_ERR(th))
688                 RETURN(PTR_ERR(th));
689
690         rc = dt_declare_record_write(env, obj, &lbuf, pos, th);
691         if (rc)
692                 GOTO(stop, rc);
693
694         if (punch) {
695                 rc = dt_declare_punch(env, obj, size, OBD_OBJECT_EOF, th);
696                 if (rc)
697                         GOTO(stop, rc);
698         }
699
700         rc = dt_trans_start_local(env, dev, th);
701         if (rc)
702                 GOTO(stop, rc);
703
704         rc = dt_record_write(env, obj, &lbuf, &pos, th);
705         if (!rc && punch)
706                 rc = dt_punch(env, obj, size, OBD_OBJECT_EOF, th);
707
708         GOTO(stop, rc);
709
710 stop:
711         dt_trans_stop(env, dev, th);
712         return rc;
713 }
714
715 static int lustre_index_update_lma(const struct lu_env *env,
716                                    struct dt_object *obj,
717                                    void *buf, int bufsize)
718 {
719         struct dt_device *dev = lu2dt_dev(obj->do_lu.lo_dev);
720         struct lustre_mdt_attrs *lma = buf;
721         struct lu_buf lbuf = {
722                 .lb_buf = lma,
723                 .lb_len = sizeof(struct lustre_ost_attrs)
724         };
725         struct thandle *th;
726         int fl = LU_XATTR_REPLACE;
727         int rc;
728         ENTRY;
729
730         LASSERT(bufsize >= lbuf.lb_len);
731
732         rc = dt_xattr_get(env, obj, &lbuf, XATTR_NAME_LMA);
733         if (unlikely(rc == -ENODATA)) {
734                 fl = LU_XATTR_CREATE;
735                 lustre_lma_init(lma, lu_object_fid(&obj->do_lu),
736                                 LMAC_IDX_BACKUP, 0);
737                 rc = sizeof(*lma);
738         } else if (rc < sizeof(*lma)) {
739                 RETURN(rc < 0 ? rc : -EFAULT);
740         } else {
741                 lustre_lma_swab(lma);
742                 if (lma->lma_compat & LMAC_IDX_BACKUP)
743                         RETURN(0);
744
745                 lma->lma_compat |= LMAC_IDX_BACKUP;
746         }
747
748         lustre_lma_swab(lma);
749         lbuf.lb_len = rc;
750         th = dt_trans_create(env, dev);
751         if (IS_ERR(th))
752                 RETURN(rc);
753
754         rc = dt_declare_xattr_set(env, obj, &lbuf, XATTR_NAME_LMA, fl, th);
755         if (rc)
756                 GOTO(stop, rc);
757
758         rc = dt_trans_start_local(env, dev, th);
759         if (rc)
760                 GOTO(stop, rc);
761
762         rc = dt_xattr_set(env, obj, &lbuf, XATTR_NAME_LMA, fl, th);
763
764         GOTO(stop, rc);
765
766 stop:
767         dt_trans_stop(env, dev, th);
768         return rc;
769 }
770
771 static int lustre_index_backup_one(const struct lu_env *env,
772                                    struct local_oid_storage *los,
773                                    struct dt_object *parent,
774                                    struct lustre_index_backup_unit *libu,
775                                    char *buf, int bufsize)
776 {
777         struct dt_device *dev = scrub_obj2dev(parent);
778         struct dt_object *tgt_obj = NULL;
779         struct dt_object *bak_obj = NULL;
780         const struct dt_it_ops *iops;
781         struct dt_it *di;
782         loff_t pos = sizeof(struct lustre_index_backup_header);
783         int count = 0;
784         int size = 0;
785         int rc;
786         ENTRY;
787
788         tgt_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
789                                              &libu->libu_fid, NULL));
790         if (IS_ERR_OR_NULL(tgt_obj))
791                 GOTO(out, rc = tgt_obj ? PTR_ERR(tgt_obj) : -ENOENT);
792
793         if (!dt_object_exists(tgt_obj))
794                 GOTO(out, rc = 0);
795
796         if (!tgt_obj->do_index_ops) {
797                 struct dt_index_features feat;
798
799                 feat.dif_flags = DT_IND_UPDATE;
800                 feat.dif_keysize_min = libu->libu_keysize;
801                 feat.dif_keysize_max = libu->libu_keysize;
802                 feat.dif_recsize_min = libu->libu_recsize;
803                 feat.dif_recsize_max = libu->libu_recsize;
804                 feat.dif_ptrsize = 4;
805                 rc = tgt_obj->do_ops->do_index_try(env, tgt_obj, &feat);
806                 if (rc)
807                         GOTO(out, rc);
808         }
809
810         lustre_fid2lbx(buf, &libu->libu_fid, bufsize);
811         bak_obj = local_file_find_or_create(env, los, parent, buf,
812                                             S_IFREG | S_IRUGO | S_IWUSR);
813         if (IS_ERR_OR_NULL(bak_obj))
814                 GOTO(out, rc = bak_obj ? PTR_ERR(bak_obj) : -ENOENT);
815
816         iops = &tgt_obj->do_index_ops->dio_it;
817         di = iops->init(env, tgt_obj, 0);
818         if (IS_ERR(di))
819                 GOTO(out, rc = PTR_ERR(di));
820
821         rc = iops->load(env, di, 0);
822         if (!rc)
823                 rc = iops->next(env, di);
824         else if (rc > 0)
825                 rc = 0;
826
827         while (!rc) {
828                 void *key;
829                 void *rec;
830
831                 key = iops->key(env, di);
832                 memcpy(&buf[size], key, libu->libu_keysize);
833                 size += libu->libu_keysize;
834                 rec = &buf[size];
835                 rc = iops->rec(env, di, rec, 0);
836                 if (rc)
837                         GOTO(fini, rc);
838
839                 size += libu->libu_recsize;
840                 count++;
841                 if (size + libu->libu_keysize + libu->libu_recsize > bufsize) {
842                         rc = lustre_index_backup_body(env, bak_obj, &pos,
843                                                       buf, size);
844                         if (rc)
845                                 GOTO(fini, rc);
846
847                         size = 0;
848                 }
849
850                 rc = iops->next(env, di);
851         }
852
853         if (rc >= 0 && size > 0)
854                 rc = lustre_index_backup_body(env, bak_obj, &pos, buf, size);
855
856         if (rc < 0)
857                 GOTO(fini, rc);
858
859         rc = lustre_index_backup_header(env, bak_obj, &libu->libu_fid,
860                                         libu->libu_keysize, libu->libu_recsize,
861                                         buf, bufsize, count);
862         if (!rc)
863                 rc = lustre_index_update_lma(env, tgt_obj, buf, bufsize);
864
865         if (!rc && OBD_FAIL_CHECK(OBD_FAIL_OSD_INDEX_CRASH)) {
866                 LASSERT(bufsize >= 512);
867
868                 pos = 0;
869                 memset(buf, 0, 512);
870                 lustre_index_backup_body(env, tgt_obj, &pos, buf, 512);
871         }
872
873         GOTO(fini, rc);
874
875 fini:
876         iops->fini(env, di);
877 out:
878         if (!IS_ERR_OR_NULL(tgt_obj))
879                 dt_object_put_nocache(env, tgt_obj);
880         if (!IS_ERR_OR_NULL(bak_obj))
881                 dt_object_put_nocache(env, bak_obj);
882         return rc;
883 }
884
885 void lustre_index_backup(const struct lu_env *env, struct dt_device *dev,
886                          const char *devname, struct list_head *head,
887                          spinlock_t *lock, int *guard, bool backup)
888 {
889         struct lustre_index_backup_unit *libu;
890         struct local_oid_storage *los = NULL;
891         struct dt_object *parent = NULL;
892         char *buf = NULL;
893         struct lu_fid fid;
894         int rc;
895         ENTRY;
896
897         if (dev->dd_rdonly || *guard)
898                 RETURN_EXIT;
899
900         spin_lock(lock);
901         *guard = 1;
902         spin_unlock(lock);
903
904         if (list_empty(head))
905                 RETURN_EXIT;
906
907         /* Handle kinds of failures during mount process. */
908         if (!dev->dd_lu_dev.ld_site || !dev->dd_lu_dev.ld_site->ls_top_dev)
909                 backup = false;
910
911         if (backup) {
912                 OBD_ALLOC_LARGE(buf, INDEX_BACKUP_BUFSIZE);
913                 if (!buf) {
914                         backup = false;
915                         goto scan;
916                 }
917
918                 lu_local_obj_fid(&fid, INDEX_BACKUP_OID);
919                 parent = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
920                                                     &fid, NULL));
921                 if (IS_ERR_OR_NULL(parent)) {
922                         CERROR("%s: failed to locate backup dir: rc = %ld\n",
923                                devname, parent ? PTR_ERR(parent) : -ENOENT);
924                         backup = false;
925                         goto scan;
926                 }
927
928                 lu_local_name_obj_fid(&fid, 1);
929                 rc = local_oid_storage_init(env, dev, &fid, &los);
930                 if (rc) {
931                         CERROR("%s: failed to init local storage: rc = %d\n",
932                                devname, rc);
933                         backup = false;
934                 }
935         }
936
937 scan:
938         spin_lock(lock);
939         while (!list_empty(head)) {
940                 libu = list_entry(head->next,
941                                   struct lustre_index_backup_unit, libu_link);
942                 list_del_init(&libu->libu_link);
943                 spin_unlock(lock);
944
945                 if (backup) {
946                         rc = lustre_index_backup_one(env, los, parent, libu,
947                                                      buf, INDEX_BACKUP_BUFSIZE);
948                         CDEBUG(D_WARNING, "%s: backup index "DFID": rc = %d\n",
949                                devname, PFID(&libu->libu_fid), rc);
950                 }
951
952                 OBD_FREE_PTR(libu);
953                 spin_lock(lock);
954         }
955         spin_unlock(lock);
956
957         if (los)
958                 local_oid_storage_fini(env, los);
959         if (parent)
960                 dt_object_put_nocache(env, parent);
961         if (buf)
962                 OBD_FREE_LARGE(buf, INDEX_BACKUP_BUFSIZE);
963
964         EXIT;
965 }
966 EXPORT_SYMBOL(lustre_index_backup);
967
968 int lustre_index_restore(const struct lu_env *env, struct dt_device *dev,
969                          const struct lu_fid *parent_fid,
970                          const struct lu_fid *tgt_fid,
971                          const struct lu_fid *bak_fid, const char *name,
972                          struct list_head *head, spinlock_t *lock,
973                          char *buf, int bufsize)
974 {
975         struct dt_object *parent_obj = NULL;
976         struct dt_object *tgt_obj = NULL;
977         struct dt_object *bak_obj = NULL;
978         struct lustre_index_backup_header *header;
979         struct dt_index_features *feat;
980         struct dt_object_format *dof;
981         struct lu_attr *la;
982         struct thandle *th;
983         struct lu_object_conf conf;
984         struct dt_insert_rec ent;
985         struct lu_buf lbuf;
986         struct lu_fid tfid;
987         loff_t pos = 0;
988         __u32 keysize;
989         __u32 recsize;
990         __u32 pairsize;
991         int count;
992         int rc;
993         bool registered = false;
994         ENTRY;
995
996         LASSERT(bufsize >= sizeof(*la) + sizeof(*dof) +
997                 sizeof(*feat) + sizeof(*header));
998
999         memset(buf, 0, bufsize);
1000         la = (struct lu_attr *)buf;
1001         dof = (void *)la + sizeof(*la);
1002         feat = (void *)dof + sizeof(*dof);
1003         header = (void *)feat + sizeof(*feat);
1004         lbuf.lb_buf = header;
1005         lbuf.lb_len = sizeof(*header);
1006
1007         tgt_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
1008                                              tgt_fid, NULL));
1009         if (IS_ERR_OR_NULL(tgt_obj))
1010                 GOTO(out, rc = tgt_obj ? PTR_ERR(tgt_obj) : -ENOENT);
1011
1012         bak_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
1013                                              bak_fid, NULL));
1014         if (IS_ERR_OR_NULL(bak_obj))
1015                 GOTO(out, rc = bak_obj ? PTR_ERR(bak_obj) : -ENOENT);
1016
1017         if (!dt_object_exists(bak_obj))
1018                 GOTO(out, rc = -ENOENT);
1019
1020         parent_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
1021                                                 parent_fid, NULL));
1022         if (IS_ERR_OR_NULL(parent_obj))
1023                 GOTO(out, rc = parent_obj ? PTR_ERR(parent_obj) : -ENOENT);
1024
1025         LASSERT(dt_object_exists(parent_obj));
1026
1027         if (unlikely(!dt_try_as_dir(env, parent_obj)))
1028                 GOTO(out, rc = -ENOTDIR);
1029
1030         rc = dt_attr_get(env, tgt_obj, la);
1031         if (rc)
1032                 GOTO(out, rc);
1033
1034         rc = dt_record_read(env, bak_obj, &lbuf, &pos);
1035         if (rc)
1036                 GOTO(out, rc);
1037
1038         if (le32_to_cpu(header->libh_magic) != INDEX_BACKUP_MAGIC_V1)
1039                 GOTO(out, rc = -EINVAL);
1040
1041         fid_le_to_cpu(&tfid, &header->libh_owner);
1042         if (unlikely(!lu_fid_eq(tgt_fid, &tfid)))
1043                 GOTO(out, rc = -EINVAL);
1044
1045         keysize = le32_to_cpu(header->libh_keysize);
1046         recsize = le32_to_cpu(header->libh_recsize);
1047         pairsize = keysize + recsize;
1048
1049         memset(feat, 0, sizeof(*feat));
1050         feat->dif_flags = DT_IND_UPDATE;
1051         feat->dif_keysize_min = feat->dif_keysize_max = keysize;
1052         feat->dif_recsize_min = feat->dif_recsize_max = recsize;
1053         feat->dif_ptrsize = 4;
1054
1055         /* T1: remove old name entry and destroy old index. */
1056         th = dt_trans_create(env, dev);
1057         if (IS_ERR(th))
1058                 GOTO(out, rc = PTR_ERR(th));
1059
1060         rc = dt_declare_delete(env, parent_obj,
1061                                (const struct dt_key *)name, th);
1062         if (rc)
1063                 GOTO(stop, rc);
1064
1065         rc = dt_declare_destroy(env, tgt_obj, th);
1066         if (rc)
1067                 GOTO(stop, rc);
1068
1069         rc = dt_trans_start_local(env, dev, th);
1070         if (rc)
1071                 GOTO(stop, rc);
1072
1073         rc = dt_delete(env, parent_obj, (const struct dt_key *)name, th);
1074         if (rc)
1075                 GOTO(stop, rc);
1076
1077         dt_write_lock(env, tgt_obj, 0);
1078         rc = dt_destroy(env, tgt_obj, th);
1079         dt_write_unlock(env, tgt_obj);
1080         dt_trans_stop(env, dev, th);
1081         if (rc)
1082                 GOTO(out, rc);
1083
1084         la->la_valid = LA_MODE | LA_UID | LA_GID;
1085         conf.loc_flags = LOC_F_NEW;
1086         dof->u.dof_idx.di_feat = feat;
1087         dof->dof_type = DFT_INDEX;
1088         ent.rec_type = S_IFREG;
1089         ent.rec_fid = tgt_fid;
1090
1091         /* Drop cache before re-create it. */
1092         dt_object_put_nocache(env, tgt_obj);
1093         tgt_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
1094                                              tgt_fid, &conf));
1095         if (IS_ERR_OR_NULL(tgt_obj))
1096                 GOTO(out, rc = tgt_obj ? PTR_ERR(tgt_obj) : -ENOENT);
1097
1098         LASSERT(!dt_object_exists(tgt_obj));
1099
1100         /* T2: create new index and insert new name entry. */
1101         th = dt_trans_create(env, dev);
1102         if (IS_ERR(th))
1103                 GOTO(out, rc = PTR_ERR(th));
1104
1105         rc = dt_declare_create(env, tgt_obj, la, NULL, dof, th);
1106         if (rc)
1107                 GOTO(stop, rc);
1108
1109         rc = dt_declare_insert(env, parent_obj, (const struct dt_rec *)&ent,
1110                                (const struct dt_key *)name, th);
1111         if (rc)
1112                 GOTO(stop, rc);
1113
1114         rc = dt_trans_start_local(env, dev, th);
1115         if (rc)
1116                 GOTO(stop, rc);
1117
1118         dt_write_lock(env, tgt_obj, 0);
1119         rc = dt_create(env, tgt_obj, la, NULL, dof, th);
1120         dt_write_unlock(env, tgt_obj);
1121         if (rc)
1122                 GOTO(stop, rc);
1123
1124         rc = dt_insert(env, parent_obj, (const struct dt_rec *)&ent,
1125                        (const struct dt_key *)name, th);
1126         dt_trans_stop(env, dev, th);
1127         /* Some index name may has been inserted by OSD
1128          * automatically when create the index object. */
1129         if (unlikely(rc == -EEXIST))
1130                 rc = 0;
1131         if (rc)
1132                 GOTO(out, rc);
1133
1134         /* The new index will register via index_try. */
1135         rc = tgt_obj->do_ops->do_index_try(env, tgt_obj, feat);
1136         if (rc)
1137                 GOTO(out, rc);
1138
1139         registered = true;
1140         count = le32_to_cpu(header->libh_count);
1141         while (!rc && count > 0) {
1142                 int size = pairsize * count;
1143                 int items = count;
1144                 int i;
1145
1146                 if (size > bufsize) {
1147                         items = bufsize / pairsize;
1148                         size = pairsize * items;
1149                 }
1150
1151                 lbuf.lb_buf = buf;
1152                 lbuf.lb_len = size;
1153                 rc = dt_record_read(env, bak_obj, &lbuf, &pos);
1154                 for (i = 0; i < items && !rc; i++) {
1155                         void *key = &buf[i * pairsize];
1156                         void *rec = &buf[i * pairsize + keysize];
1157
1158                         /* Tn: restore the records. */
1159                         th = dt_trans_create(env, dev);
1160                         if (!th)
1161                                 GOTO(out, rc = -ENOMEM);
1162
1163                         rc = dt_declare_insert(env, tgt_obj, rec, key, th);
1164                         if (rc)
1165                                 GOTO(stop, rc);
1166
1167                         rc = dt_trans_start_local(env, dev, th);
1168                         if (rc)
1169                                 GOTO(stop, rc);
1170
1171                         rc = dt_insert(env, tgt_obj, rec, key, th);
1172                         if (unlikely(rc == -EEXIST))
1173                                 rc = 0;
1174
1175                         dt_trans_stop(env, dev, th);
1176                 }
1177
1178                 count -= items;
1179         }
1180
1181         GOTO(out, rc);
1182
1183 stop:
1184         dt_trans_stop(env, dev, th);
1185         if (rc && registered)
1186                 /* Degister the index to avoid overwriting the backup. */
1187                 lustre_index_degister(head, lock, tgt_fid);
1188
1189 out:
1190         if (!IS_ERR_OR_NULL(tgt_obj))
1191                 dt_object_put_nocache(env, tgt_obj);
1192         if (!IS_ERR_OR_NULL(bak_obj))
1193                 dt_object_put_nocache(env, bak_obj);
1194         if (!IS_ERR_OR_NULL(parent_obj))
1195                 dt_object_put_nocache(env, parent_obj);
1196         return rc;
1197 }
1198 EXPORT_SYMBOL(lustre_index_restore);