Whamcloud - gitweb
LU-17705 ptlrpc: replace synchronize_rcu() with rcu_barrier()
[fs/lustre-release.git] / lustre / obdclass / scrub.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2017, Intel Corporation.
24  */
25 /*
26  * lustre/obdclass/scrub.c
27  *
28  * The OI scrub is used for checking and (re)building Object Index files
29  * that are usually backend special. Here are some general scrub related
30  * functions that can be shared by different backends for OI scrub.
31  *
32  * Author: Fan Yong <fan.yong@intel.com>
33  */
34
35 #define DEBUG_SUBSYSTEM S_LFSCK
36
37 #include <linux/kthread.h>
38 #include <lustre_scrub.h>
39 #include <lustre_lib.h>
40 #include <lustre_fid.h>
41
42 static inline struct dt_device *scrub_obj2dev(struct dt_object *obj)
43 {
44         return container_of_safe(obj->do_lu.lo_dev, struct dt_device,
45                                  dd_lu_dev);
46 }
47
48 static void scrub_file_to_cpu(struct scrub_file *des, struct scrub_file *src)
49 {
50         uuid_copy(&des->sf_uuid, &src->sf_uuid);
51         des->sf_flags   = le64_to_cpu(src->sf_flags);
52         des->sf_magic   = le32_to_cpu(src->sf_magic);
53         des->sf_status  = le16_to_cpu(src->sf_status);
54         des->sf_param   = le16_to_cpu(src->sf_param);
55         des->sf_time_last_complete      =
56                                 le64_to_cpu(src->sf_time_last_complete);
57         des->sf_time_latest_start       =
58                                 le64_to_cpu(src->sf_time_latest_start);
59         des->sf_time_last_checkpoint    =
60                                 le64_to_cpu(src->sf_time_last_checkpoint);
61         des->sf_pos_latest_start        =
62                                 le64_to_cpu(src->sf_pos_latest_start);
63         des->sf_pos_last_checkpoint     =
64                                 le64_to_cpu(src->sf_pos_last_checkpoint);
65         des->sf_pos_first_inconsistent  =
66                                 le64_to_cpu(src->sf_pos_first_inconsistent);
67         des->sf_items_checked           =
68                                 le64_to_cpu(src->sf_items_checked);
69         des->sf_items_updated           =
70                                 le64_to_cpu(src->sf_items_updated);
71         des->sf_items_failed            =
72                                 le64_to_cpu(src->sf_items_failed);
73         des->sf_items_updated_prior     =
74                                 le64_to_cpu(src->sf_items_updated_prior);
75         des->sf_run_time        = le32_to_cpu(src->sf_run_time);
76         des->sf_success_count   = le32_to_cpu(src->sf_success_count);
77         des->sf_oi_count        = le16_to_cpu(src->sf_oi_count);
78         des->sf_internal_flags  = le16_to_cpu(src->sf_internal_flags);
79         memcpy(des->sf_oi_bitmap, src->sf_oi_bitmap, SCRUB_OI_BITMAP_SIZE);
80 }
81
82 static void scrub_file_to_le(struct scrub_file *des, struct scrub_file *src)
83 {
84         uuid_copy(&des->sf_uuid, &src->sf_uuid);
85         des->sf_flags   = cpu_to_le64(src->sf_flags);
86         des->sf_magic   = cpu_to_le32(src->sf_magic);
87         des->sf_status  = cpu_to_le16(src->sf_status);
88         des->sf_param   = cpu_to_le16(src->sf_param);
89         des->sf_time_last_complete      =
90                                 cpu_to_le64(src->sf_time_last_complete);
91         des->sf_time_latest_start       =
92                                 cpu_to_le64(src->sf_time_latest_start);
93         des->sf_time_last_checkpoint    =
94                                 cpu_to_le64(src->sf_time_last_checkpoint);
95         des->sf_pos_latest_start        =
96                                 cpu_to_le64(src->sf_pos_latest_start);
97         des->sf_pos_last_checkpoint     =
98                                 cpu_to_le64(src->sf_pos_last_checkpoint);
99         des->sf_pos_first_inconsistent  =
100                                 cpu_to_le64(src->sf_pos_first_inconsistent);
101         des->sf_items_checked           =
102                                 cpu_to_le64(src->sf_items_checked);
103         des->sf_items_updated           =
104                                 cpu_to_le64(src->sf_items_updated);
105         des->sf_items_failed            =
106                                 cpu_to_le64(src->sf_items_failed);
107         des->sf_items_updated_prior     =
108                                 cpu_to_le64(src->sf_items_updated_prior);
109         des->sf_run_time        = cpu_to_le32(src->sf_run_time);
110         des->sf_success_count   = cpu_to_le32(src->sf_success_count);
111         des->sf_oi_count        = cpu_to_le16(src->sf_oi_count);
112         des->sf_internal_flags  = cpu_to_le16(src->sf_internal_flags);
113         memcpy(des->sf_oi_bitmap, src->sf_oi_bitmap, SCRUB_OI_BITMAP_SIZE);
114 }
115
116 void scrub_file_init(struct lustre_scrub *scrub, uuid_t uuid)
117 {
118         struct scrub_file *sf = &scrub->os_file;
119
120         memset(sf, 0, sizeof(*sf));
121         uuid_copy(&sf->sf_uuid, &uuid);
122         sf->sf_magic = SCRUB_MAGIC_V1;
123         sf->sf_status = SS_INIT;
124 }
125 EXPORT_SYMBOL(scrub_file_init);
126
127 void scrub_file_reset(struct lustre_scrub *scrub, uuid_t uuid, u64 flags)
128 {
129         struct scrub_file *sf = &scrub->os_file;
130
131         CDEBUG(D_LFSCK, "%s: reset OI scrub file, old flags = "
132                "%#llx, add flags = %#llx\n",
133                scrub->os_name, sf->sf_flags, flags);
134
135         uuid_copy(&sf->sf_uuid, &uuid);
136         sf->sf_status = SS_INIT;
137         sf->sf_flags |= flags;
138         sf->sf_flags &= ~SF_AUTO;
139         sf->sf_run_time = 0;
140         sf->sf_time_latest_start = 0;
141         sf->sf_time_last_checkpoint = 0;
142         sf->sf_pos_latest_start = 0;
143         sf->sf_pos_last_checkpoint = 0;
144         sf->sf_pos_first_inconsistent = 0;
145         sf->sf_items_checked = 0;
146         sf->sf_items_updated = 0;
147         sf->sf_items_failed = 0;
148         sf->sf_items_noscrub = 0;
149         sf->sf_items_igif = 0;
150         if (!scrub->os_in_join)
151                 sf->sf_items_updated_prior = 0;
152 }
153 EXPORT_SYMBOL(scrub_file_reset);
154
155 int scrub_file_load(const struct lu_env *env, struct lustre_scrub *scrub)
156 {
157         struct scrub_file *sf = &scrub->os_file;
158         struct lu_buf buf = {
159                 .lb_buf = &scrub->os_file_disk,
160                 .lb_len = sizeof(scrub->os_file_disk)
161         };
162         loff_t pos = 0;
163         int rc;
164
165         rc = dt_read(env, scrub->os_obj, &buf, &pos);
166         /* failure */
167         if (rc < 0) {
168                 CERROR("%s: fail to load scrub file: rc = %d\n",
169                        scrub->os_name, rc);
170                 return rc;
171         }
172
173         /* empty */
174         if (!rc)
175                 return -ENOENT;
176
177         /* corrupted */
178         if (rc < buf.lb_len) {
179                 CDEBUG(D_LFSCK, "%s: fail to load scrub file, "
180                        "expected = %d: rc = %d\n",
181                        scrub->os_name, (int)buf.lb_len, rc);
182                 return -EFAULT;
183         }
184
185         scrub_file_to_cpu(sf, &scrub->os_file_disk);
186         if (sf->sf_magic != SCRUB_MAGIC_V1) {
187                 CDEBUG(D_LFSCK, "%s: invalid scrub magic 0x%x != 0x%x\n",
188                        scrub->os_name, sf->sf_magic, SCRUB_MAGIC_V1);
189                 return -EFAULT;
190         }
191
192         return 0;
193 }
194 EXPORT_SYMBOL(scrub_file_load);
195
196 int scrub_file_store(const struct lu_env *env, struct lustre_scrub *scrub)
197 {
198         struct scrub_file *sf = &scrub->os_file_disk;
199         struct dt_object *obj = scrub->os_obj;
200         struct dt_device *dev = scrub_obj2dev(obj);
201         struct lu_buf buf = {
202                 .lb_buf = sf,
203                 .lb_len = sizeof(*sf)
204         };
205         struct thandle *th;
206         loff_t pos = 0;
207         int rc;
208         ENTRY;
209
210         /* Skip store under rdonly mode. */
211         if (dev->dd_rdonly)
212                 RETURN(0);
213
214         scrub_file_to_le(sf, &scrub->os_file);
215         th = dt_trans_create(env, dev);
216         if (IS_ERR(th))
217                 GOTO(log, rc = PTR_ERR(th));
218
219         rc = dt_declare_record_write(env, obj, &buf, pos, th);
220         if (rc)
221                 GOTO(stop, rc);
222
223         rc = dt_trans_start_local(env, dev, th);
224         if (rc)
225                 GOTO(stop, rc);
226
227         rc = dt_record_write(env, obj, &buf, &pos, th);
228
229         GOTO(stop, rc);
230
231 stop:
232         dt_trans_stop(env, dev, th);
233
234 log:
235         if (rc)
236                 CERROR("%s: store scrub file: rc = %d\n",
237                        scrub->os_name, rc);
238         else
239                 CDEBUG(D_LFSCK, "%s: store scrub file: rc = %d\n",
240                        scrub->os_name, rc);
241
242         scrub->os_time_last_checkpoint = ktime_get_seconds();
243         scrub->os_time_next_checkpoint = scrub->os_time_last_checkpoint +
244                                          SCRUB_CHECKPOINT_INTERVAL;
245         return rc;
246 }
247 EXPORT_SYMBOL(scrub_file_store);
248
249 int scrub_checkpoint(const struct lu_env *env, struct lustre_scrub *scrub)
250 {
251         struct scrub_file *sf = &scrub->os_file;
252         time64_t now = ktime_get_seconds();
253         int rc;
254
255         if (likely(now < scrub->os_time_next_checkpoint ||
256                    scrub->os_new_checked == 0))
257                 return 0;
258
259         CDEBUG(D_LFSCK, "%s: OI scrub checkpoint at pos %llu\n",
260                scrub->os_name, scrub->os_pos_current);
261
262         down_write(&scrub->os_rwsem);
263         sf->sf_items_checked += scrub->os_new_checked;
264         scrub->os_new_checked = 0;
265         sf->sf_pos_last_checkpoint = scrub->os_pos_current;
266         sf->sf_time_last_checkpoint = ktime_get_real_seconds();
267         sf->sf_run_time += now - scrub->os_time_last_checkpoint;
268         rc = scrub_file_store(env, scrub);
269         up_write(&scrub->os_rwsem);
270
271         return rc;
272 }
273 EXPORT_SYMBOL(scrub_checkpoint);
274
275 int scrub_start(int (*threadfn)(void *data), struct lustre_scrub *scrub,
276                 void *data, __u32 flags)
277 {
278         struct ptlrpc_thread *thread = &scrub->os_thread;
279         struct task_struct *task;
280         int rc;
281         ENTRY;
282
283 again:
284         /* os_lock: sync status between stop and scrub thread */
285         spin_lock(&scrub->os_lock);
286         if (thread_is_running(thread)) {
287                 spin_unlock(&scrub->os_lock);
288                 RETURN(-EALREADY);
289         }
290
291         if (unlikely(thread_is_stopping(thread))) {
292                 spin_unlock(&scrub->os_lock);
293                 wait_event_idle(thread->t_ctl_waitq,
294                                 thread_is_stopped(thread));
295                 goto again;
296         }
297         spin_unlock(&scrub->os_lock);
298
299         if (scrub->os_file.sf_status == SS_COMPLETED) {
300                 if (!(flags & SS_SET_FAILOUT))
301                         flags |= SS_CLEAR_FAILOUT;
302
303                 if (!(flags & SS_SET_DRYRUN))
304                         flags |= SS_CLEAR_DRYRUN;
305
306                 flags |= SS_RESET;
307         }
308
309         scrub->os_start_flags = flags;
310         thread_set_flags(thread, 0);
311         task = kthread_run(threadfn, data, "OI_scrub");
312         if (IS_ERR(task)) {
313                 rc = PTR_ERR(task);
314                 CERROR("%s: cannot start iteration thread: rc = %d\n",
315                        scrub->os_name, rc);
316                 RETURN(rc);
317         }
318
319         wait_event_idle(thread->t_ctl_waitq,
320                         thread_is_running(thread) || thread_is_stopped(thread));
321
322         RETURN(0);
323 }
324 EXPORT_SYMBOL(scrub_start);
325
326 void scrub_stop(struct lustre_scrub *scrub)
327 {
328         struct ptlrpc_thread *thread = &scrub->os_thread;
329
330         /* os_lock: sync status between stop and scrub thread */
331         spin_lock(&scrub->os_lock);
332         if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
333                 thread_set_flags(thread, SVC_STOPPING);
334                 spin_unlock(&scrub->os_lock);
335                 wake_up_all(&thread->t_ctl_waitq);
336                 wait_event_idle(thread->t_ctl_waitq,
337                                 thread_is_stopped(thread));
338                 /* Do not skip the last lock/unlock, which can guarantee that
339                  * the caller cannot return until the OI scrub thread exit. */
340                 spin_lock(&scrub->os_lock);
341         }
342         spin_unlock(&scrub->os_lock);
343 }
344 EXPORT_SYMBOL(scrub_stop);
345
/*
 * Printable names for the scrub status, indexed by sf_status in
 * scrub_dump(). NULL-terminated.
 */
const char *scrub_status_names[] = {
        "init",
        "scanning",
        "completed",
        "failed",
        "stopped",
        "paused",
        "crashed",
        NULL
};

/*
 * Printable names for the bits of sf_flags; entry i names bit i when
 * passed to scrub_bits_dump(). NULL-terminated.
 */
const char *scrub_flags_names[] = {
        "recreated",
        "inconsistent",
        "auto",
        "upgrade",
        NULL
};

/*
 * Printable names for the bits of sf_param; entry i names bit i when
 * passed to scrub_bits_dump(). NULL-terminated.
 */
const char *scrub_param_names[] = {
        "failout",
        "dryrun",
        NULL
};
370
371 static void scrub_bits_dump(struct seq_file *m, int bits, const char *names[],
372                             const char *prefix)
373 {
374         int flag;
375         int i;
376
377         seq_printf(m, "%s:%c", prefix, bits != 0 ? ' ' : '\n');
378
379         for (i = 0, flag = 1; bits != 0; i++, flag = BIT(i)) {
380                 if (flag & bits) {
381                         bits &= ~flag;
382                         seq_printf(m, "%s%c", names[i],
383                                    bits != 0 ? ',' : '\n');
384                 }
385         }
386 }
387
388 static void scrub_time_dump(struct seq_file *m, time64_t time,
389                             const char *prefix)
390 {
391         if (time != 0)
392                 seq_printf(m, "%s: %llu seconds\n", prefix,
393                            ktime_get_real_seconds() - time);
394         else
395                 seq_printf(m, "%s: N/A\n", prefix);
396 }
397
398 static void scrub_pos_dump(struct seq_file *m, __u64 pos, const char *prefix)
399 {
400         if (pos != 0)
401                 seq_printf(m, "%s: %llu\n", prefix, pos);
402         else
403                 seq_printf(m, "%s: N/A\n", prefix);
404 }
405
406 void scrub_dump(struct seq_file *m, struct lustre_scrub *scrub)
407 {
408         struct scrub_file *sf = &scrub->os_file;
409         u64 checked;
410         s64 speed;
411
412         down_read(&scrub->os_rwsem);
413         seq_printf(m, "name: OI_scrub\n"
414                    "magic: 0x%x\n"
415                    "oi_files: %d\n"
416                    "status: %s\n",
417                    sf->sf_magic, (int)sf->sf_oi_count,
418                    scrub_status_names[sf->sf_status]);
419
420         scrub_bits_dump(m, sf->sf_flags, scrub_flags_names, "flags");
421
422         scrub_bits_dump(m, sf->sf_param, scrub_param_names, "param");
423
424         scrub_time_dump(m, sf->sf_time_last_complete,
425                         "time_since_last_completed");
426
427         scrub_time_dump(m, sf->sf_time_latest_start,
428                         "time_since_latest_start");
429
430         scrub_time_dump(m, sf->sf_time_last_checkpoint,
431                         "time_since_last_checkpoint");
432
433         scrub_pos_dump(m, sf->sf_pos_latest_start,
434                         "latest_start_position");
435
436         scrub_pos_dump(m, sf->sf_pos_last_checkpoint,
437                         "last_checkpoint_position");
438
439         scrub_pos_dump(m, sf->sf_pos_first_inconsistent,
440                         "first_failure_position");
441
442         checked = sf->sf_items_checked + scrub->os_new_checked;
443         seq_printf(m, "checked: %llu\n"
444                    "%s: %llu\n"
445                    "failed: %llu\n"
446                    "prior_%s: %llu\n"
447                    "noscrub: %llu\n"
448                    "igif: %llu\n"
449                    "success_count: %u\n",
450                    checked,
451                    sf->sf_param & SP_DRYRUN ? "inconsistent" : "updated",
452                    sf->sf_items_updated, sf->sf_items_failed,
453                    sf->sf_param & SP_DRYRUN ? "inconsistent" : "updated",
454                    sf->sf_items_updated_prior, sf->sf_items_noscrub,
455                    sf->sf_items_igif, sf->sf_success_count);
456
457         speed = checked;
458         if (thread_is_running(&scrub->os_thread)) {
459                 s64 new_checked = scrub->os_new_checked;
460                 time64_t duration;
461                 time64_t rtime;
462
463                 /* Since the time resolution is in seconds for new system
464                  * or small devices it ismore likely that duration will be
465                  * zero which will lead to inaccurate results.
466                  */
467                 duration = ktime_get_seconds() -
468                            scrub->os_time_last_checkpoint;
469                 if (duration != 0)
470                         new_checked = div_s64(new_checked, duration);
471
472                 rtime = sf->sf_run_time + duration;
473                 if (rtime != 0)
474                         speed = div_s64(speed, rtime);
475
476                 seq_printf(m, "run_time: %lld seconds\n"
477                            "average_speed: %lld objects/sec\n"
478                            "real_time_speed: %lld objects/sec\n"
479                            "current_position: %llu\n"
480                            "scrub_in_prior: %s\n"
481                            "scrub_full_speed: %s\n"
482                            "partial_scan: %s\n",
483                            rtime, speed, new_checked,
484                            scrub->os_pos_current,
485                            scrub->os_in_prior ? "yes" : "no",
486                            scrub->os_full_speed ? "yes" : "no",
487                            scrub->os_partial_scan ? "yes" : "no");
488         } else {
489                 if (sf->sf_run_time != 0)
490                         speed = div_s64(speed, sf->sf_run_time);
491                 seq_printf(m, "run_time: %d seconds\n"
492                            "average_speed: %lld objects/sec\n"
493                            "real_time_speed: N/A\n"
494                            "current_position: N/A\n",
495                            sf->sf_run_time, speed);
496         }
497
498         up_read(&scrub->os_rwsem);
499 }
500 EXPORT_SYMBOL(scrub_dump);
501
502 int lustre_liru_new(struct list_head *head, const struct lu_fid *pfid,
503                     const struct lu_fid *cfid, __u64 child,
504                     const char *name, int namelen)
505 {
506         struct lustre_index_restore_unit *liru;
507         int len = sizeof(*liru) + namelen + 1;
508
509         OBD_ALLOC(liru, len);
510         if (!liru)
511                 return -ENOMEM;
512
513         INIT_LIST_HEAD(&liru->liru_link);
514         liru->liru_pfid = *pfid;
515         liru->liru_cfid = *cfid;
516         liru->liru_clid = child;
517         liru->liru_len = len;
518         memcpy(liru->liru_name, name, namelen);
519         liru->liru_name[namelen] = 0;
520         list_add_tail(&liru->liru_link, head);
521
522         return 0;
523 }
524 EXPORT_SYMBOL(lustre_liru_new);
525
526 int lustre_index_register(struct dt_device *dev, const char *devname,
527                           struct list_head *head, spinlock_t *lock, int *guard,
528                           const struct lu_fid *fid,
529                           __u32 keysize, __u32 recsize)
530 {
531         struct lustre_index_backup_unit *libu, *pos;
532         int rc = 0;
533         ENTRY;
534
535         if (dev->dd_rdonly || *guard)
536                 RETURN(1);
537
538         OBD_ALLOC_PTR(libu);
539         if (!libu)
540                 RETURN(-ENOMEM);
541
542         INIT_LIST_HEAD(&libu->libu_link);
543         libu->libu_keysize = keysize;
544         libu->libu_recsize = recsize;
545         libu->libu_fid = *fid;
546
547         spin_lock(lock);
548         if (unlikely(*guard)) {
549                 spin_unlock(lock);
550                 OBD_FREE_PTR(libu);
551
552                 RETURN(1);
553         }
554
555         list_for_each_entry_reverse(pos, head, libu_link) {
556                 rc = lu_fid_cmp(&pos->libu_fid, fid);
557                 if (rc < 0) {
558                         list_add(&libu->libu_link, &pos->libu_link);
559                         spin_unlock(lock);
560
561                         RETURN(0);
562                 }
563
564                 if (!rc) {
565                         /* Registered already. But the former registered one
566                          * has different keysize/recsize. It may because that
567                          * the former values are from disk and corrupted, then
568                          * replace it with new values. */
569                         if (unlikely(keysize != pos->libu_keysize ||
570                                      recsize != pos->libu_recsize)) {
571                                 CWARN("%s: the index "DFID" has registered "
572                                       "with %u/%u, may be invalid, replace "
573                                       "with %u/%u\n",
574                                       devname, PFID(fid), pos->libu_keysize,
575                                       pos->libu_recsize, keysize, recsize);
576
577                                 pos->libu_keysize = keysize;
578                                 pos->libu_recsize = recsize;
579                         } else {
580                                 rc = 1;
581                         }
582
583                         spin_unlock(lock);
584                         OBD_FREE_PTR(libu);
585
586                         RETURN(rc);
587                 }
588         }
589
590         list_add(&libu->libu_link, head);
591         spin_unlock(lock);
592
593         RETURN(0);
594 }
595 EXPORT_SYMBOL(lustre_index_register);
596
597 static void lustre_index_degister(struct list_head *head, spinlock_t *lock,
598                                   const struct lu_fid *fid)
599 {
600         struct lustre_index_backup_unit *libu;
601         int rc = -ENOENT;
602
603         spin_lock(lock);
604         list_for_each_entry_reverse(libu, head, libu_link) {
605                 rc = lu_fid_cmp(&libu->libu_fid, fid);
606                 /* NOT registered. */
607                 if (rc < 0)
608                         break;
609
610                 if (!rc) {
611                         list_del(&libu->libu_link);
612                         break;
613                 }
614         }
615         spin_unlock(lock);
616
617         if (!rc)
618                 OBD_FREE_PTR(libu);
619 }
620
621 static void
622 lustre_index_backup_make_header(struct lustre_index_backup_header *header,
623                                 __u32 keysize, __u32 recsize,
624                                 const struct lu_fid *fid, __u32 count)
625 {
626         memset(header, 0, sizeof(*header));
627         header->libh_magic = cpu_to_le32(INDEX_BACKUP_MAGIC_V1);
628         header->libh_count = cpu_to_le32(count);
629         header->libh_keysize = cpu_to_le32(keysize);
630         header->libh_recsize = cpu_to_le32(recsize);
631         fid_cpu_to_le(&header->libh_owner, fid);
632 }
633
634 static int lustre_index_backup_body(const struct lu_env *env,
635                                     struct dt_object *obj, loff_t *pos,
636                                     void *buf, int bufsize)
637 {
638         struct dt_device *dev = lu2dt_dev(obj->do_lu.lo_dev);
639         struct thandle *th;
640         struct lu_buf lbuf = {
641                 .lb_buf = buf,
642                 .lb_len = bufsize
643         };
644         int rc;
645         ENTRY;
646
647         th = dt_trans_create(env, dev);
648         if (IS_ERR(th))
649                 RETURN(PTR_ERR(th));
650
651         rc = dt_declare_record_write(env, obj, &lbuf, *pos, th);
652         if (rc)
653                 GOTO(stop, rc);
654
655         rc = dt_trans_start_local(env, dev, th);
656         if (rc)
657                 GOTO(stop, rc);
658
659         rc = dt_record_write(env, obj, &lbuf, pos, th);
660
661         GOTO(stop, rc);
662
663 stop:
664         dt_trans_stop(env, dev, th);
665         return rc;
666 }
667
668 static int lustre_index_backup_header(const struct lu_env *env,
669                                       struct dt_object *obj,
670                                       const struct lu_fid *tgt_fid,
671                                       __u32 keysize, __u32 recsize,
672                                       void *buf, int bufsize, int count)
673 {
674         struct dt_device *dev = lu2dt_dev(obj->do_lu.lo_dev);
675         struct lustre_index_backup_header *header = buf;
676         struct lu_attr *la = buf;
677         struct thandle *th;
678         struct lu_buf lbuf = {
679                 .lb_buf = header,
680                 .lb_len = sizeof(*header)
681         };
682         loff_t size = sizeof(*header) + (keysize + recsize) * count;
683         loff_t pos = 0;
684         int rc;
685         bool punch = false;
686         ENTRY;
687
688         LASSERT(sizeof(*la) <= bufsize);
689         LASSERT(sizeof(*header) <= bufsize);
690
691         rc = dt_attr_get(env, obj, la);
692         if (rc)
693                 RETURN(rc);
694
695         if (la->la_size > size)
696                 punch = true;
697
698         lustre_index_backup_make_header(header, keysize, recsize,
699                                         tgt_fid, count);
700         th = dt_trans_create(env, dev);
701         if (IS_ERR(th))
702                 RETURN(PTR_ERR(th));
703
704         rc = dt_declare_record_write(env, obj, &lbuf, pos, th);
705         if (rc)
706                 GOTO(stop, rc);
707
708         if (punch) {
709                 rc = dt_declare_punch(env, obj, size, OBD_OBJECT_EOF, th);
710                 if (rc)
711                         GOTO(stop, rc);
712         }
713
714         rc = dt_trans_start_local(env, dev, th);
715         if (rc)
716                 GOTO(stop, rc);
717
718         rc = dt_record_write(env, obj, &lbuf, &pos, th);
719         if (!rc && punch)
720                 rc = dt_punch(env, obj, size, OBD_OBJECT_EOF, th);
721
722         GOTO(stop, rc);
723
724 stop:
725         dt_trans_stop(env, dev, th);
726         return rc;
727 }
728
729 static int lustre_index_update_lma(const struct lu_env *env,
730                                    struct dt_object *obj,
731                                    void *buf, int bufsize)
732 {
733         struct dt_device *dev = lu2dt_dev(obj->do_lu.lo_dev);
734         struct lustre_mdt_attrs *lma = buf;
735         struct lu_buf lbuf = {
736                 .lb_buf = lma,
737                 .lb_len = sizeof(struct lustre_ost_attrs)
738         };
739         struct thandle *th;
740         int fl = LU_XATTR_REPLACE;
741         int rc;
742         ENTRY;
743
744         LASSERT(bufsize >= lbuf.lb_len);
745
746         rc = dt_xattr_get(env, obj, &lbuf, XATTR_NAME_LMA);
747         if (unlikely(rc == -ENODATA)) {
748                 fl = LU_XATTR_CREATE;
749                 lustre_lma_init(lma, lu_object_fid(&obj->do_lu),
750                                 LMAC_IDX_BACKUP, 0);
751                 rc = sizeof(*lma);
752         } else if (rc < sizeof(*lma)) {
753                 RETURN(rc < 0 ? rc : -EFAULT);
754         } else {
755                 lustre_lma_swab(lma);
756                 if (lma->lma_compat & LMAC_IDX_BACKUP)
757                         RETURN(0);
758
759                 lma->lma_compat |= LMAC_IDX_BACKUP;
760         }
761
762         lustre_lma_swab(lma);
763         lbuf.lb_len = rc;
764         th = dt_trans_create(env, dev);
765         if (IS_ERR(th))
766                 RETURN(rc);
767
768         rc = dt_declare_xattr_set(env, obj, &lbuf, XATTR_NAME_LMA, fl, th);
769         if (rc)
770                 GOTO(stop, rc);
771
772         rc = dt_trans_start_local(env, dev, th);
773         if (rc)
774                 GOTO(stop, rc);
775
776         rc = dt_xattr_set(env, obj, &lbuf, XATTR_NAME_LMA, fl, th);
777
778         GOTO(stop, rc);
779
780 stop:
781         dt_trans_stop(env, dev, th);
782         return rc;
783 }
784
785 static int lustre_index_backup_one(const struct lu_env *env,
786                                    struct local_oid_storage *los,
787                                    struct dt_object *parent,
788                                    struct lustre_index_backup_unit *libu,
789                                    char *buf, int bufsize)
790 {
791         struct dt_device *dev = scrub_obj2dev(parent);
792         struct dt_object *tgt_obj = NULL;
793         struct dt_object *bak_obj = NULL;
794         const struct dt_it_ops *iops;
795         struct dt_it *di;
796         loff_t pos = sizeof(struct lustre_index_backup_header);
797         int count = 0;
798         int size = 0;
799         int rc;
800         ENTRY;
801
802         tgt_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
803                                              &libu->libu_fid, NULL));
804         if (IS_ERR_OR_NULL(tgt_obj))
805                 GOTO(out, rc = tgt_obj ? PTR_ERR(tgt_obj) : -ENOENT);
806
807         if (!dt_object_exists(tgt_obj))
808                 GOTO(out, rc = 0);
809
810         if (!tgt_obj->do_index_ops) {
811                 struct dt_index_features feat;
812
813                 feat.dif_flags = DT_IND_UPDATE;
814                 feat.dif_keysize_min = libu->libu_keysize;
815                 feat.dif_keysize_max = libu->libu_keysize;
816                 feat.dif_recsize_min = libu->libu_recsize;
817                 feat.dif_recsize_max = libu->libu_recsize;
818                 feat.dif_ptrsize = 4;
819                 rc = tgt_obj->do_ops->do_index_try(env, tgt_obj, &feat);
820                 if (rc)
821                         GOTO(out, rc);
822         }
823
824         lustre_fid2lbx(buf, &libu->libu_fid, bufsize);
825         bak_obj = local_file_find_or_create(env, los, parent, buf,
826                                             S_IFREG | S_IRUGO | S_IWUSR);
827         if (IS_ERR_OR_NULL(bak_obj))
828                 GOTO(out, rc = bak_obj ? PTR_ERR(bak_obj) : -ENOENT);
829
830         iops = &tgt_obj->do_index_ops->dio_it;
831         di = iops->init(env, tgt_obj, 0);
832         if (IS_ERR(di))
833                 GOTO(out, rc = PTR_ERR(di));
834
835         rc = iops->load(env, di, 0);
836         if (!rc)
837                 rc = iops->next(env, di);
838         else if (rc > 0)
839                 rc = 0;
840
841         while (!rc) {
842                 void *key;
843                 void *rec;
844
845                 key = iops->key(env, di);
846                 memcpy(&buf[size], key, libu->libu_keysize);
847                 size += libu->libu_keysize;
848                 rec = &buf[size];
849                 rc = iops->rec(env, di, rec, 0);
850                 if (rc)
851                         GOTO(fini, rc);
852
853                 size += libu->libu_recsize;
854                 count++;
855                 if (size + libu->libu_keysize + libu->libu_recsize > bufsize) {
856                         rc = lustre_index_backup_body(env, bak_obj, &pos,
857                                                       buf, size);
858                         if (rc)
859                                 GOTO(fini, rc);
860
861                         size = 0;
862                 }
863
864                 rc = iops->next(env, di);
865         }
866
867         if (rc >= 0 && size > 0)
868                 rc = lustre_index_backup_body(env, bak_obj, &pos, buf, size);
869
870         if (rc < 0)
871                 GOTO(fini, rc);
872
873         rc = lustre_index_backup_header(env, bak_obj, &libu->libu_fid,
874                                         libu->libu_keysize, libu->libu_recsize,
875                                         buf, bufsize, count);
876         if (!rc)
877                 rc = lustre_index_update_lma(env, tgt_obj, buf, bufsize);
878
879         if (!rc && OBD_FAIL_CHECK(OBD_FAIL_OSD_INDEX_CRASH)) {
880                 LASSERT(bufsize >= 512);
881
882                 pos = 0;
883                 memset(buf, 0, 512);
884                 lustre_index_backup_body(env, tgt_obj, &pos, buf, 512);
885         }
886
887         GOTO(fini, rc);
888
889 fini:
890         iops->fini(env, di);
891 out:
892         if (!IS_ERR_OR_NULL(tgt_obj))
893                 dt_object_put_nocache(env, tgt_obj);
894         if (!IS_ERR_OR_NULL(bak_obj))
895                 dt_object_put_nocache(env, bak_obj);
896         return rc;
897 }
898
899 void lustre_index_backup(const struct lu_env *env, struct dt_device *dev,
900                          const char *devname, struct list_head *head,
901                          spinlock_t *lock, int *guard, bool backup)
902 {
903         struct lustre_index_backup_unit *libu;
904         struct local_oid_storage *los = NULL;
905         struct dt_object *parent = NULL;
906         char *buf = NULL;
907         struct lu_fid fid;
908         int rc;
909         ENTRY;
910
911         if (dev->dd_rdonly || *guard)
912                 RETURN_EXIT;
913
914         spin_lock(lock);
915         *guard = 1;
916         spin_unlock(lock);
917
918         if (list_empty(head))
919                 RETURN_EXIT;
920
921         /* Handle kinds of failures during mount process. */
922         if (!dev->dd_lu_dev.ld_site || !dev->dd_lu_dev.ld_site->ls_top_dev)
923                 backup = false;
924
925         if (backup) {
926                 OBD_ALLOC_LARGE(buf, INDEX_BACKUP_BUFSIZE);
927                 if (!buf) {
928                         backup = false;
929                         goto scan;
930                 }
931
932                 lu_local_obj_fid(&fid, INDEX_BACKUP_OID);
933                 parent = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
934                                                     &fid, NULL));
935                 if (IS_ERR_OR_NULL(parent)) {
936                         CERROR("%s: failed to locate backup dir: rc = %ld\n",
937                                devname, parent ? PTR_ERR(parent) : -ENOENT);
938                         backup = false;
939                         goto scan;
940                 }
941
942                 lu_local_name_obj_fid(&fid, 1);
943                 rc = local_oid_storage_init(env, dev, &fid, &los);
944                 if (rc) {
945                         CERROR("%s: failed to init local storage: rc = %d\n",
946                                devname, rc);
947                         backup = false;
948                 }
949         }
950
951 scan:
952         spin_lock(lock);
953         while (!list_empty(head)) {
954                 libu = list_entry(head->next,
955                                   struct lustre_index_backup_unit, libu_link);
956                 list_del_init(&libu->libu_link);
957                 spin_unlock(lock);
958
959                 if (backup) {
960                         rc = lustre_index_backup_one(env, los, parent, libu,
961                                                      buf, INDEX_BACKUP_BUFSIZE);
962                         CDEBUG(D_WARNING, "%s: backup index "DFID": rc = %d\n",
963                                devname, PFID(&libu->libu_fid), rc);
964                 }
965
966                 OBD_FREE_PTR(libu);
967                 spin_lock(lock);
968         }
969         spin_unlock(lock);
970
971         if (los)
972                 local_oid_storage_fini(env, los);
973         if (parent)
974                 dt_object_put_nocache(env, parent);
975         if (buf)
976                 OBD_FREE_LARGE(buf, INDEX_BACKUP_BUFSIZE);
977
978         EXIT;
979 }
980 EXPORT_SYMBOL(lustre_index_backup);
981
982 int lustre_index_restore(const struct lu_env *env, struct dt_device *dev,
983                          const struct lu_fid *parent_fid,
984                          const struct lu_fid *tgt_fid,
985                          const struct lu_fid *bak_fid, const char *name,
986                          struct list_head *head, spinlock_t *lock,
987                          char *buf, int bufsize)
988 {
989         struct dt_object *parent_obj = NULL;
990         struct dt_object *tgt_obj = NULL;
991         struct dt_object *bak_obj = NULL;
992         struct lustre_index_backup_header *header;
993         struct dt_index_features *feat;
994         struct dt_object_format *dof;
995         struct lu_attr *la;
996         struct thandle *th;
997         struct lu_object_conf conf;
998         struct dt_insert_rec ent;
999         struct lu_buf lbuf;
1000         struct lu_fid tfid;
1001         loff_t pos = 0;
1002         __u32 keysize;
1003         __u32 recsize;
1004         __u32 pairsize;
1005         int count;
1006         int rc;
1007         bool registered = false;
1008         ENTRY;
1009
1010         LASSERT(bufsize >= sizeof(*la) + sizeof(*dof) +
1011                 sizeof(*feat) + sizeof(*header));
1012
1013         memset(buf, 0, bufsize);
1014         la = (struct lu_attr *)buf;
1015         dof = (void *)la + sizeof(*la);
1016         feat = (void *)dof + sizeof(*dof);
1017         header = (void *)feat + sizeof(*feat);
1018         lbuf.lb_buf = header;
1019         lbuf.lb_len = sizeof(*header);
1020
1021         tgt_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
1022                                              tgt_fid, NULL));
1023         if (IS_ERR_OR_NULL(tgt_obj))
1024                 GOTO(out, rc = tgt_obj ? PTR_ERR(tgt_obj) : -ENOENT);
1025
1026         bak_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
1027                                              bak_fid, NULL));
1028         if (IS_ERR_OR_NULL(bak_obj))
1029                 GOTO(out, rc = bak_obj ? PTR_ERR(bak_obj) : -ENOENT);
1030
1031         if (!dt_object_exists(bak_obj))
1032                 GOTO(out, rc = -ENOENT);
1033
1034         parent_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
1035                                                 parent_fid, NULL));
1036         if (IS_ERR_OR_NULL(parent_obj))
1037                 GOTO(out, rc = parent_obj ? PTR_ERR(parent_obj) : -ENOENT);
1038
1039         LASSERT(dt_object_exists(parent_obj));
1040
1041         if (unlikely(!dt_try_as_dir(env, parent_obj)))
1042                 GOTO(out, rc = -ENOTDIR);
1043
1044         rc = dt_attr_get(env, tgt_obj, la);
1045         if (rc)
1046                 GOTO(out, rc);
1047
1048         rc = dt_record_read(env, bak_obj, &lbuf, &pos);
1049         if (rc)
1050                 GOTO(out, rc);
1051
1052         if (le32_to_cpu(header->libh_magic) != INDEX_BACKUP_MAGIC_V1)
1053                 GOTO(out, rc = -EINVAL);
1054
1055         fid_le_to_cpu(&tfid, &header->libh_owner);
1056         if (unlikely(!lu_fid_eq(tgt_fid, &tfid)))
1057                 GOTO(out, rc = -EINVAL);
1058
1059         keysize = le32_to_cpu(header->libh_keysize);
1060         recsize = le32_to_cpu(header->libh_recsize);
1061         pairsize = keysize + recsize;
1062
1063         memset(feat, 0, sizeof(*feat));
1064         feat->dif_flags = DT_IND_UPDATE;
1065         feat->dif_keysize_min = feat->dif_keysize_max = keysize;
1066         feat->dif_recsize_min = feat->dif_recsize_max = recsize;
1067         feat->dif_ptrsize = 4;
1068
1069         /* T1: remove old name entry and destroy old index. */
1070         th = dt_trans_create(env, dev);
1071         if (IS_ERR(th))
1072                 GOTO(out, rc = PTR_ERR(th));
1073
1074         rc = dt_declare_delete(env, parent_obj,
1075                                (const struct dt_key *)name, th);
1076         if (rc)
1077                 GOTO(stop, rc);
1078
1079         rc = dt_declare_destroy(env, tgt_obj, th);
1080         if (rc)
1081                 GOTO(stop, rc);
1082
1083         rc = dt_trans_start_local(env, dev, th);
1084         if (rc)
1085                 GOTO(stop, rc);
1086
1087         rc = dt_delete(env, parent_obj, (const struct dt_key *)name, th);
1088         if (rc)
1089                 GOTO(stop, rc);
1090
1091         dt_write_lock(env, tgt_obj, 0);
1092         rc = dt_destroy(env, tgt_obj, th);
1093         dt_write_unlock(env, tgt_obj);
1094         dt_trans_stop(env, dev, th);
1095         if (rc)
1096                 GOTO(out, rc);
1097
1098         la->la_valid = LA_MODE | LA_UID | LA_GID;
1099         conf.loc_flags = LOC_F_NEW;
1100         dof->u.dof_idx.di_feat = feat;
1101         dof->dof_type = DFT_INDEX;
1102         ent.rec_type = S_IFREG;
1103         ent.rec_fid = tgt_fid;
1104
1105         /* Drop cache before re-create it. */
1106         dt_object_put_nocache(env, tgt_obj);
1107         tgt_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
1108                                              tgt_fid, &conf));
1109         if (IS_ERR_OR_NULL(tgt_obj))
1110                 GOTO(out, rc = tgt_obj ? PTR_ERR(tgt_obj) : -ENOENT);
1111
1112         LASSERT(!dt_object_exists(tgt_obj));
1113
1114         /* T2: create new index and insert new name entry. */
1115         th = dt_trans_create(env, dev);
1116         if (IS_ERR(th))
1117                 GOTO(out, rc = PTR_ERR(th));
1118
1119         rc = dt_declare_create(env, tgt_obj, la, NULL, dof, th);
1120         if (rc)
1121                 GOTO(stop, rc);
1122
1123         rc = dt_declare_insert(env, parent_obj, (const struct dt_rec *)&ent,
1124                                (const struct dt_key *)name, th);
1125         if (rc)
1126                 GOTO(stop, rc);
1127
1128         rc = dt_trans_start_local(env, dev, th);
1129         if (rc)
1130                 GOTO(stop, rc);
1131
1132         dt_write_lock(env, tgt_obj, 0);
1133         rc = dt_create(env, tgt_obj, la, NULL, dof, th);
1134         dt_write_unlock(env, tgt_obj);
1135         if (rc)
1136                 GOTO(stop, rc);
1137
1138         rc = dt_insert(env, parent_obj, (const struct dt_rec *)&ent,
1139                        (const struct dt_key *)name, th);
1140         dt_trans_stop(env, dev, th);
1141         /* Some index name may has been inserted by OSD
1142          * automatically when create the index object. */
1143         if (unlikely(rc == -EEXIST))
1144                 rc = 0;
1145         if (rc)
1146                 GOTO(out, rc);
1147
1148         /* The new index will register via index_try. */
1149         rc = tgt_obj->do_ops->do_index_try(env, tgt_obj, feat);
1150         if (rc)
1151                 GOTO(out, rc);
1152
1153         registered = true;
1154         count = le32_to_cpu(header->libh_count);
1155         while (!rc && count > 0) {
1156                 int size = pairsize * count;
1157                 int items = count;
1158                 int i;
1159
1160                 if (size > bufsize) {
1161                         items = bufsize / pairsize;
1162                         size = pairsize * items;
1163                 }
1164
1165                 lbuf.lb_buf = buf;
1166                 lbuf.lb_len = size;
1167                 rc = dt_record_read(env, bak_obj, &lbuf, &pos);
1168                 for (i = 0; i < items && !rc; i++) {
1169                         void *key = &buf[i * pairsize];
1170                         void *rec = &buf[i * pairsize + keysize];
1171
1172                         /* Tn: restore the records. */
1173                         th = dt_trans_create(env, dev);
1174                         if (!th)
1175                                 GOTO(out, rc = -ENOMEM);
1176
1177                         rc = dt_declare_insert(env, tgt_obj, rec, key, th);
1178                         if (rc)
1179                                 GOTO(stop, rc);
1180
1181                         rc = dt_trans_start_local(env, dev, th);
1182                         if (rc)
1183                                 GOTO(stop, rc);
1184
1185                         rc = dt_insert(env, tgt_obj, rec, key, th);
1186                         if (unlikely(rc == -EEXIST))
1187                                 rc = 0;
1188
1189                         dt_trans_stop(env, dev, th);
1190                 }
1191
1192                 count -= items;
1193         }
1194
1195         GOTO(out, rc);
1196
1197 stop:
1198         dt_trans_stop(env, dev, th);
1199         if (rc && registered)
1200                 /* Degister the index to avoid overwriting the backup. */
1201                 lustre_index_degister(head, lock, tgt_fid);
1202
1203 out:
1204         if (!IS_ERR_OR_NULL(tgt_obj))
1205                 dt_object_put_nocache(env, tgt_obj);
1206         if (!IS_ERR_OR_NULL(bak_obj))
1207                 dt_object_put_nocache(env, bak_obj);
1208         if (!IS_ERR_OR_NULL(parent_obj))
1209                 dt_object_put_nocache(env, parent_obj);
1210         return rc;
1211 }
1212 EXPORT_SYMBOL(lustre_index_restore);