Whamcloud - gitweb
87b442d987ba3909a79d5735ebdcdddbf07926ed
[fs/lustre-release.git] / lustre / lfsck / lfsck_engine.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_engine.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <lu_object.h>
34 #include <dt_object.h>
35 #include <lustre_net.h>
36 #include <lustre_fid.h>
37 #include <obd_support.h>
38 #include <lustre_lib.h>
39
40 #include "lfsck_internal.h"
41
42 static void lfsck_unpack_ent(struct lu_dirent *ent, __u64 *cookie)
43 {
44         fid_le_to_cpu(&ent->lde_fid, &ent->lde_fid);
45         *cookie = le64_to_cpu(ent->lde_hash);
46         ent->lde_reclen = le16_to_cpu(ent->lde_reclen);
47         ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
48         ent->lde_attrs = le32_to_cpu(ent->lde_attrs);
49
50         /* Make sure the name is terminated with '0'.
51          * The data (type) after ent::lde_name maybe
52          * broken, but we do not care. */
53         ent->lde_name[ent->lde_namelen] = 0;
54 }
55
56 static void lfsck_di_oit_put(const struct lu_env *env, struct lfsck_instance *lfsck)
57 {
58         const struct dt_it_ops  *iops;
59         struct dt_it            *di;
60
61         spin_lock(&lfsck->li_lock);
62         iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
63         di = lfsck->li_di_oit;
64         lfsck->li_di_oit = NULL;
65         spin_unlock(&lfsck->li_lock);
66         iops->put(env, di);
67 }
68
69 static void lfsck_di_dir_put(const struct lu_env *env, struct lfsck_instance *lfsck)
70 {
71         const struct dt_it_ops  *iops;
72         struct dt_it            *di;
73
74         spin_lock(&lfsck->li_lock);
75         iops = &lfsck->li_obj_dir->do_index_ops->dio_it;
76         di = lfsck->li_di_dir;
77         lfsck->li_di_dir = NULL;
78         lfsck->li_cookie_dir = 0;
79         spin_unlock(&lfsck->li_lock);
80         iops->put(env, di);
81 }
82
83 static void lfsck_close_dir(const struct lu_env *env,
84                             struct lfsck_instance *lfsck)
85 {
86         struct dt_object        *dir_obj  = lfsck->li_obj_dir;
87         const struct dt_it_ops  *dir_iops = &dir_obj->do_index_ops->dio_it;
88         struct dt_it            *dir_di   = lfsck->li_di_dir;
89
90         lfsck_di_dir_put(env, lfsck);
91         dir_iops->fini(env, dir_di);
92         lfsck->li_obj_dir = NULL;
93         lfsck_object_put(env, dir_obj);
94 }
95
96 static int lfsck_update_lma(const struct lu_env *env,
97                             struct lfsck_instance *lfsck, struct dt_object *obj)
98 {
99         struct lfsck_thread_info        *info   = lfsck_env_info(env);
100         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
101         struct dt_device                *dt     = lfsck->li_bottom;
102         struct lustre_mdt_attrs         *lma    = &info->lti_lma;
103         struct lu_buf                   *buf;
104         struct thandle                  *th;
105         int                              fl;
106         int                              rc;
107         ENTRY;
108
109         if (bk->lb_param & LPF_DRYRUN)
110                 RETURN(0);
111
112         buf = lfsck_buf_get(env, info->lti_lma_old, LMA_OLD_SIZE);
113         rc = dt_xattr_get(env, obj, buf, XATTR_NAME_LMA, BYPASS_CAPA);
114         if (rc < 0) {
115                 if (rc != -ENODATA)
116                         RETURN(rc);
117
118                 fl = LU_XATTR_CREATE;
119                 lustre_lma_init(lma, lfsck_dto2fid(obj), LMAC_FID_ON_OST, 0);
120         } else {
121                 if (rc != LMA_OLD_SIZE && rc != sizeof(struct lustre_mdt_attrs))
122                         RETURN(-EINVAL);
123
124                 fl = LU_XATTR_REPLACE;
125                 lustre_lma_swab(lma);
126                 lustre_lma_init(lma, lfsck_dto2fid(obj),
127                                 lma->lma_compat | LMAC_FID_ON_OST,
128                                 lma->lma_incompat);
129         }
130         lustre_lma_swab(lma);
131
132         th = dt_trans_create(env, dt);
133         if (IS_ERR(th))
134                 RETURN(PTR_ERR(th));
135
136         buf = lfsck_buf_get(env, lma, sizeof(*lma));
137         rc = dt_declare_xattr_set(env, obj, buf, XATTR_NAME_LMA, fl, th);
138         if (rc != 0)
139                 GOTO(stop, rc);
140
141         rc = dt_trans_start(env, dt, th);
142         if (rc != 0)
143                 GOTO(stop, rc);
144
145         rc = dt_xattr_set(env, obj, buf, XATTR_NAME_LMA, fl, th, BYPASS_CAPA);
146
147         GOTO(stop, rc);
148
149 stop:
150         dt_trans_stop(env, dt, th);
151         return rc;
152 }
153
154 static int lfsck_master_dir_engine(const struct lu_env *env,
155                                    struct lfsck_instance *lfsck)
156 {
157         struct lfsck_thread_info        *info   = lfsck_env_info(env);
158         struct dt_object                *dir    = lfsck->li_obj_dir;
159         const struct dt_it_ops          *iops   = &dir->do_index_ops->dio_it;
160         struct dt_it                    *di     = lfsck->li_di_dir;
161         struct lu_dirent                *ent    = &info->lti_ent;
162         struct lu_fid                   *fid    = &info->lti_fid;
163         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
164         struct ptlrpc_thread            *thread = &lfsck->li_thread;
165         int                              rc;
166         ENTRY;
167
168         do {
169                 struct dt_object *child;
170
171                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY2) &&
172                     cfs_fail_val > 0) {
173                         struct l_wait_info lwi;
174
175                         lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
176                                           NULL, NULL);
177                         l_wait_event(thread->t_ctl_waitq,
178                                      !thread_is_running(thread),
179                                      &lwi);
180                 }
181
182                 lfsck->li_new_scanned++;
183                 rc = iops->rec(env, di, (struct dt_rec *)ent,
184                                lfsck->li_args_dir);
185                 lfsck_unpack_ent(ent, &lfsck->li_cookie_dir);
186                 if (rc != 0) {
187                         CDEBUG(D_LFSCK, "%s: scan dir failed at rec(), "
188                                "parent "DFID", cookie "LPX64": rc = %d\n",
189                                lfsck_lfsck2name(lfsck),
190                                PFID(lfsck_dto2fid(dir)),
191                                lfsck->li_cookie_dir, rc);
192                         lfsck_fail(env, lfsck, true);
193                         if (bk->lb_param & LPF_FAILOUT)
194                                 RETURN(rc);
195                         else
196                                 goto checkpoint;
197                 }
198
199                 if (ent->lde_attrs & LUDA_IGNORE)
200                         goto checkpoint;
201
202                 *fid = ent->lde_fid;
203                 child = lfsck_object_find(env, lfsck, fid);
204                 if (child == NULL) {
205                         goto checkpoint;
206                 } else if (IS_ERR(child)) {
207                         CDEBUG(D_LFSCK, "%s: scan dir failed at find target, "
208                                "parent "DFID", child %.*s "DFID": rc = %d\n",
209                                lfsck_lfsck2name(lfsck),
210                                PFID(lfsck_dto2fid(dir)),
211                                ent->lde_namelen, ent->lde_name,
212                                PFID(&ent->lde_fid), rc);
213                         lfsck_fail(env, lfsck, true);
214                         if (bk->lb_param & LPF_FAILOUT)
215                                 RETURN(PTR_ERR(child));
216                         else
217                                 goto checkpoint;
218                 }
219
220                 /* XXX: Currently, skip remote object, the consistency for
221                  *      remote object will be processed in LFSCK phase III. */
222                 if (dt_object_exists(child) && !dt_object_remote(child))
223                         rc = lfsck_exec_dir(env, lfsck, child, ent);
224                 lfsck_object_put(env, child);
225                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
226                         RETURN(rc);
227
228 checkpoint:
229                 rc = lfsck_checkpoint(env, lfsck);
230                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
231                         RETURN(rc);
232
233                 /* Rate control. */
234                 lfsck_control_speed(lfsck);
235                 if (unlikely(!thread_is_running(thread))) {
236                         CDEBUG(D_LFSCK, "%s: scan dir exit for engine stop, "
237                                "parent "DFID", cookie "LPX64"\n",
238                                lfsck_lfsck2name(lfsck),
239                                PFID(lfsck_dto2fid(dir)),
240                                lfsck->li_cookie_dir);
241                         RETURN(0);
242                 }
243
244                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_FATAL2)) {
245                         spin_lock(&lfsck->li_lock);
246                         thread_set_flags(thread, SVC_STOPPING);
247                         spin_unlock(&lfsck->li_lock);
248                         RETURN(-EINVAL);
249                 }
250
251                 rc = iops->next(env, di);
252         } while (rc == 0);
253
254         if (rc > 0 && !lfsck->li_oit_over)
255                 lfsck_close_dir(env, lfsck);
256
257         RETURN(rc);
258 }
259
260 static int lfsck_master_oit_engine(const struct lu_env *env,
261                                    struct lfsck_instance *lfsck)
262 {
263         struct lfsck_thread_info        *info   = lfsck_env_info(env);
264         const struct dt_it_ops          *iops   =
265                                 &lfsck->li_obj_oit->do_index_ops->dio_it;
266         struct dt_it                    *di     = lfsck->li_di_oit;
267         struct lu_fid                   *fid    = &info->lti_fid;
268         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
269         struct ptlrpc_thread            *thread = &lfsck->li_thread;
270         __u32                            idx    =
271                                 lfsck_dev_idx(lfsck->li_bottom);
272         int                              rc;
273         ENTRY;
274
275         do {
276                 struct dt_object *target;
277                 bool              update_lma = false;
278
279                 if (lfsck->li_di_dir != NULL) {
280                         rc = lfsck_master_dir_engine(env, lfsck);
281                         if (rc <= 0)
282                                 RETURN(rc);
283                 }
284
285                 if (unlikely(lfsck->li_oit_over))
286                         RETURN(1);
287
288                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY1) &&
289                     cfs_fail_val > 0) {
290                         struct l_wait_info lwi;
291
292                         lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
293                                           NULL, NULL);
294                         l_wait_event(thread->t_ctl_waitq,
295                                      !thread_is_running(thread),
296                                      &lwi);
297                 }
298
299                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CRASH))
300                         RETURN(0);
301
302                 lfsck->li_current_oit_processed = 1;
303                 lfsck->li_new_scanned++;
304                 rc = iops->rec(env, di, (struct dt_rec *)fid, 0);
305                 if (rc != 0) {
306                         CDEBUG(D_LFSCK, "%s: OIT scan failed at rec(): "
307                                "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
308                         lfsck_fail(env, lfsck, true);
309                         if (rc < 0 && bk->lb_param & LPF_FAILOUT)
310                                 RETURN(rc);
311                         else
312                                 goto checkpoint;
313                 }
314
315                 if (fid_is_idif(fid)) {
316                         __u32 idx1 = fid_idif_ost_idx(fid);
317
318                         LASSERT(!lfsck->li_master);
319
320                         /* It is an old format device, update the LMA. */
321                         if (idx != idx1) {
322                                 struct ost_id *oi = &info->lti_oi;
323
324                                 fid_to_ostid(fid, oi);
325                                 ostid_to_fid(fid, oi, idx);
326                                 update_lma = true;
327                         }
328                 } else if (!fid_is_norm(fid) && !fid_is_igif(fid) &&
329                            !fid_is_last_id(fid) && !fid_is_root(fid) &&
330                            !fid_seq_is_dot(fid_seq(fid))) {
331                         /* If the FID/object is only used locally and invisible
332                          * to external nodes, then LFSCK will not handle it. */
333                         goto checkpoint;
334                 }
335
336                 target = lfsck_object_find(env, lfsck, fid);
337                 if (target == NULL) {
338                         goto checkpoint;
339                 } else if (IS_ERR(target)) {
340                         CDEBUG(D_LFSCK, "%s: OIT scan failed at find target "
341                                DFID", cookie "LPU64": rc = %d\n",
342                                lfsck_lfsck2name(lfsck), PFID(fid),
343                                iops->store(env, di), rc);
344                         lfsck_fail(env, lfsck, true);
345                         if (bk->lb_param & LPF_FAILOUT)
346                                 RETURN(PTR_ERR(target));
347                         else
348                                 goto checkpoint;
349                 }
350
351                 /* XXX: Currently, skip remote object, the consistency for
352                  *      remote object will be processed in LFSCK phase III. */
353                 if (dt_object_exists(target) && !dt_object_remote(target)) {
354                         if (update_lma) {
355                                 rc = lfsck_update_lma(env, lfsck, target);
356                                 if (rc != 0)
357                                         CDEBUG(D_LFSCK, "%s: fail to update "
358                                                "LMA for "DFID": rc = %d\n",
359                                                lfsck_lfsck2name(lfsck),
360                                                PFID(lfsck_dto2fid(target)), rc);
361                         }
362                         if (rc == 0)
363                                 rc = lfsck_exec_oit(env, lfsck, target);
364                 }
365                 lfsck_object_put(env, target);
366                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
367                         RETURN(rc);
368
369 checkpoint:
370                 rc = lfsck_checkpoint(env, lfsck);
371                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
372                         RETURN(rc);
373
374                 /* Rate control. */
375                 lfsck_control_speed(lfsck);
376
377                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_FATAL1)) {
378                         spin_lock(&lfsck->li_lock);
379                         thread_set_flags(thread, SVC_STOPPING);
380                         spin_unlock(&lfsck->li_lock);
381                         RETURN(-EINVAL);
382                 }
383
384                 rc = iops->next(env, di);
385                 if (unlikely(rc > 0))
386                         lfsck->li_oit_over = 1;
387                 else if (likely(rc == 0))
388                         lfsck->li_current_oit_processed = 0;
389
390                 if (unlikely(!thread_is_running(thread))) {
391                         CDEBUG(D_LFSCK, "%s: OIT scan exit for engine stop, "
392                                "cookie "LPU64"\n", lfsck_lfsck2name(lfsck),
393                                iops->store(env, di));
394                         RETURN(0);
395                 }
396         } while (rc == 0 || lfsck->li_di_dir != NULL);
397
398         RETURN(rc);
399 }
400
401 int lfsck_master_engine(void *args)
402 {
403         struct lfsck_thread_args *lta      = args;
404         struct lu_env            *env      = &lta->lta_env;
405         struct lfsck_instance    *lfsck    = lta->lta_lfsck;
406         struct ptlrpc_thread     *thread   = &lfsck->li_thread;
407         struct dt_object         *oit_obj  = lfsck->li_obj_oit;
408         const struct dt_it_ops   *oit_iops = &oit_obj->do_index_ops->dio_it;
409         struct dt_it             *oit_di;
410         struct l_wait_info        lwi      = { 0 };
411         int                       rc;
412         ENTRY;
413
414         oit_di = oit_iops->init(env, oit_obj, lfsck->li_args_oit, BYPASS_CAPA);
415         if (IS_ERR(oit_di)) {
416                 rc = PTR_ERR(oit_di);
417                 CDEBUG(D_LFSCK, "%s: master engine fail to init iteration: "
418                        "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
419
420                 GOTO(fini_args, rc);
421         }
422
423         spin_lock(&lfsck->li_lock);
424         lfsck->li_di_oit = oit_di;
425         spin_unlock(&lfsck->li_lock);
426         rc = lfsck_prep(env, lfsck, lta->lta_lsp);
427         if (rc != 0)
428                 GOTO(fini_oit, rc);
429
430         CDEBUG(D_LFSCK, "LFSCK entry: oit_flags = %#x, dir_flags = %#x, "
431                "oit_cookie = "LPU64", dir_cookie = "LPX64", parent = "DFID
432                ", pid = %d\n", lfsck->li_args_oit, lfsck->li_args_dir,
433                lfsck->li_pos_current.lp_oit_cookie,
434                lfsck->li_pos_current.lp_dir_cookie,
435                PFID(&lfsck->li_pos_current.lp_dir_parent),
436                current_pid());
437
438         spin_lock(&lfsck->li_lock);
439         thread_set_flags(thread, SVC_RUNNING);
440         spin_unlock(&lfsck->li_lock);
441         wake_up_all(&thread->t_ctl_waitq);
442
443         l_wait_event(thread->t_ctl_waitq,
444                      lfsck->li_start_unplug ||
445                      !thread_is_running(thread),
446                      &lwi);
447         if (!thread_is_running(thread))
448                 GOTO(fini_oit, rc = 0);
449
450         if (!cfs_list_empty(&lfsck->li_list_scan) ||
451             cfs_list_empty(&lfsck->li_list_double_scan))
452                 rc = lfsck_master_oit_engine(env, lfsck);
453         else
454                 rc = 1;
455
456         CDEBUG(D_LFSCK, "LFSCK exit: oit_flags = %#x, dir_flags = %#x, "
457                "oit_cookie = "LPU64", dir_cookie = "LPX64", parent = "DFID
458                ", pid = %d, rc = %d\n", lfsck->li_args_oit, lfsck->li_args_dir,
459                lfsck->li_pos_current.lp_oit_cookie,
460                lfsck->li_pos_current.lp_dir_cookie,
461                PFID(&lfsck->li_pos_current.lp_dir_parent),
462                current_pid(), rc);
463
464         if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CRASH))
465                 rc = lfsck_post(env, lfsck, rc);
466
467         if (lfsck->li_di_dir != NULL)
468                 lfsck_close_dir(env, lfsck);
469
470 fini_oit:
471         lfsck_di_oit_put(env, lfsck);
472         oit_iops->fini(env, oit_di);
473         if (rc == 1) {
474                 if (!cfs_list_empty(&lfsck->li_list_double_scan))
475                         rc = lfsck_double_scan(env, lfsck);
476                 else
477                         rc = 0;
478         } else {
479                 lfsck_quit(env, lfsck);
480         }
481
482         /* XXX: Purge the pinned objects in the future. */
483
484 fini_args:
485         spin_lock(&lfsck->li_lock);
486         thread_set_flags(thread, SVC_STOPPED);
487         spin_unlock(&lfsck->li_lock);
488         wake_up_all(&thread->t_ctl_waitq);
489         lfsck_thread_args_fini(lta);
490         return rc;
491 }