Whamcloud - gitweb
LU-1267 lfsck: framework (3) for MDT-OST consistency
[fs/lustre-release.git] / lustre / lfsck / lfsck_engine.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_engine.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <lu_object.h>
34 #include <dt_object.h>
35 #include <lustre_net.h>
36 #include <lustre_fid.h>
37 #include <obd_support.h>
38 #include <lustre_lib.h>
39
40 #include "lfsck_internal.h"
41
42 static void lfsck_unpack_ent(struct lu_dirent *ent, __u64 *cookie)
43 {
44         fid_le_to_cpu(&ent->lde_fid, &ent->lde_fid);
45         *cookie = le64_to_cpu(ent->lde_hash);
46         ent->lde_reclen = le16_to_cpu(ent->lde_reclen);
47         ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
48         ent->lde_attrs = le32_to_cpu(ent->lde_attrs);
49
50         /* Make sure the name is terminated with '0'.
51          * The data (type) after ent::lde_name maybe
52          * broken, but we do not care. */
53         ent->lde_name[ent->lde_namelen] = 0;
54 }
55
56 static void lfsck_di_oit_put(const struct lu_env *env, struct lfsck_instance *lfsck)
57 {
58         const struct dt_it_ops  *iops;
59         struct dt_it            *di;
60
61         spin_lock(&lfsck->li_lock);
62         iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
63         di = lfsck->li_di_oit;
64         lfsck->li_di_oit = NULL;
65         spin_unlock(&lfsck->li_lock);
66         iops->put(env, di);
67 }
68
69 static void lfsck_di_dir_put(const struct lu_env *env, struct lfsck_instance *lfsck)
70 {
71         const struct dt_it_ops  *iops;
72         struct dt_it            *di;
73
74         spin_lock(&lfsck->li_lock);
75         iops = &lfsck->li_obj_dir->do_index_ops->dio_it;
76         di = lfsck->li_di_dir;
77         lfsck->li_di_dir = NULL;
78         lfsck->li_cookie_dir = 0;
79         spin_unlock(&lfsck->li_lock);
80         iops->put(env, di);
81 }
82
83 static void lfsck_close_dir(const struct lu_env *env,
84                             struct lfsck_instance *lfsck)
85 {
86         struct dt_object        *dir_obj  = lfsck->li_obj_dir;
87         const struct dt_it_ops  *dir_iops = &dir_obj->do_index_ops->dio_it;
88         struct dt_it            *dir_di   = lfsck->li_di_dir;
89
90         lfsck_di_dir_put(env, lfsck);
91         dir_iops->fini(env, dir_di);
92         lfsck->li_obj_dir = NULL;
93         lfsck_object_put(env, dir_obj);
94 }
95
96 static int lfsck_update_lma(const struct lu_env *env,
97                             struct lfsck_instance *lfsck, struct dt_object *obj)
98 {
99         struct lfsck_thread_info        *info   = lfsck_env_info(env);
100         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
101         struct dt_device                *dt     = lfsck->li_bottom;
102         struct lustre_mdt_attrs         *lma    = &info->lti_lma;
103         struct lu_buf                   *buf;
104         struct thandle                  *th;
105         int                              fl;
106         int                              rc;
107         ENTRY;
108
109         if (bk->lb_param & LPF_DRYRUN)
110                 RETURN(0);
111
112         buf = lfsck_buf_get(env, info->lti_lma_old, LMA_OLD_SIZE);
113         rc = dt_xattr_get(env, obj, buf, XATTR_NAME_LMA, BYPASS_CAPA);
114         if (rc < 0) {
115                 if (rc != -ENODATA)
116                         RETURN(rc);
117
118                 fl = LU_XATTR_CREATE;
119                 lustre_lma_init(lma, lfsck_dto2fid(obj), LMAC_FID_ON_OST, 0);
120         } else {
121                 if (rc != LMA_OLD_SIZE && rc != sizeof(struct lustre_mdt_attrs))
122                         RETURN(-EINVAL);
123
124                 fl = LU_XATTR_REPLACE;
125                 lustre_lma_swab(lma);
126                 lustre_lma_init(lma, lfsck_dto2fid(obj),
127                                 lma->lma_compat | LMAC_FID_ON_OST,
128                                 lma->lma_incompat);
129         }
130         lustre_lma_swab(lma);
131
132         th = dt_trans_create(env, dt);
133         if (IS_ERR(th))
134                 RETURN(PTR_ERR(th));
135
136         buf = lfsck_buf_get(env, lma, sizeof(*lma));
137         rc = dt_declare_xattr_set(env, obj, buf, XATTR_NAME_LMA, fl, th);
138         if (rc != 0)
139                 GOTO(stop, rc);
140
141         rc = dt_trans_start(env, dt, th);
142         if (rc != 0)
143                 GOTO(stop, rc);
144
145         rc = dt_xattr_set(env, obj, buf, XATTR_NAME_LMA, fl, th, BYPASS_CAPA);
146
147         GOTO(stop, rc);
148
149 stop:
150         dt_trans_stop(env, dt, th);
151         return rc;
152 }
153
154 static int lfsck_master_dir_engine(const struct lu_env *env,
155                                    struct lfsck_instance *lfsck)
156 {
157         struct lfsck_thread_info        *info   = lfsck_env_info(env);
158         const struct dt_it_ops          *iops   =
159                         &lfsck->li_obj_dir->do_index_ops->dio_it;
160         struct dt_it                    *di     = lfsck->li_di_dir;
161         struct lu_dirent                *ent    = &info->lti_ent;
162         struct lu_fid                   *fid    = &info->lti_fid;
163         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
164         struct ptlrpc_thread            *thread = &lfsck->li_thread;
165         int                              rc;
166         ENTRY;
167
168         do {
169                 struct dt_object *child;
170
171                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY2) &&
172                     cfs_fail_val > 0) {
173                         struct l_wait_info lwi;
174
175                         lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
176                                           NULL, NULL);
177                         l_wait_event(thread->t_ctl_waitq,
178                                      !thread_is_running(thread),
179                                      &lwi);
180                 }
181
182                 lfsck->li_new_scanned++;
183                 rc = iops->rec(env, di, (struct dt_rec *)ent,
184                                lfsck->li_args_dir);
185                 lfsck_unpack_ent(ent, &lfsck->li_cookie_dir);
186                 if (rc != 0) {
187                         lfsck_fail(env, lfsck, true);
188                         if (bk->lb_param & LPF_FAILOUT)
189                                 RETURN(rc);
190                         else
191                                 goto checkpoint;
192                 }
193
194                 if (ent->lde_attrs & LUDA_IGNORE)
195                         goto checkpoint;
196
197                 *fid = ent->lde_fid;
198                 child = lfsck_object_find(env, lfsck, fid);
199                 if (child == NULL) {
200                         goto checkpoint;
201                 } else if (IS_ERR(child)) {
202                         lfsck_fail(env, lfsck, true);
203                         if (bk->lb_param & LPF_FAILOUT)
204                                 RETURN(PTR_ERR(child));
205                         else
206                                 goto checkpoint;
207                 }
208
209                 /* XXX: Currently, skip remote object, the consistency for
210                  *      remote object will be processed in LFSCK phase III. */
211                 if (dt_object_exists(child) && !dt_object_remote(child))
212                         rc = lfsck_exec_dir(env, lfsck, child, ent);
213                 lfsck_object_put(env, child);
214                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
215                         RETURN(rc);
216
217 checkpoint:
218                 rc = lfsck_checkpoint(env, lfsck);
219                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
220                         RETURN(rc);
221
222                 /* Rate control. */
223                 lfsck_control_speed(lfsck);
224                 if (unlikely(!thread_is_running(thread)))
225                         RETURN(0);
226
227                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_FATAL2)) {
228                         spin_lock(&lfsck->li_lock);
229                         thread_set_flags(thread, SVC_STOPPING);
230                         spin_unlock(&lfsck->li_lock);
231                         RETURN(-EINVAL);
232                 }
233
234                 rc = iops->next(env, di);
235         } while (rc == 0);
236
237         if (rc > 0 && !lfsck->li_oit_over)
238                 lfsck_close_dir(env, lfsck);
239
240         RETURN(rc);
241 }
242
243 static int lfsck_master_oit_engine(const struct lu_env *env,
244                                    struct lfsck_instance *lfsck)
245 {
246         struct lfsck_thread_info        *info   = lfsck_env_info(env);
247         const struct dt_it_ops          *iops   =
248                                 &lfsck->li_obj_oit->do_index_ops->dio_it;
249         struct dt_it                    *di     = lfsck->li_di_oit;
250         struct lu_fid                   *fid    = &info->lti_fid;
251         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
252         struct ptlrpc_thread            *thread = &lfsck->li_thread;
253         __u32                            idx    =
254                                 lfsck_dev_idx(lfsck->li_bottom);
255         int                              rc;
256         ENTRY;
257
258         do {
259                 struct dt_object *target;
260                 bool              update_lma = false;
261
262                 if (lfsck->li_di_dir != NULL) {
263                         rc = lfsck_master_dir_engine(env, lfsck);
264                         if (rc <= 0)
265                                 RETURN(rc);
266                 }
267
268                 if (unlikely(lfsck->li_oit_over))
269                         RETURN(1);
270
271                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY1) &&
272                     cfs_fail_val > 0) {
273                         struct l_wait_info lwi;
274
275                         lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
276                                           NULL, NULL);
277                         l_wait_event(thread->t_ctl_waitq,
278                                      !thread_is_running(thread),
279                                      &lwi);
280                 }
281
282                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CRASH))
283                         RETURN(0);
284
285                 lfsck->li_current_oit_processed = 1;
286                 lfsck->li_new_scanned++;
287                 rc = iops->rec(env, di, (struct dt_rec *)fid, 0);
288                 if (rc != 0) {
289                         lfsck_fail(env, lfsck, true);
290                         if (rc < 0 && bk->lb_param & LPF_FAILOUT)
291                                 RETURN(rc);
292                         else
293                                 goto checkpoint;
294                 }
295
296                 if (fid_is_idif(fid)) {
297                         __u32 idx1 = fid_idif_ost_idx(fid);
298
299                         LASSERT(!lfsck->li_master);
300
301                         /* It is an old format device, update the LMA. */
302                         if (idx != idx1) {
303                                 struct ost_id *oi = &info->lti_oi;
304
305                                 fid_to_ostid(fid, oi);
306                                 ostid_to_fid(fid, oi, idx);
307                                 update_lma = true;
308                         }
309                 }
310
311                 target = lfsck_object_find(env, lfsck, fid);
312                 if (target == NULL) {
313                         goto checkpoint;
314                 } else if (IS_ERR(target)) {
315                         lfsck_fail(env, lfsck, true);
316                         if (bk->lb_param & LPF_FAILOUT)
317                                 RETURN(PTR_ERR(target));
318                         else
319                                 goto checkpoint;
320                 }
321
322                 /* XXX: Currently, skip remote object, the consistency for
323                  *      remote object will be processed in LFSCK phase III. */
324                 if (dt_object_exists(target) && !dt_object_remote(target)) {
325                         if (update_lma)
326                                 rc = lfsck_update_lma(env, lfsck, target);
327                         if (rc == 0)
328                                 rc = lfsck_exec_oit(env, lfsck, target);
329                 }
330                 lfsck_object_put(env, target);
331                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
332                         RETURN(rc);
333
334 checkpoint:
335                 rc = lfsck_checkpoint(env, lfsck);
336                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
337                         RETURN(rc);
338
339                 /* Rate control. */
340                 lfsck_control_speed(lfsck);
341
342                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_FATAL1)) {
343                         spin_lock(&lfsck->li_lock);
344                         thread_set_flags(thread, SVC_STOPPING);
345                         spin_unlock(&lfsck->li_lock);
346                         RETURN(-EINVAL);
347                 }
348
349                 rc = iops->next(env, di);
350                 if (unlikely(rc > 0))
351                         lfsck->li_oit_over = 1;
352                 else if (likely(rc == 0))
353                         lfsck->li_current_oit_processed = 0;
354
355                 if (unlikely(!thread_is_running(thread)))
356                         RETURN(0);
357         } while (rc == 0 || lfsck->li_di_dir != NULL);
358
359         RETURN(rc);
360 }
361
362 int lfsck_master_engine(void *args)
363 {
364         struct lfsck_thread_args *lta      = args;
365         struct lu_env            *env      = &lta->lta_env;
366         struct lfsck_instance    *lfsck    = lta->lta_lfsck;
367         struct ptlrpc_thread     *thread   = &lfsck->li_thread;
368         struct dt_object         *oit_obj  = lfsck->li_obj_oit;
369         const struct dt_it_ops   *oit_iops = &oit_obj->do_index_ops->dio_it;
370         struct dt_it             *oit_di;
371         int                       rc;
372         ENTRY;
373
374         oit_di = oit_iops->init(env, oit_obj, lfsck->li_args_oit, BYPASS_CAPA);
375         if (IS_ERR(oit_di)) {
376                 rc = PTR_ERR(oit_di);
377                 CERROR("%s: LFSCK, fail to init iteration: rc = %d\n",
378                        lfsck_lfsck2name(lfsck), rc);
379
380                 GOTO(fini_args, rc);
381         }
382
383         spin_lock(&lfsck->li_lock);
384         lfsck->li_di_oit = oit_di;
385         spin_unlock(&lfsck->li_lock);
386         rc = lfsck_prep(env, lfsck);
387         if (rc != 0)
388                 GOTO(fini_oit, rc);
389
390         CDEBUG(D_LFSCK, "LFSCK entry: oit_flags = %#x, dir_flags = %#x, "
391                "oit_cookie = "LPU64", dir_cookie = "LPU64", parent = "DFID
392                ", pid = %d\n", lfsck->li_args_oit, lfsck->li_args_dir,
393                lfsck->li_pos_current.lp_oit_cookie,
394                lfsck->li_pos_current.lp_dir_cookie,
395                PFID(&lfsck->li_pos_current.lp_dir_parent),
396                current_pid());
397
398         spin_lock(&lfsck->li_lock);
399         thread_set_flags(thread, SVC_RUNNING);
400         spin_unlock(&lfsck->li_lock);
401         wake_up_all(&thread->t_ctl_waitq);
402
403         if (!cfs_list_empty(&lfsck->li_list_scan) ||
404             cfs_list_empty(&lfsck->li_list_double_scan))
405                 rc = lfsck_master_oit_engine(env, lfsck);
406         else
407                 rc = 1;
408
409         CDEBUG(D_LFSCK, "LFSCK exit: oit_flags = %#x, dir_flags = %#x, "
410                "oit_cookie = "LPU64", dir_cookie = "LPU64", parent = "DFID
411                ", pid = %d, rc = %d\n", lfsck->li_args_oit, lfsck->li_args_dir,
412                lfsck->li_pos_current.lp_oit_cookie,
413                lfsck->li_pos_current.lp_dir_cookie,
414                PFID(&lfsck->li_pos_current.lp_dir_parent),
415                current_pid(), rc);
416
417         if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CRASH))
418                 rc = lfsck_post(env, lfsck, rc);
419
420         if (lfsck->li_di_dir != NULL)
421                 lfsck_close_dir(env, lfsck);
422
423 fini_oit:
424         lfsck_di_oit_put(env, lfsck);
425         oit_iops->fini(env, oit_di);
426         if (rc == 1) {
427                 if (!cfs_list_empty(&lfsck->li_list_double_scan))
428                         rc = lfsck_double_scan(env, lfsck);
429                 else
430                         rc = 0;
431         } else {
432                 lfsck_quit(env, lfsck);
433         }
434
435         /* XXX: Purge the pinned objects in the future. */
436
437 fini_args:
438         spin_lock(&lfsck->li_lock);
439         thread_set_flags(thread, SVC_STOPPED);
440         spin_unlock(&lfsck->li_lock);
441         wake_up_all(&thread->t_ctl_waitq);
442         lfsck_thread_args_fini(lta);
443         return rc;
444 }