Whamcloud - gitweb
3ab85407a5c6620fc6dd1f8327c2ccdc7bb0d07d
[fs/lustre-release.git] / lustre / obdclass / scrub.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2017, Intel Corporation.
24  */
25 /*
26  * lustre/obdclass/scrub.c
27  *
28  * The OI scrub is used for checking and (re)building Object Index files
29  * that are usually backend special. Here are some general scrub related
30  * functions that can be shared by different backends for OI scrub.
31  *
32  * Author: Fan Yong <fan.yong@intel.com>
33  */
34
35 #define DEBUG_SUBSYSTEM S_LFSCK
36
37 #include <linux/kthread.h>
38 #include <lustre_scrub.h>
39 #include <lustre_lib.h>
40 #include <lustre_fid.h>
41
42 static inline struct dt_device *scrub_obj2dev(struct dt_object *obj)
43 {
44         return container_of0(obj->do_lu.lo_dev, struct dt_device, dd_lu_dev);
45 }
46
47 static void scrub_file_to_cpu(struct scrub_file *des, struct scrub_file *src)
48 {
49         memcpy(des->sf_uuid, src->sf_uuid, 16);
50         des->sf_flags   = le64_to_cpu(src->sf_flags);
51         des->sf_magic   = le32_to_cpu(src->sf_magic);
52         des->sf_status  = le16_to_cpu(src->sf_status);
53         des->sf_param   = le16_to_cpu(src->sf_param);
54         des->sf_time_last_complete      =
55                                 le64_to_cpu(src->sf_time_last_complete);
56         des->sf_time_latest_start       =
57                                 le64_to_cpu(src->sf_time_latest_start);
58         des->sf_time_last_checkpoint    =
59                                 le64_to_cpu(src->sf_time_last_checkpoint);
60         des->sf_pos_latest_start        =
61                                 le64_to_cpu(src->sf_pos_latest_start);
62         des->sf_pos_last_checkpoint     =
63                                 le64_to_cpu(src->sf_pos_last_checkpoint);
64         des->sf_pos_first_inconsistent  =
65                                 le64_to_cpu(src->sf_pos_first_inconsistent);
66         des->sf_items_checked           =
67                                 le64_to_cpu(src->sf_items_checked);
68         des->sf_items_updated           =
69                                 le64_to_cpu(src->sf_items_updated);
70         des->sf_items_failed            =
71                                 le64_to_cpu(src->sf_items_failed);
72         des->sf_items_updated_prior     =
73                                 le64_to_cpu(src->sf_items_updated_prior);
74         des->sf_run_time        = le32_to_cpu(src->sf_run_time);
75         des->sf_success_count   = le32_to_cpu(src->sf_success_count);
76         des->sf_oi_count        = le16_to_cpu(src->sf_oi_count);
77         des->sf_internal_flags  = le16_to_cpu(src->sf_internal_flags);
78         memcpy(des->sf_oi_bitmap, src->sf_oi_bitmap, SCRUB_OI_BITMAP_SIZE);
79 }
80
81 static void scrub_file_to_le(struct scrub_file *des, struct scrub_file *src)
82 {
83         memcpy(des->sf_uuid, src->sf_uuid, 16);
84         des->sf_flags   = cpu_to_le64(src->sf_flags);
85         des->sf_magic   = cpu_to_le32(src->sf_magic);
86         des->sf_status  = cpu_to_le16(src->sf_status);
87         des->sf_param   = cpu_to_le16(src->sf_param);
88         des->sf_time_last_complete      =
89                                 cpu_to_le64(src->sf_time_last_complete);
90         des->sf_time_latest_start       =
91                                 cpu_to_le64(src->sf_time_latest_start);
92         des->sf_time_last_checkpoint    =
93                                 cpu_to_le64(src->sf_time_last_checkpoint);
94         des->sf_pos_latest_start        =
95                                 cpu_to_le64(src->sf_pos_latest_start);
96         des->sf_pos_last_checkpoint     =
97                                 cpu_to_le64(src->sf_pos_last_checkpoint);
98         des->sf_pos_first_inconsistent  =
99                                 cpu_to_le64(src->sf_pos_first_inconsistent);
100         des->sf_items_checked           =
101                                 cpu_to_le64(src->sf_items_checked);
102         des->sf_items_updated           =
103                                 cpu_to_le64(src->sf_items_updated);
104         des->sf_items_failed            =
105                                 cpu_to_le64(src->sf_items_failed);
106         des->sf_items_updated_prior     =
107                                 cpu_to_le64(src->sf_items_updated_prior);
108         des->sf_run_time        = cpu_to_le32(src->sf_run_time);
109         des->sf_success_count   = cpu_to_le32(src->sf_success_count);
110         des->sf_oi_count        = cpu_to_le16(src->sf_oi_count);
111         des->sf_internal_flags  = cpu_to_le16(src->sf_internal_flags);
112         memcpy(des->sf_oi_bitmap, src->sf_oi_bitmap, SCRUB_OI_BITMAP_SIZE);
113 }
114
115 void scrub_file_init(struct lustre_scrub *scrub, __u8 *uuid)
116 {
117         struct scrub_file *sf = &scrub->os_file;
118
119         memset(sf, 0, sizeof(*sf));
120         memcpy(sf->sf_uuid, uuid, 16);
121         sf->sf_magic = SCRUB_MAGIC_V1;
122         sf->sf_status = SS_INIT;
123 }
124 EXPORT_SYMBOL(scrub_file_init);
125
126 void scrub_file_reset(struct lustre_scrub *scrub, __u8 *uuid, __u64 flags)
127 {
128         struct scrub_file *sf = &scrub->os_file;
129
130         CDEBUG(D_LFSCK, "%s: reset OI scrub file, old flags = "
131                "%#llx, add flags = %#llx\n",
132                scrub->os_name, sf->sf_flags, flags);
133
134         memcpy(sf->sf_uuid, uuid, 16);
135         sf->sf_status = SS_INIT;
136         sf->sf_flags |= flags;
137         sf->sf_flags &= ~SF_AUTO;
138         sf->sf_run_time = 0;
139         sf->sf_time_latest_start = 0;
140         sf->sf_time_last_checkpoint = 0;
141         sf->sf_pos_latest_start = 0;
142         sf->sf_pos_last_checkpoint = 0;
143         sf->sf_pos_first_inconsistent = 0;
144         sf->sf_items_checked = 0;
145         sf->sf_items_updated = 0;
146         sf->sf_items_failed = 0;
147         sf->sf_items_noscrub = 0;
148         sf->sf_items_igif = 0;
149         if (!scrub->os_in_join)
150                 sf->sf_items_updated_prior = 0;
151 }
152 EXPORT_SYMBOL(scrub_file_reset);
153
154 int scrub_file_load(const struct lu_env *env, struct lustre_scrub *scrub)
155 {
156         struct scrub_file *sf = &scrub->os_file;
157         struct lu_buf buf = {
158                 .lb_buf = &scrub->os_file_disk,
159                 .lb_len = sizeof(scrub->os_file_disk)
160         };
161         loff_t pos = 0;
162         int rc;
163
164         rc = dt_read(env, scrub->os_obj, &buf, &pos);
165         /* failure */
166         if (rc < 0) {
167                 CERROR("%s: fail to load scrub file: rc = %d\n",
168                        scrub->os_name, rc);
169                 return rc;
170         }
171
172         /* empty */
173         if (!rc)
174                 return -ENOENT;
175
176         /* corrupted */
177         if (rc < buf.lb_len) {
178                 CDEBUG(D_LFSCK, "%s: fail to load scrub file, "
179                        "expected = %d: rc = %d\n",
180                        scrub->os_name, (int)buf.lb_len, rc);
181                 return -EFAULT;
182         }
183
184         scrub_file_to_cpu(sf, &scrub->os_file_disk);
185         if (sf->sf_magic != SCRUB_MAGIC_V1) {
186                 CDEBUG(D_LFSCK, "%s: invalid scrub magic 0x%x != 0x%x\n",
187                        scrub->os_name, sf->sf_magic, SCRUB_MAGIC_V1);
188                 return -EFAULT;
189         }
190
191         return 0;
192 }
193 EXPORT_SYMBOL(scrub_file_load);
194
195 int scrub_file_store(const struct lu_env *env, struct lustre_scrub *scrub)
196 {
197         struct scrub_file *sf = &scrub->os_file_disk;
198         struct dt_object *obj = scrub->os_obj;
199         struct dt_device *dev = scrub_obj2dev(obj);
200         struct lu_buf buf = {
201                 .lb_buf = sf,
202                 .lb_len = sizeof(*sf)
203         };
204         struct thandle *th;
205         loff_t pos = 0;
206         int rc;
207         ENTRY;
208
209         /* Skip store under rdonly mode. */
210         if (dev->dd_rdonly)
211                 RETURN(0);
212
213         scrub_file_to_le(sf, &scrub->os_file);
214         th = dt_trans_create(env, dev);
215         if (IS_ERR(th))
216                 GOTO(log, rc = PTR_ERR(th));
217
218         rc = dt_declare_record_write(env, obj, &buf, pos, th);
219         if (rc)
220                 GOTO(stop, rc);
221
222         rc = dt_trans_start_local(env, dev, th);
223         if (rc)
224                 GOTO(stop, rc);
225
226         rc = dt_record_write(env, obj, &buf, &pos, th);
227
228         GOTO(stop, rc);
229
230 stop:
231         dt_trans_stop(env, dev, th);
232
233 log:
234         if (rc)
235                 CERROR("%s: store scrub file: rc = %d\n",
236                        scrub->os_name, rc);
237         else
238                 CDEBUG(D_LFSCK, "%s: store scrub file: rc = %d\n",
239                        scrub->os_name, rc);
240
241         scrub->os_time_last_checkpoint = cfs_time_current();
242         scrub->os_time_next_checkpoint = scrub->os_time_last_checkpoint +
243                                 cfs_time_seconds(SCRUB_CHECKPOINT_INTERVAL);
244         return rc;
245 }
246 EXPORT_SYMBOL(scrub_file_store);
247
248 int scrub_checkpoint(const struct lu_env *env, struct lustre_scrub *scrub)
249 {
250         struct scrub_file *sf = &scrub->os_file;
251         int rc;
252
253         if (likely(cfs_time_before(cfs_time_current(),
254                                    scrub->os_time_next_checkpoint) ||
255                    scrub->os_new_checked == 0))
256                 return 0;
257
258         CDEBUG(D_LFSCK, "%s: OI scrub checkpoint at pos %llu\n",
259                scrub->os_name, scrub->os_pos_current);
260
261         down_write(&scrub->os_rwsem);
262         sf->sf_items_checked += scrub->os_new_checked;
263         scrub->os_new_checked = 0;
264         sf->sf_pos_last_checkpoint = scrub->os_pos_current;
265         sf->sf_time_last_checkpoint = cfs_time_current_sec();
266         sf->sf_run_time += cfs_duration_sec(cfs_time_current() + HALF_SEC -
267                                             scrub->os_time_last_checkpoint);
268         rc = scrub_file_store(env, scrub);
269         up_write(&scrub->os_rwsem);
270
271         return rc;
272 }
273 EXPORT_SYMBOL(scrub_checkpoint);
274
275 int scrub_start(int (*threadfn)(void *data), struct lustre_scrub *scrub,
276                 void *data, __u32 flags)
277 {
278         struct ptlrpc_thread *thread = &scrub->os_thread;
279         struct l_wait_info lwi = { 0 };
280         struct task_struct *task;
281         int rc;
282         ENTRY;
283
284 again:
285         /* os_lock: sync status between stop and scrub thread */
286         spin_lock(&scrub->os_lock);
287         if (thread_is_running(thread)) {
288                 spin_unlock(&scrub->os_lock);
289                 RETURN(-EALREADY);
290         }
291
292         if (unlikely(thread_is_stopping(thread))) {
293                 spin_unlock(&scrub->os_lock);
294                 l_wait_event(thread->t_ctl_waitq,
295                              thread_is_stopped(thread),
296                              &lwi);
297                 goto again;
298         }
299         spin_unlock(&scrub->os_lock);
300
301         if (scrub->os_file.sf_status == SS_COMPLETED) {
302                 if (!(flags & SS_SET_FAILOUT))
303                         flags |= SS_CLEAR_FAILOUT;
304
305                 if (!(flags & SS_SET_DRYRUN))
306                         flags |= SS_CLEAR_DRYRUN;
307
308                 flags |= SS_RESET;
309         }
310
311         scrub->os_start_flags = flags;
312         thread_set_flags(thread, 0);
313         task = kthread_run(threadfn, data, "OI_scrub");
314         if (IS_ERR(task)) {
315                 rc = PTR_ERR(task);
316                 CERROR("%s: cannot start iteration thread: rc = %d\n",
317                        scrub->os_name, rc);
318                 RETURN(rc);
319         }
320
321         l_wait_event(thread->t_ctl_waitq,
322                      thread_is_running(thread) || thread_is_stopped(thread),
323                      &lwi);
324
325         RETURN(0);
326 }
327 EXPORT_SYMBOL(scrub_start);
328
329 void scrub_stop(struct lustre_scrub *scrub)
330 {
331         struct ptlrpc_thread *thread = &scrub->os_thread;
332         struct l_wait_info lwi = { 0 };
333
334         /* os_lock: sync status between stop and scrub thread */
335         spin_lock(&scrub->os_lock);
336         if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
337                 thread_set_flags(thread, SVC_STOPPING);
338                 spin_unlock(&scrub->os_lock);
339                 wake_up_all(&thread->t_ctl_waitq);
340                 l_wait_event(thread->t_ctl_waitq,
341                              thread_is_stopped(thread),
342                              &lwi);
343                 /* Do not skip the last lock/unlock, which can guarantee that
344                  * the caller cannot return until the OI scrub thread exit. */
345                 spin_lock(&scrub->os_lock);
346         }
347         spin_unlock(&scrub->os_lock);
348 }
349 EXPORT_SYMBOL(scrub_stop);
350
/*
 * Printable names for the scrub status; scrub_dump() indexes this table
 * directly with sf_status. NULL-terminated.
 */
const char *scrub_status_names[] = {
        "init",
        "scanning",
        "completed",
        "failed",
        "stopped",
        "paused",
        "crashed",
        NULL
};

/*
 * Printable names for the bits of sf_flags: entry i names bit (1 << i), as
 * consumed by scrub_bits_dump(). NULL-terminated.
 */
const char *scrub_flags_names[] = {
        "recreated",
        "inconsistent",
        "auto",
        "upgrade",
        NULL
};

/*
 * Printable names for the bits of sf_param: entry i names bit (1 << i), as
 * consumed by scrub_bits_dump(). NULL-terminated.
 */
const char *scrub_param_names[] = {
        "failout",
        "dryrun",
        NULL
};
375
376 static void scrub_bits_dump(struct seq_file *m, int bits, const char *names[],
377                             const char *prefix)
378 {
379         int flag;
380         int i;
381
382         seq_printf(m, "%s:%c", prefix, bits != 0 ? ' ' : '\n');
383
384         for (i = 0, flag = 1; bits != 0; i++, flag = 1 << i) {
385                 if (flag & bits) {
386                         bits &= ~flag;
387                         seq_printf(m, "%s%c", names[i],
388                                    bits != 0 ? ',' : '\n');
389                 }
390         }
391 }
392
393 static void scrub_time_dump(struct seq_file *m, __u64 time, const char *prefix)
394 {
395         if (time != 0)
396                 seq_printf(m, "%s: %llu seconds\n", prefix,
397                            cfs_time_current_sec() - time);
398         else
399                 seq_printf(m, "%s: N/A\n", prefix);
400 }
401
402 static void scrub_pos_dump(struct seq_file *m, __u64 pos, const char *prefix)
403 {
404         if (pos != 0)
405                 seq_printf(m, "%s: %llu\n", prefix, pos);
406         else
407                 seq_printf(m, "%s: N/A\n", prefix);
408 }
409
410 void scrub_dump(struct seq_file *m, struct lustre_scrub *scrub)
411 {
412         struct scrub_file *sf = &scrub->os_file;
413         __u64 checked;
414         __u64 speed;
415
416         down_read(&scrub->os_rwsem);
417         seq_printf(m, "name: OI_scrub\n"
418                    "magic: 0x%x\n"
419                    "oi_files: %d\n"
420                    "status: %s\n",
421                    sf->sf_magic, (int)sf->sf_oi_count,
422                    scrub_status_names[sf->sf_status]);
423
424         scrub_bits_dump(m, sf->sf_flags, scrub_flags_names, "flags");
425
426         scrub_bits_dump(m, sf->sf_param, scrub_param_names, "param");
427
428         scrub_time_dump(m, sf->sf_time_last_complete,
429                         "time_since_last_completed");
430
431         scrub_time_dump(m, sf->sf_time_latest_start,
432                         "time_since_latest_start");
433
434         scrub_time_dump(m, sf->sf_time_last_checkpoint,
435                         "time_since_last_checkpoint");
436
437         scrub_pos_dump(m, sf->sf_pos_latest_start,
438                         "latest_start_position");
439
440         scrub_pos_dump(m, sf->sf_pos_last_checkpoint,
441                         "last_checkpoint_position");
442
443         scrub_pos_dump(m, sf->sf_pos_first_inconsistent,
444                         "first_failure_position");
445
446         checked = sf->sf_items_checked + scrub->os_new_checked;
447         seq_printf(m, "checked: %llu\n"
448                    "%s: %llu\n"
449                    "failed: %llu\n"
450                    "prior_%s: %llu\n"
451                    "noscrub: %llu\n"
452                    "igif: %llu\n"
453                    "success_count: %u\n",
454                    checked,
455                    sf->sf_param & SP_DRYRUN ? "inconsistent" : "updated",
456                    sf->sf_items_updated, sf->sf_items_failed,
457                    sf->sf_param & SP_DRYRUN ? "inconsistent" : "updated",
458                    sf->sf_items_updated_prior, sf->sf_items_noscrub,
459                    sf->sf_items_igif, sf->sf_success_count);
460
461         speed = checked;
462         if (thread_is_running(&scrub->os_thread)) {
463                 cfs_duration_t duration = cfs_time_current() -
464                                           scrub->os_time_last_checkpoint;
465                 __u64 new_checked = msecs_to_jiffies(scrub->os_new_checked *
466                                                      MSEC_PER_SEC);
467                 __u32 rtime = sf->sf_run_time +
468                               cfs_duration_sec(duration + HALF_SEC);
469
470                 if (duration != 0)
471                         do_div(new_checked, duration);
472                 if (rtime != 0)
473                         do_div(speed, rtime);
474                 seq_printf(m, "run_time: %u seconds\n"
475                            "average_speed: %llu objects/sec\n"
476                            "real-time_speed: %llu objects/sec\n"
477                            "current_position: %llu\n"
478                            "scrub_in_prior: %s\n"
479                            "scrub_full_speed: %s\n"
480                            "partial_scan: %s\n",
481                            rtime, speed, new_checked, scrub->os_pos_current,
482                            scrub->os_in_prior ? "yes" : "no",
483                            scrub->os_full_speed ? "yes" : "no",
484                            scrub->os_partial_scan ? "yes" : "no");
485         } else {
486                 if (sf->sf_run_time != 0)
487                         do_div(speed, sf->sf_run_time);
488                 seq_printf(m, "run_time: %u seconds\n"
489                            "average_speed: %llu objects/sec\n"
490                            "real-time_speed: N/A\n"
491                            "current_position: N/A\n",
492                            sf->sf_run_time, speed);
493         }
494
495         up_read(&scrub->os_rwsem);
496 }
497 EXPORT_SYMBOL(scrub_dump);
498
499 int lustre_liru_new(struct list_head *head, const struct lu_fid *pfid,
500                     const struct lu_fid *cfid, __u64 child,
501                     const char *name, int namelen)
502 {
503         struct lustre_index_restore_unit *liru;
504         int len = sizeof(*liru) + namelen + 1;
505
506         OBD_ALLOC(liru, len);
507         if (!liru)
508                 return -ENOMEM;
509
510         INIT_LIST_HEAD(&liru->liru_link);
511         liru->liru_pfid = *pfid;
512         liru->liru_cfid = *cfid;
513         liru->liru_clid = child;
514         liru->liru_len = len;
515         memcpy(liru->liru_name, name, namelen);
516         liru->liru_name[namelen] = 0;
517         list_add_tail(&liru->liru_link, head);
518
519         return 0;
520 }
521 EXPORT_SYMBOL(lustre_liru_new);
522
523 int lustre_index_register(struct dt_device *dev, const char *devname,
524                           struct list_head *head, spinlock_t *lock, int *guard,
525                           const struct lu_fid *fid,
526                           __u32 keysize, __u32 recsize)
527 {
528         struct lustre_index_backup_unit *libu, *pos;
529         int rc = 0;
530         ENTRY;
531
532         if (dev->dd_rdonly || *guard)
533                 RETURN(1);
534
535         OBD_ALLOC_PTR(libu);
536         if (!libu)
537                 RETURN(-ENOMEM);
538
539         INIT_LIST_HEAD(&libu->libu_link);
540         libu->libu_keysize = keysize;
541         libu->libu_recsize = recsize;
542         libu->libu_fid = *fid;
543
544         spin_lock(lock);
545         if (unlikely(*guard)) {
546                 spin_unlock(lock);
547                 OBD_FREE_PTR(libu);
548
549                 RETURN(1);
550         }
551
552         list_for_each_entry_reverse(pos, head, libu_link) {
553                 rc = lu_fid_cmp(&pos->libu_fid, fid);
554                 if (rc < 0) {
555                         list_add(&libu->libu_link, &pos->libu_link);
556                         spin_unlock(lock);
557
558                         RETURN(0);
559                 }
560
561                 if (!rc) {
562                         /* Registered already. But the former registered one
563                          * has different keysize/recsize. It may because that
564                          * the former values are from disk and corrupted, then
565                          * replace it with new values. */
566                         if (unlikely(keysize != pos->libu_keysize ||
567                                      recsize != pos->libu_recsize)) {
568                                 CWARN("%s: the index "DFID" has registered "
569                                       "with %u/%u, may be invalid, replace "
570                                       "with %u/%u\n",
571                                       devname, PFID(fid), pos->libu_keysize,
572                                       pos->libu_recsize, keysize, recsize);
573
574                                 pos->libu_keysize = keysize;
575                                 pos->libu_recsize = recsize;
576                         } else {
577                                 rc = 1;
578                         }
579
580                         spin_unlock(lock);
581                         OBD_FREE_PTR(libu);
582
583                         RETURN(rc);
584                 }
585         }
586
587         list_add(&libu->libu_link, head);
588         spin_unlock(lock);
589
590         RETURN(0);
591 }
592 EXPORT_SYMBOL(lustre_index_register);
593
594 static void lustre_index_degister(struct list_head *head, spinlock_t *lock,
595                                   const struct lu_fid *fid)
596 {
597         struct lustre_index_backup_unit *libu;
598         int rc = -ENOENT;
599
600         spin_lock(lock);
601         list_for_each_entry_reverse(libu, head, libu_link) {
602                 rc = lu_fid_cmp(&libu->libu_fid, fid);
603                 /* NOT registered. */
604                 if (rc < 0)
605                         break;
606
607                 if (!rc) {
608                         list_del(&libu->libu_link);
609                         break;
610                 }
611         }
612         spin_unlock(lock);
613
614         if (!rc)
615                 OBD_FREE_PTR(libu);
616 }
617
618 static void
619 lustre_index_backup_make_header(struct lustre_index_backup_header *header,
620                                 __u32 keysize, __u32 recsize,
621                                 const struct lu_fid *fid, __u32 count)
622 {
623         memset(header, 0, sizeof(*header));
624         header->libh_magic = cpu_to_le32(INDEX_BACKUP_MAGIC_V1);
625         header->libh_count = cpu_to_le32(count);
626         header->libh_keysize = cpu_to_le32(keysize);
627         header->libh_recsize = cpu_to_le32(recsize);
628         fid_cpu_to_le(&header->libh_owner, fid);
629 }
630
631 static int lustre_index_backup_body(const struct lu_env *env,
632                                     struct dt_object *obj, loff_t *pos,
633                                     void *buf, int bufsize)
634 {
635         struct dt_device *dev = lu2dt_dev(obj->do_lu.lo_dev);
636         struct thandle *th;
637         struct lu_buf lbuf = {
638                 .lb_buf = buf,
639                 .lb_len = bufsize
640         };
641         int rc;
642         ENTRY;
643
644         th = dt_trans_create(env, dev);
645         if (IS_ERR(th))
646                 RETURN(PTR_ERR(th));
647
648         rc = dt_declare_record_write(env, obj, &lbuf, *pos, th);
649         if (rc)
650                 GOTO(stop, rc);
651
652         rc = dt_trans_start_local(env, dev, th);
653         if (rc)
654                 GOTO(stop, rc);
655
656         rc = dt_record_write(env, obj, &lbuf, pos, th);
657
658         GOTO(stop, rc);
659
660 stop:
661         dt_trans_stop(env, dev, th);
662         return rc;
663 }
664
665 static int lustre_index_backup_header(const struct lu_env *env,
666                                       struct dt_object *obj,
667                                       const struct lu_fid *tgt_fid,
668                                       __u32 keysize, __u32 recsize,
669                                       void *buf, int bufsize, int count)
670 {
671         struct dt_device *dev = lu2dt_dev(obj->do_lu.lo_dev);
672         struct lustre_index_backup_header *header = buf;
673         struct lu_attr *la = buf;
674         struct thandle *th;
675         struct lu_buf lbuf = {
676                 .lb_buf = header,
677                 .lb_len = sizeof(*header)
678         };
679         loff_t size = sizeof(*header) + (keysize + recsize) * count;
680         loff_t pos = 0;
681         int rc;
682         bool punch = false;
683         ENTRY;
684
685         LASSERT(sizeof(*la) <= bufsize);
686         LASSERT(sizeof(*header) <= bufsize);
687
688         rc = dt_attr_get(env, obj, la);
689         if (rc)
690                 RETURN(rc);
691
692         if (la->la_size > size)
693                 punch = true;
694
695         lustre_index_backup_make_header(header, keysize, recsize,
696                                         tgt_fid, count);
697         th = dt_trans_create(env, dev);
698         if (IS_ERR(th))
699                 RETURN(PTR_ERR(th));
700
701         rc = dt_declare_record_write(env, obj, &lbuf, pos, th);
702         if (rc)
703                 GOTO(stop, rc);
704
705         if (punch) {
706                 rc = dt_declare_punch(env, obj, size, OBD_OBJECT_EOF, th);
707                 if (rc)
708                         GOTO(stop, rc);
709         }
710
711         rc = dt_trans_start_local(env, dev, th);
712         if (rc)
713                 GOTO(stop, rc);
714
715         rc = dt_record_write(env, obj, &lbuf, &pos, th);
716         if (!rc && punch)
717                 rc = dt_punch(env, obj, size, OBD_OBJECT_EOF, th);
718
719         GOTO(stop, rc);
720
721 stop:
722         dt_trans_stop(env, dev, th);
723         return rc;
724 }
725
726 static int lustre_index_update_lma(const struct lu_env *env,
727                                    struct dt_object *obj,
728                                    void *buf, int bufsize)
729 {
730         struct dt_device *dev = lu2dt_dev(obj->do_lu.lo_dev);
731         struct lustre_mdt_attrs *lma = buf;
732         struct lu_buf lbuf = {
733                 .lb_buf = lma,
734                 .lb_len = sizeof(struct lustre_ost_attrs)
735         };
736         struct thandle *th;
737         int fl = LU_XATTR_REPLACE;
738         int rc;
739         ENTRY;
740
741         LASSERT(bufsize >= lbuf.lb_len);
742
743         rc = dt_xattr_get(env, obj, &lbuf, XATTR_NAME_LMA);
744         if (unlikely(rc == -ENODATA)) {
745                 fl = LU_XATTR_CREATE;
746                 lustre_lma_init(lma, lu_object_fid(&obj->do_lu),
747                                 LMAC_IDX_BACKUP, 0);
748                 rc = sizeof(*lma);
749         } else if (rc < sizeof(*lma)) {
750                 RETURN(rc < 0 ? rc : -EFAULT);
751         } else {
752                 lustre_lma_swab(lma);
753                 if (lma->lma_compat & LMAC_IDX_BACKUP)
754                         RETURN(0);
755
756                 lma->lma_compat |= LMAC_IDX_BACKUP;
757         }
758
759         lustre_lma_swab(lma);
760         lbuf.lb_len = rc;
761         th = dt_trans_create(env, dev);
762         if (IS_ERR(th))
763                 RETURN(rc);
764
765         rc = dt_declare_xattr_set(env, obj, &lbuf, XATTR_NAME_LMA, fl, th);
766         if (rc)
767                 GOTO(stop, rc);
768
769         rc = dt_trans_start_local(env, dev, th);
770         if (rc)
771                 GOTO(stop, rc);
772
773         rc = dt_xattr_set(env, obj, &lbuf, XATTR_NAME_LMA, fl, th);
774
775         GOTO(stop, rc);
776
777 stop:
778         dt_trans_stop(env, dev, th);
779         return rc;
780 }
781
782 static int lustre_index_backup_one(const struct lu_env *env,
783                                    struct local_oid_storage *los,
784                                    struct dt_object *parent,
785                                    struct lustre_index_backup_unit *libu,
786                                    char *buf, int bufsize)
787 {
788         struct dt_device *dev = scrub_obj2dev(parent);
789         struct dt_object *tgt_obj = NULL;
790         struct dt_object *bak_obj = NULL;
791         const struct dt_it_ops *iops;
792         struct dt_it *di;
793         loff_t pos = sizeof(struct lustre_index_backup_header);
794         int count = 0;
795         int size = 0;
796         int rc;
797         ENTRY;
798
799         tgt_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
800                                              &libu->libu_fid, NULL));
801         if (IS_ERR_OR_NULL(tgt_obj))
802                 GOTO(out, rc = tgt_obj ? PTR_ERR(tgt_obj) : -ENOENT);
803
804         if (!dt_object_exists(tgt_obj))
805                 GOTO(out, rc = 0);
806
807         if (!tgt_obj->do_index_ops) {
808                 struct dt_index_features feat;
809
810                 feat.dif_flags = DT_IND_UPDATE;
811                 feat.dif_keysize_min = libu->libu_keysize;
812                 feat.dif_keysize_max = libu->libu_keysize;
813                 feat.dif_recsize_min = libu->libu_recsize;
814                 feat.dif_recsize_max = libu->libu_recsize;
815                 feat.dif_ptrsize = 4;
816                 rc = tgt_obj->do_ops->do_index_try(env, tgt_obj, &feat);
817                 if (rc)
818                         GOTO(out, rc);
819         }
820
821         lustre_fid2lbx(buf, &libu->libu_fid, bufsize);
822         bak_obj = local_file_find_or_create(env, los, parent, buf,
823                                             S_IFREG | S_IRUGO | S_IWUSR);
824         if (IS_ERR_OR_NULL(bak_obj))
825                 GOTO(out, rc = bak_obj ? PTR_ERR(bak_obj) : -ENOENT);
826
827         iops = &tgt_obj->do_index_ops->dio_it;
828         di = iops->init(env, tgt_obj, 0);
829         if (IS_ERR(di))
830                 GOTO(out, rc = PTR_ERR(di));
831
832         rc = iops->load(env, di, 0);
833         if (!rc)
834                 rc = iops->next(env, di);
835         else if (rc > 0)
836                 rc = 0;
837
838         while (!rc) {
839                 void *key;
840                 void *rec;
841
842                 key = iops->key(env, di);
843                 memcpy(&buf[size], key, libu->libu_keysize);
844                 size += libu->libu_keysize;
845                 rec = &buf[size];
846                 rc = iops->rec(env, di, rec, 0);
847                 if (rc)
848                         GOTO(fini, rc);
849
850                 size += libu->libu_recsize;
851                 count++;
852                 if (size + libu->libu_keysize + libu->libu_recsize > bufsize) {
853                         rc = lustre_index_backup_body(env, bak_obj, &pos,
854                                                       buf, size);
855                         if (rc)
856                                 GOTO(fini, rc);
857
858                         size = 0;
859                 }
860
861                 rc = iops->next(env, di);
862         }
863
864         if (rc >= 0 && size > 0)
865                 rc = lustre_index_backup_body(env, bak_obj, &pos, buf, size);
866
867         if (rc < 0)
868                 GOTO(fini, rc);
869
870         rc = lustre_index_backup_header(env, bak_obj, &libu->libu_fid,
871                                         libu->libu_keysize, libu->libu_recsize,
872                                         buf, bufsize, count);
873         if (!rc)
874                 rc = lustre_index_update_lma(env, tgt_obj, buf, bufsize);
875
876         if (!rc && OBD_FAIL_CHECK(OBD_FAIL_OSD_INDEX_CRASH)) {
877                 LASSERT(bufsize >= 512);
878
879                 pos = 0;
880                 memset(buf, 0, 512);
881                 lustre_index_backup_body(env, tgt_obj, &pos, buf, 512);
882         }
883
884         GOTO(fini, rc);
885
886 fini:
887         iops->fini(env, di);
888 out:
889         if (!IS_ERR_OR_NULL(tgt_obj))
890                 dt_object_put_nocache(env, tgt_obj);
891         if (!IS_ERR_OR_NULL(bak_obj))
892                 dt_object_put_nocache(env, bak_obj);
893         return rc;
894 }
895
896 void lustre_index_backup(const struct lu_env *env, struct dt_device *dev,
897                          const char *devname, struct list_head *head,
898                          spinlock_t *lock, int *guard, bool backup)
899 {
900         struct lustre_index_backup_unit *libu;
901         struct local_oid_storage *los = NULL;
902         struct dt_object *parent = NULL;
903         char *buf = NULL;
904         struct lu_fid fid;
905         int rc;
906         ENTRY;
907
908         if (dev->dd_rdonly || *guard)
909                 RETURN_EXIT;
910
911         spin_lock(lock);
912         *guard = 1;
913         spin_unlock(lock);
914
915         if (list_empty(head))
916                 RETURN_EXIT;
917
918         /* Handle kinds of failures during mount process. */
919         if (!dev->dd_lu_dev.ld_site || !dev->dd_lu_dev.ld_site->ls_top_dev)
920                 backup = false;
921
922         if (backup) {
923                 OBD_ALLOC_LARGE(buf, INDEX_BACKUP_BUFSIZE);
924                 if (!buf) {
925                         backup = false;
926                         goto scan;
927                 }
928
929                 lu_local_obj_fid(&fid, INDEX_BACKUP_OID);
930                 parent = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
931                                                     &fid, NULL));
932                 if (IS_ERR_OR_NULL(parent)) {
933                         CERROR("%s: failed to locate backup dir: rc = %ld\n",
934                                devname, parent ? PTR_ERR(parent) : -ENOENT);
935                         backup = false;
936                         goto scan;
937                 }
938
939                 lu_local_name_obj_fid(&fid, 1);
940                 rc = local_oid_storage_init(env, dev, &fid, &los);
941                 if (rc) {
942                         CERROR("%s: failed to init local storage: rc = %d\n",
943                                devname, rc);
944                         backup = false;
945                 }
946         }
947
948 scan:
949         spin_lock(lock);
950         while (!list_empty(head)) {
951                 libu = list_entry(head->next,
952                                   struct lustre_index_backup_unit, libu_link);
953                 list_del_init(&libu->libu_link);
954                 spin_unlock(lock);
955
956                 if (backup) {
957                         rc = lustre_index_backup_one(env, los, parent, libu,
958                                                      buf, INDEX_BACKUP_BUFSIZE);
959                         CDEBUG(D_WARNING, "%s: backup index "DFID": rc = %d\n",
960                                devname, PFID(&libu->libu_fid), rc);
961                 }
962
963                 OBD_FREE_PTR(libu);
964                 spin_lock(lock);
965         }
966         spin_unlock(lock);
967
968         if (los)
969                 local_oid_storage_fini(env, los);
970         if (parent)
971                 dt_object_put_nocache(env, parent);
972         if (buf)
973                 OBD_FREE_LARGE(buf, INDEX_BACKUP_BUFSIZE);
974
975         EXIT;
976 }
977 EXPORT_SYMBOL(lustre_index_backup);
978
979 int lustre_index_restore(const struct lu_env *env, struct dt_device *dev,
980                          const struct lu_fid *parent_fid,
981                          const struct lu_fid *tgt_fid,
982                          const struct lu_fid *bak_fid, const char *name,
983                          struct list_head *head, spinlock_t *lock,
984                          char *buf, int bufsize)
985 {
986         struct dt_object *parent_obj = NULL;
987         struct dt_object *tgt_obj = NULL;
988         struct dt_object *bak_obj = NULL;
989         struct lustre_index_backup_header *header;
990         struct dt_index_features *feat;
991         struct dt_object_format *dof;
992         struct lu_attr *la;
993         struct thandle *th;
994         struct lu_object_conf conf;
995         struct dt_insert_rec ent;
996         struct lu_buf lbuf;
997         struct lu_fid tfid;
998         loff_t pos = 0;
999         __u32 keysize;
1000         __u32 recsize;
1001         __u32 pairsize;
1002         int count;
1003         int rc;
1004         bool registered = false;
1005         ENTRY;
1006
1007         LASSERT(bufsize >= sizeof(*la) + sizeof(*dof) +
1008                 sizeof(*feat) + sizeof(*header));
1009
1010         memset(buf, 0, bufsize);
1011         la = (struct lu_attr *)buf;
1012         dof = (void *)la + sizeof(*la);
1013         feat = (void *)dof + sizeof(*dof);
1014         header = (void *)feat + sizeof(*feat);
1015         lbuf.lb_buf = header;
1016         lbuf.lb_len = sizeof(*header);
1017
1018         tgt_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
1019                                              tgt_fid, NULL));
1020         if (IS_ERR_OR_NULL(tgt_obj))
1021                 GOTO(out, rc = tgt_obj ? PTR_ERR(tgt_obj) : -ENOENT);
1022
1023         bak_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
1024                                              bak_fid, NULL));
1025         if (IS_ERR_OR_NULL(bak_obj))
1026                 GOTO(out, rc = bak_obj ? PTR_ERR(bak_obj) : -ENOENT);
1027
1028         if (!dt_object_exists(bak_obj))
1029                 GOTO(out, rc = -ENOENT);
1030
1031         parent_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
1032                                                 parent_fid, NULL));
1033         if (IS_ERR_OR_NULL(parent_obj))
1034                 GOTO(out, rc = parent_obj ? PTR_ERR(parent_obj) : -ENOENT);
1035
1036         LASSERT(dt_object_exists(parent_obj));
1037
1038         if (unlikely(!dt_try_as_dir(env, parent_obj)))
1039                 GOTO(out, rc = -ENOTDIR);
1040
1041         rc = dt_attr_get(env, tgt_obj, la);
1042         if (rc)
1043                 GOTO(out, rc);
1044
1045         rc = dt_record_read(env, bak_obj, &lbuf, &pos);
1046         if (rc)
1047                 GOTO(out, rc);
1048
1049         if (le32_to_cpu(header->libh_magic) != INDEX_BACKUP_MAGIC_V1)
1050                 GOTO(out, rc = -EINVAL);
1051
1052         fid_le_to_cpu(&tfid, &header->libh_owner);
1053         if (unlikely(!lu_fid_eq(tgt_fid, &tfid)))
1054                 GOTO(out, rc = -EINVAL);
1055
1056         keysize = le32_to_cpu(header->libh_keysize);
1057         recsize = le32_to_cpu(header->libh_recsize);
1058         pairsize = keysize + recsize;
1059
1060         memset(feat, 0, sizeof(*feat));
1061         feat->dif_flags = DT_IND_UPDATE;
1062         feat->dif_keysize_min = feat->dif_keysize_max = keysize;
1063         feat->dif_recsize_min = feat->dif_recsize_max = recsize;
1064         feat->dif_ptrsize = 4;
1065
1066         /* T1: remove old name entry and destroy old index. */
1067         th = dt_trans_create(env, dev);
1068         if (IS_ERR(th))
1069                 GOTO(out, rc = PTR_ERR(th));
1070
1071         rc = dt_declare_delete(env, parent_obj,
1072                                (const struct dt_key *)name, th);
1073         if (rc)
1074                 GOTO(stop, rc);
1075
1076         rc = dt_declare_destroy(env, tgt_obj, th);
1077         if (rc)
1078                 GOTO(stop, rc);
1079
1080         rc = dt_trans_start_local(env, dev, th);
1081         if (rc)
1082                 GOTO(stop, rc);
1083
1084         rc = dt_delete(env, parent_obj, (const struct dt_key *)name, th);
1085         if (rc)
1086                 GOTO(stop, rc);
1087
1088         dt_write_lock(env, tgt_obj, 0);
1089         rc = dt_destroy(env, tgt_obj, th);
1090         dt_write_unlock(env, tgt_obj);
1091         dt_trans_stop(env, dev, th);
1092         if (rc)
1093                 GOTO(out, rc);
1094
1095         la->la_valid = LA_MODE | LA_UID | LA_GID;
1096         conf.loc_flags = LOC_F_NEW;
1097         dof->u.dof_idx.di_feat = feat;
1098         dof->dof_type = DFT_INDEX;
1099         ent.rec_type = S_IFREG;
1100         ent.rec_fid = tgt_fid;
1101
1102         /* Drop cache before re-create it. */
1103         dt_object_put_nocache(env, tgt_obj);
1104         tgt_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
1105                                              tgt_fid, &conf));
1106         if (IS_ERR_OR_NULL(tgt_obj))
1107                 GOTO(out, rc = tgt_obj ? PTR_ERR(tgt_obj) : -ENOENT);
1108
1109         LASSERT(!dt_object_exists(tgt_obj));
1110
1111         /* T2: create new index and insert new name entry. */
1112         th = dt_trans_create(env, dev);
1113         if (IS_ERR(th))
1114                 GOTO(out, rc = PTR_ERR(th));
1115
1116         rc = dt_declare_create(env, tgt_obj, la, NULL, dof, th);
1117         if (rc)
1118                 GOTO(stop, rc);
1119
1120         rc = dt_declare_insert(env, parent_obj, (const struct dt_rec *)&ent,
1121                                (const struct dt_key *)name, th);
1122         if (rc)
1123                 GOTO(stop, rc);
1124
1125         rc = dt_trans_start_local(env, dev, th);
1126         if (rc)
1127                 GOTO(stop, rc);
1128
1129         dt_write_lock(env, tgt_obj, 0);
1130         rc = dt_create(env, tgt_obj, la, NULL, dof, th);
1131         dt_write_unlock(env, tgt_obj);
1132         if (rc)
1133                 GOTO(stop, rc);
1134
1135         rc = dt_insert(env, parent_obj, (const struct dt_rec *)&ent,
1136                        (const struct dt_key *)name, th, 1);
1137         dt_trans_stop(env, dev, th);
1138         /* Some index name may has been inserted by OSD
1139          * automatically when create the index object. */
1140         if (unlikely(rc == -EEXIST))
1141                 rc = 0;
1142         if (rc)
1143                 GOTO(out, rc);
1144
1145         /* The new index will register via index_try. */
1146         rc = tgt_obj->do_ops->do_index_try(env, tgt_obj, feat);
1147         if (rc)
1148                 GOTO(out, rc);
1149
1150         registered = true;
1151         count = le32_to_cpu(header->libh_count);
1152         while (!rc && count > 0) {
1153                 int size = pairsize * count;
1154                 int items = count;
1155                 int i;
1156
1157                 if (size > bufsize) {
1158                         items = bufsize / pairsize;
1159                         size = pairsize * items;
1160                 }
1161
1162                 lbuf.lb_buf = buf;
1163                 lbuf.lb_len = size;
1164                 rc = dt_record_read(env, bak_obj, &lbuf, &pos);
1165                 for (i = 0; i < items && !rc; i++) {
1166                         void *key = &buf[i * pairsize];
1167                         void *rec = &buf[i * pairsize + keysize];
1168
1169                         /* Tn: restore the records. */
1170                         th = dt_trans_create(env, dev);
1171                         if (!th)
1172                                 GOTO(out, rc = -ENOMEM);
1173
1174                         rc = dt_declare_insert(env, tgt_obj, rec, key, th);
1175                         if (rc)
1176                                 GOTO(stop, rc);
1177
1178                         rc = dt_trans_start_local(env, dev, th);
1179                         if (rc)
1180                                 GOTO(stop, rc);
1181
1182                         rc = dt_insert(env, tgt_obj, rec, key, th, 1);
1183                         if (unlikely(rc == -EEXIST))
1184                                 rc = 0;
1185
1186                         dt_trans_stop(env, dev, th);
1187                 }
1188
1189                 count -= items;
1190         }
1191
1192         GOTO(out, rc);
1193
1194 stop:
1195         dt_trans_stop(env, dev, th);
1196         if (rc && registered)
1197                 /* Degister the index to avoid overwriting the backup. */
1198                 lustre_index_degister(head, lock, tgt_fid);
1199
1200 out:
1201         if (!IS_ERR_OR_NULL(tgt_obj))
1202                 dt_object_put_nocache(env, tgt_obj);
1203         if (!IS_ERR_OR_NULL(bak_obj))
1204                 dt_object_put_nocache(env, bak_obj);
1205         if (!IS_ERR_OR_NULL(parent_obj))
1206                 dt_object_put_nocache(env, parent_obj);
1207         return rc;
1208 }
1209 EXPORT_SYMBOL(lustre_index_restore);