Whamcloud - gitweb
LU-2914 lfsck: split LFSCK code from mdd to lfsck
[fs/lustre-release.git] / lustre / lfsck / lfsck_engine.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_engine.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #ifndef EXPORT_SYMTAB
32 # define EXPORT_SYMTAB
33 #endif
34 #define DEBUG_SUBSYSTEM S_LFSCK
35
36 #include <lu_object.h>
37 #include <dt_object.h>
38 #include <lustre_net.h>
39 #include <lustre_fid.h>
40 #include <obd_support.h>
41 #include <lustre_lib.h>
42
43 #include "lfsck_internal.h"
44
45 static void lfsck_unpack_ent(struct lu_dirent *ent)
46 {
47         fid_le_to_cpu(&ent->lde_fid, &ent->lde_fid);
48         ent->lde_hash = le64_to_cpu(ent->lde_hash);
49         ent->lde_reclen = le16_to_cpu(ent->lde_reclen);
50         ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
51         ent->lde_attrs = le32_to_cpu(ent->lde_attrs);
52
53         /* Make sure the name is terminated with '0'.
54          * The data (type) after ent::lde_name maybe
55          * broken, but we do not care. */
56         ent->lde_name[ent->lde_namelen] = 0;
57 }
58
59 static void lfsck_close_dir(const struct lu_env *env,
60                             struct lfsck_instance *lfsck)
61 {
62         struct dt_object        *dir_obj  = lfsck->li_obj_dir;
63         const struct dt_it_ops  *dir_iops = &dir_obj->do_index_ops->dio_it;
64         struct dt_it            *dir_di   = lfsck->li_di_dir;
65
66         spin_lock(&lfsck->li_lock);
67         lfsck->li_di_dir = NULL;
68         spin_unlock(&lfsck->li_lock);
69
70         dir_iops->put(env, dir_di);
71         dir_iops->fini(env, dir_di);
72         lfsck->li_obj_dir = NULL;
73         lfsck_object_put(env, dir_obj);
74 }
75
76 static int lfsck_master_dir_engine(const struct lu_env *env,
77                                    struct lfsck_instance *lfsck)
78 {
79         struct lfsck_thread_info        *info   = lfsck_env_info(env);
80         const struct dt_it_ops          *iops   =
81                         &lfsck->li_obj_dir->do_index_ops->dio_it;
82         struct dt_it                    *di     = lfsck->li_di_dir;
83         struct lu_dirent                *ent    = &info->lti_ent;
84         struct lu_fid                   *fid    = &info->lti_fid;
85         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
86         struct ptlrpc_thread            *thread = &lfsck->li_thread;
87         int                              rc;
88         ENTRY;
89
90         do {
91                 struct dt_object *child;
92
93                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY2) &&
94                     cfs_fail_val > 0) {
95                         struct l_wait_info lwi;
96
97                         lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
98                                           NULL, NULL);
99                         l_wait_event(thread->t_ctl_waitq,
100                                      !thread_is_running(thread),
101                                      &lwi);
102                 }
103
104                 lfsck->li_new_scanned++;
105                 rc = iops->rec(env, di, (struct dt_rec *)ent,
106                                lfsck->li_args_dir);
107                 if (rc != 0) {
108                         lfsck_fail(env, lfsck, true);
109                         if (bk->lb_param & LPF_FAILOUT)
110                                 RETURN(rc);
111                         else
112                                 goto checkpoint;
113                 }
114
115                 lfsck_unpack_ent(ent);
116                 if (ent->lde_attrs & LUDA_IGNORE)
117                         goto checkpoint;
118
119                 *fid = ent->lde_fid;
120                 child = lfsck_object_find(env, lfsck, fid);
121                 if (child == NULL) {
122                         goto checkpoint;
123                 } else if (IS_ERR(child)) {
124                         lfsck_fail(env, lfsck, true);
125                         if (bk->lb_param & LPF_FAILOUT)
126                                 RETURN(PTR_ERR(child));
127                         else
128                                 goto checkpoint;
129                 }
130
131                 /* XXX: Currently, skip remote object, the consistency for
132                  *      remote object will be processed in LFSCK phase III. */
133                 if (dt_object_exists(child) && !dt_object_remote(child))
134                         rc = lfsck_exec_dir(env, lfsck, child, ent);
135                 lfsck_object_put(env, child);
136                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
137                         RETURN(rc);
138
139 checkpoint:
140                 rc = lfsck_checkpoint(env, lfsck);
141                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
142                         RETURN(rc);
143
144                 /* Rate control. */
145                 lfsck_control_speed(lfsck);
146                 if (unlikely(!thread_is_running(thread)))
147                         RETURN(0);
148
149                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_FATAL2)) {
150                         spin_lock(&lfsck->li_lock);
151                         thread_set_flags(thread, SVC_STOPPING);
152                         spin_unlock(&lfsck->li_lock);
153                         RETURN(-EINVAL);
154                 }
155
156                 rc = iops->next(env, di);
157         } while (rc == 0);
158
159         if (rc > 0 && !lfsck->li_oit_over)
160                 lfsck_close_dir(env, lfsck);
161
162         RETURN(rc);
163 }
164
165 static int lfsck_master_oit_engine(const struct lu_env *env,
166                                    struct lfsck_instance *lfsck)
167 {
168         struct lfsck_thread_info        *info   = lfsck_env_info(env);
169         const struct dt_it_ops          *iops   =
170                                 &lfsck->li_obj_oit->do_index_ops->dio_it;
171         struct dt_it                    *di     = lfsck->li_di_oit;
172         struct lu_fid                   *fid    = &info->lti_fid;
173         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
174         struct ptlrpc_thread            *thread = &lfsck->li_thread;
175         int                              rc;
176         ENTRY;
177
178         do {
179                 struct dt_object *target;
180
181                 if (lfsck->li_di_dir != NULL) {
182                         rc = lfsck_master_dir_engine(env, lfsck);
183                         if (rc <= 0)
184                                 RETURN(rc);
185                 }
186
187                 if (unlikely(lfsck->li_oit_over))
188                         RETURN(1);
189
190                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY1) &&
191                     cfs_fail_val > 0) {
192                         struct l_wait_info lwi;
193
194                         lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
195                                           NULL, NULL);
196                         l_wait_event(thread->t_ctl_waitq,
197                                      !thread_is_running(thread),
198                                      &lwi);
199                 }
200
201                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CRASH))
202                         RETURN(0);
203
204                 lfsck->li_current_oit_processed = 1;
205                 lfsck->li_new_scanned++;
206                 rc = iops->rec(env, di, (struct dt_rec *)fid, 0);
207                 if (rc != 0) {
208                         lfsck_fail(env, lfsck, true);
209                         if (bk->lb_param & LPF_FAILOUT)
210                                 RETURN(rc);
211                         else
212                                 goto checkpoint;
213                 }
214
215                 target = lfsck_object_find(env, lfsck, fid);
216                 if (target == NULL) {
217                         goto checkpoint;
218                 } else if (IS_ERR(target)) {
219                         lfsck_fail(env, lfsck, true);
220                         if (bk->lb_param & LPF_FAILOUT)
221                                 RETURN(PTR_ERR(target));
222                         else
223                                 goto checkpoint;
224                 }
225
226                 /* XXX: Currently, skip remote object, the consistency for
227                  *      remote object will be processed in LFSCK phase III. */
228                 if (dt_object_exists(target) && !dt_object_remote(target))
229                         rc = lfsck_exec_oit(env, lfsck, target);
230                 lfsck_object_put(env, target);
231                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
232                         RETURN(rc);
233
234 checkpoint:
235                 rc = lfsck_checkpoint(env, lfsck);
236                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
237                         RETURN(rc);
238
239                 /* Rate control. */
240                 lfsck_control_speed(lfsck);
241
242                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_FATAL1)) {
243                         spin_lock(&lfsck->li_lock);
244                         thread_set_flags(thread, SVC_STOPPING);
245                         spin_unlock(&lfsck->li_lock);
246                         RETURN(-EINVAL);
247                 }
248
249                 rc = iops->next(env, di);
250                 if (unlikely(rc > 0))
251                         lfsck->li_oit_over = 1;
252                 else if (likely(rc == 0))
253                         lfsck->li_current_oit_processed = 0;
254
255                 if (unlikely(!thread_is_running(thread)))
256                         RETURN(0);
257         } while (rc == 0 || lfsck->li_di_dir != NULL);
258
259         RETURN(rc);
260 }
261
262 int lfsck_master_engine(void *args)
263 {
264         struct lu_env            env;
265         struct lfsck_instance   *lfsck    = (struct lfsck_instance *)args;
266         struct ptlrpc_thread    *thread   = &lfsck->li_thread;
267         struct dt_object        *oit_obj  = lfsck->li_obj_oit;
268         const struct dt_it_ops  *oit_iops = &oit_obj->do_index_ops->dio_it;
269         struct dt_it            *oit_di;
270         int                      rc;
271         ENTRY;
272
273         cfs_daemonize("lfsck_master");
274         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
275         if (rc != 0) {
276                 CERROR("%s: LFSCK, fail to init env, rc = %d\n",
277                        lfsck_lfsck2name(lfsck), rc);
278                 GOTO(noenv, rc);
279         }
280
281         oit_di = oit_iops->init(&env, oit_obj, lfsck->li_args_oit, BYPASS_CAPA);
282         if (IS_ERR(oit_di)) {
283                 rc = PTR_ERR(oit_di);
284                 CERROR("%s: LFSCK, fail to init iteration, rc = %d\n",
285                        lfsck_lfsck2name(lfsck), rc);
286                 GOTO(fini_env, rc);
287         }
288
289         spin_lock(&lfsck->li_lock);
290         lfsck->li_di_oit = oit_di;
291         spin_unlock(&lfsck->li_lock);
292         rc = lfsck_prep(&env, lfsck);
293         if (rc != 0)
294                 GOTO(fini_oit, rc);
295
296         CDEBUG(D_LFSCK, "LFSCK entry: oit_flags = 0x%x, dir_flags = 0x%x, "
297                "oit_cookie = "LPU64", dir_cookie = "LPU64", parent = "DFID
298                ", pid = %d\n", lfsck->li_args_oit, lfsck->li_args_dir,
299                lfsck->li_pos_current.lp_oit_cookie,
300                lfsck->li_pos_current.lp_dir_cookie,
301                PFID(&lfsck->li_pos_current.lp_dir_parent),
302                cfs_curproc_pid());
303
304         spin_lock(&lfsck->li_lock);
305         thread_set_flags(thread, SVC_RUNNING);
306         spin_unlock(&lfsck->li_lock);
307         cfs_waitq_broadcast(&thread->t_ctl_waitq);
308
309         if (!cfs_list_empty(&lfsck->li_list_scan) ||
310             cfs_list_empty(&lfsck->li_list_double_scan))
311                 rc = lfsck_master_oit_engine(&env, lfsck);
312         else
313                 rc = 1;
314
315         CDEBUG(D_LFSCK, "LFSCK exit: oit_flags = 0x%x, dir_flags = 0x%x, "
316                "oit_cookie = "LPU64", dir_cookie = "LPU64", parent = "DFID
317                ", pid = %d, rc = %d\n", lfsck->li_args_oit, lfsck->li_args_dir,
318                lfsck->li_pos_current.lp_oit_cookie,
319                lfsck->li_pos_current.lp_dir_cookie,
320                PFID(&lfsck->li_pos_current.lp_dir_parent),
321                cfs_curproc_pid(), rc);
322
323         if (lfsck->li_paused && cfs_list_empty(&lfsck->li_list_scan))
324                 oit_iops->put(&env, oit_di);
325
326         if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CRASH))
327                 rc = lfsck_post(&env, lfsck, rc);
328         if (lfsck->li_di_dir != NULL)
329                 lfsck_close_dir(&env, lfsck);
330
331 fini_oit:
332         spin_lock(&lfsck->li_lock);
333         lfsck->li_di_oit = NULL;
334         spin_unlock(&lfsck->li_lock);
335
336         oit_iops->fini(&env, oit_di);
337         if (rc == 1) {
338                 if (!cfs_list_empty(&lfsck->li_list_double_scan))
339                         rc = lfsck_double_scan(&env, lfsck);
340                 else
341                         rc = 0;
342         }
343
344         /* XXX: Purge the pinned objects in the future. */
345
346 fini_env:
347         lu_env_fini(&env);
348
349 noenv:
350         spin_lock(&lfsck->li_lock);
351         thread_set_flags(thread, SVC_STOPPED);
352         cfs_waitq_broadcast(&thread->t_ctl_waitq);
353         spin_unlock(&lfsck->li_lock);
354         return rc;
355 }