Whamcloud - gitweb
LU-957 scrub: trigger OI scrub if found bad OI entry
[fs/lustre-release.git] / lustre / mdd / mdd_orphans.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_orphans.c
37  *
38  * Orphan handling code
39  *
40  * Author: Mike Pershin <tappro@clusterfs.com>
41  *         Pravin B Shelar <pravin.shelar@sun.com>
42  */
43
44 #define DEBUG_SUBSYSTEM S_MDS
45
46 #include <obd.h>
47 #include <obd_class.h>
48 #include <lustre_ver.h>
49 #include <obd_support.h>
50 #include <lustre_fid.h>
51 #include "mdd_internal.h"
52
53 const char orph_index_name[] = "PENDING";
54 const char *dotdot = "..";
55
56 enum {
57         ORPH_OP_UNLINK,
58         ORPH_OP_TRUNCATE
59 };
60
61 #define ORPHAN_FILE_NAME_FORMAT         "%016llx:%08x:%08x:%2x"
62 #define ORPHAN_FILE_NAME_FORMAT_18      "%llx:%08x"
63
64 static struct dt_key* orph_key_fill(const struct lu_env *env,
65                                     const struct lu_fid *lf, __u32 op)
66 {
67         char *key = mdd_env_info(env)->mti_orph_key;
68         int rc;
69
70         LASSERT(key);
71         rc = snprintf(key, NAME_MAX + 1, ORPHAN_FILE_NAME_FORMAT,
72                       (long long unsigned int)fid_seq(lf),
73                       fid_oid(lf), fid_ver(lf), op);
74         if (rc > 0)
75                 return (struct dt_key*) key;
76         else
77                 return ERR_PTR(rc);
78 }
79
80 static struct dt_key* orph_key_fill_18(const struct lu_env *env,
81                                        const struct lu_fid *lf)
82 {
83         char *key = mdd_env_info(env)->mti_orph_key;
84         int rc;
85
86         LASSERT(key);
87         rc = snprintf(key, NAME_MAX + 1, ORPHAN_FILE_NAME_FORMAT_18,
88                       (unsigned long long)fid_seq(lf), fid_oid(lf));
89         if (rc > 0)
90                 return (struct dt_key*) key;
91         else
92                 return ERR_PTR(rc);
93 }
94
95 static inline void mdd_orphan_write_lock(const struct lu_env *env,
96                                     struct mdd_device *mdd)
97 {
98
99         struct dt_object        *dor    = mdd->mdd_orphans;
100         dor->do_ops->do_write_lock(env, dor, MOR_TGT_ORPHAN);
101 }
102
103 static inline void mdd_orphan_write_unlock(const struct lu_env *env,
104                                            struct mdd_device *mdd)
105 {
106
107         struct dt_object        *dor    = mdd->mdd_orphans;
108         dor->do_ops->do_write_unlock(env, dor);
109 }
110
111 static inline int mdd_orphan_insert_obj(const struct lu_env *env,
112                                         struct mdd_device *mdd,
113                                         struct mdd_object *obj,
114                                         __u32 op,
115                                         struct thandle *th)
116 {
117         struct dt_object        *dor    = mdd->mdd_orphans;
118         const struct lu_fid     *lf     = mdo2fid(obj);
119         struct dt_key           *key    = orph_key_fill(env, lf, op);
120         ENTRY;
121
122         return  dor->do_index_ops->dio_insert(env, dor,
123                                               (struct dt_rec *)lf,
124                                               key, th,
125                                               BYPASS_CAPA, 1);
126 }
127
128 static inline int mdd_orphan_delete_obj(const struct lu_env *env,
129                                         struct mdd_device  *mdd ,
130                                         struct dt_key *key,
131                                         struct thandle *th)
132 {
133         struct dt_object        *dor    = mdd->mdd_orphans;
134
135         return  dor->do_index_ops->dio_delete(env, dor,
136                                               key, th,
137                                               BYPASS_CAPA);
138 }
139
140 static inline void mdd_orphan_ref_add(const struct lu_env *env,
141                                  struct mdd_device *mdd,
142                                  struct thandle *th)
143 {
144         struct dt_object        *dor    = mdd->mdd_orphans;
145         dor->do_ops->do_ref_add(env, dor, th);
146 }
147
148 static inline void mdd_orphan_ref_del(const struct lu_env *env,
149                                  struct mdd_device *mdd,
150                                  struct thandle *th)
151 {
152         struct dt_object        *dor    = mdd->mdd_orphans;
153         dor->do_ops->do_ref_del(env, dor, th);
154 }
155
156
157 int orph_declare_index_insert(const struct lu_env *env,
158                               struct mdd_object *obj,
159                               struct thandle *th)
160 {
161         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
162         int                rc;
163
164         rc = dt_declare_insert(env, mdd->mdd_orphans, NULL, NULL, th);
165         if (rc)
166                 return rc;
167
168         rc = mdo_declare_ref_add(env, obj, th);
169         if (rc)
170                 return rc;
171
172         if (!S_ISDIR(mdd_object_type(obj)))
173                 return 0;
174
175         rc = mdo_declare_ref_add(env, obj, th);
176         if (rc)
177                 return rc;
178
179         rc = dt_declare_ref_add(env, mdd->mdd_orphans, th);
180         if (rc)
181                 return rc;
182
183         rc = mdo_declare_index_delete(env, obj, dotdot, th);
184         if (rc)
185                 return rc;
186
187         rc = mdo_declare_index_insert(env, obj, NULL, dotdot, th);
188
189         return rc;
190 }
191
192 static int orph_index_insert(const struct lu_env *env,
193                              struct mdd_object *obj,
194                              __u32 op,
195                              struct thandle *th)
196 {
197         struct mdd_device       *mdd    = mdo2mdd(&obj->mod_obj);
198         struct dt_object        *dor    = mdd->mdd_orphans;
199         const struct lu_fid     *lf_dor = lu_object_fid(&dor->do_lu);
200         struct dt_object        *next   = mdd_object_child(obj);
201         int rc;
202         ENTRY;
203
204         LASSERT(mdd_write_locked(env, obj) != 0);
205         LASSERT(!(obj->mod_flags & ORPHAN_OBJ));
206         LASSERT(obj->mod_count > 0);
207
208         mdd_orphan_write_lock(env, mdd);
209
210         rc = mdd_orphan_insert_obj(env, mdd, obj, op, th);
211         if (rc)
212                 GOTO(out, rc);
213
214         mdo_ref_add(env, obj, th);
215         if (!S_ISDIR(mdd_object_type(obj)))
216                 goto out;
217
218         mdo_ref_add(env, obj, th);
219         mdd_orphan_ref_add(env, mdd, th);
220
221         /* try best to fixup directory, dont return errors
222          * from here */
223         if (!dt_try_as_dir(env, next))
224                 goto out;
225         next->do_index_ops->dio_delete(env, next,
226                                        (const struct dt_key *)dotdot,
227                                        th, BYPASS_CAPA);
228
229         next->do_index_ops->dio_insert(env, next,
230                                        (struct dt_rec *)lf_dor,
231                                        (const struct dt_key *)dotdot,
232                                        th, BYPASS_CAPA, 1);
233
234 out:
235         if (rc == 0)
236                 obj->mod_flags |= ORPHAN_OBJ;
237
238         mdd_orphan_write_unlock(env, mdd);
239
240         RETURN(rc);
241 }
242
243 /**
244  * Destroy OSD object on MDD and associated OST objects.
245  *
246  * \param obj orphan object
247  * \param mdd used for sending llog msg to osts
248  *
249  * \retval  0   success
250  * \retval -ve  error
251  */
252 static int orphan_object_kill(const struct lu_env *env,
253                               struct mdd_object *obj,
254                               struct mdd_device *mdd,
255                               struct thandle *th)
256 {
257         struct lu_attr *la = &mdd_env_info(env)->mti_la;
258         int rc = 0;
259         ENTRY;
260
261         /* No need to lock this object as its recovery phase, and
262          * no other thread can access it. But we need to lock it
263          * as its precondition for osd api we using. */
264
265         mdo_ref_del(env, obj, th);
266         if (S_ISDIR(mdd_object_type(obj))) {
267                 mdo_ref_del(env, obj, th);
268                 mdd_orphan_ref_del(env, mdd, th);
269         } else {
270                 /* regular file , cleanup linked ost objects */
271                 rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
272                 if (rc == 0)
273                         rc = mdd_lov_destroy(env, mdd, obj, la);
274         }
275         mdo_destroy(env, obj, th);
276         RETURN(rc);
277 }
278
279 int orph_declare_index_delete(const struct lu_env *env,
280                               struct mdd_object *obj,
281                               struct thandle *th)
282 {
283         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
284         int                rc;
285
286         rc = dt_declare_delete(env, mdd->mdd_orphans, NULL, th);
287         if (rc)
288                 return rc;
289
290         rc = mdo_declare_ref_del(env, obj, th);
291         if (rc)
292                 return rc;
293
294         if (S_ISDIR(mdd_object_type(obj))) {
295                 rc = mdo_declare_ref_del(env, obj, th);
296                 if (rc)
297                         return rc;
298
299                 rc = dt_declare_ref_del(env, mdd->mdd_orphans, th);
300         }
301
302         return rc;
303 }
304
305 static int orph_index_delete(const struct lu_env *env,
306                              struct mdd_object *obj,
307                              __u32 op,
308                              struct thandle *th)
309 {
310         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
311         struct dt_object *dor = mdd->mdd_orphans;
312         struct dt_key *key;
313         int rc;
314
315         ENTRY;
316
317         LASSERT(mdd_write_locked(env, obj) != 0);
318         LASSERT(obj->mod_flags & ORPHAN_OBJ);
319         LASSERT(obj->mod_count == 0);
320
321         LASSERT(dor);
322
323         key = orph_key_fill(env, mdo2fid(obj), op);
324         mdd_orphan_write_lock(env, mdd);
325
326         rc = mdd_orphan_delete_obj(env, mdd, key, th);
327
328         if (rc == -ENOENT) {
329                 key = orph_key_fill_18(env, mdo2fid(obj));
330                 rc = mdd_orphan_delete_obj(env, mdd, key, th);
331         }
332
333         if (!rc) {
334                 /* lov objects will be destroyed by caller */
335                 mdo_ref_del(env, obj, th);
336                 if (S_ISDIR(mdd_object_type(obj))) {
337                         mdo_ref_del(env, obj, th);
338                         mdd_orphan_ref_del(env, mdd, th);
339                 }
340                 obj->mod_flags &= ~ORPHAN_OBJ;
341         } else {
342                 CERROR("could not delete object: rc = %d\n",rc);
343         }
344
345         mdd_orphan_write_unlock(env, mdd);
346         RETURN(rc);
347 }
348
349
350 static int orphan_object_destroy(const struct lu_env *env,
351                                  struct mdd_object *obj,
352                                  struct dt_key *key)
353 {
354         struct thandle *th = NULL;
355         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
356         struct md_attr *ma = &mdd_env_info(env)->mti_ma;
357         int rc = 0;
358         ENTRY;
359
360         /* init ma */
361         ma->ma_lmm_size = mdd_lov_mdsize(env, mdd);
362         ma->ma_lmm = mdd_max_lmm_get(env, mdd);
363         ma->ma_cookie_size = mdd_lov_cookiesize(env, mdd);
364         ma->ma_cookie = mdd_max_cookie_get(env, mdd);
365         ma->ma_need = MA_INODE | MA_LOV | MA_COOKIE;
366         ma->ma_valid = 0;
367
368         th = mdd_trans_create(env, mdd);
369         if (IS_ERR(th)) {
370                 CERROR("Cannot get thandle\n");
371                 RETURN(-ENOMEM);
372         }
373         rc = orph_declare_index_delete(env, obj, th);
374         if (rc)
375                 GOTO(stop, rc);
376
377         rc = mdd_declare_object_kill(env, obj, ma, th);
378         if (rc)
379                 GOTO(stop, rc);
380
381         rc = mdd_trans_start(env, mdd, th);
382         if (rc)
383                 GOTO(stop, rc);
384
385         mdd_write_lock(env, obj, MOR_TGT_CHILD);
386         if (likely(obj->mod_count == 0)) {
387                 mdd_orphan_write_lock(env, mdd);
388                 rc = mdd_orphan_delete_obj(env, mdd, key, th);
389                 if (rc == 0)
390                         orphan_object_kill(env, obj, mdd, th);
391                 else
392                         CERROR("could not delete object: rc = %d\n",rc);
393                 mdd_orphan_write_unlock(env, mdd);
394         }
395         mdd_write_unlock(env, obj);
396
397 stop:
398         mdd_trans_stop(env, mdd, 0, th);
399
400         RETURN(rc);
401 }
402
403 /**
404  * Delete unused orphan with FID \a lf from PENDING directory
405  *
406  * \param mdd  MDD device finishing recovery
407  * \param lf   FID of file or directory to delete
408  * \param key  cookie for this entry in index iterator
409  *
410  * \retval 0   success
411  * \retval -ve error
412  */
413 static int orph_key_test_and_del(const struct lu_env *env,
414                                  struct mdd_device *mdd,
415                                  struct lu_fid *lf,
416                                  struct dt_key *key)
417 {
418         struct mdd_object *mdo;
419         int rc;
420
421         mdo = mdd_object_find(env, mdd, lf);
422
423         if (IS_ERR(mdo))
424                 return PTR_ERR(mdo);
425
426         rc = -EBUSY;
427         if (mdo->mod_count == 0) {
428                 CDEBUG(D_HA, "Found orphan "DFID", delete it\n", PFID(lf));
429                 rc = orphan_object_destroy(env, mdo, key);
430                 if (rc) /* so replay-single.sh test_37 works */
431                         CERROR("%s: error unlinking orphan "DFID" from "
432                                "PENDING: rc = %d\n",
433                                mdd->mdd_obd_dev->obd_name, PFID(lf), rc);
434         } else {
435                 mdd_write_lock(env, mdo, MOR_TGT_CHILD);
436                 if (likely(mdo->mod_count > 0)) {
437                         CDEBUG(D_HA, "Found orphan "DFID" count %d, skip it\n",
438                                PFID(lf), mdo->mod_count);
439                         mdo->mod_flags |= ORPHAN_OBJ;
440                 }
441                 mdd_write_unlock(env, mdo);
442         }
443
444         mdd_object_put(env, mdo);
445         return rc;
446 }
447
448 /**
449  * delete unreferenced files and directories in the PENDING directory
450  *
451  * Files that remain in PENDING after client->MDS recovery has completed
452  * have to be referenced (opened) by some client during recovery, or they
453  * will be deleted here (for clients that did not complete recovery).
454  *
455  * \param mdd  MDD device finishing recovery
456  *
457  * \retval 0   success
458  * \retval -ve error
459  */
460 static int orph_index_iterate(const struct lu_env *env,
461                               struct mdd_device *mdd)
462 {
463         struct dt_object *dor = mdd->mdd_orphans;
464         struct lu_dirent *ent = &mdd_env_info(env)->mti_orph_ent;
465         const struct dt_it_ops *iops;
466         struct dt_it     *it;
467         struct lu_fid     fid;
468         int               key_sz = 0;
469         int               rc;
470         __u64             cookie;
471         ENTRY;
472
473         /* In recovery phase, do not need for any lock here */
474         iops = &dor->do_index_ops->dio_it;
475         it = iops->init(env, dor, LUDA_64BITHASH, BYPASS_CAPA);
476         if (IS_ERR(it)) {
477                 rc = PTR_ERR(it);
478                 CERROR("%s: cannot clean PENDING: rc = %d\n",
479                        mdd->mdd_obd_dev->obd_name, rc);
480                 GOTO(out, rc);
481         }
482
483         rc = iops->load(env, it, 0);
484         if (rc < 0)
485                 GOTO(out_put, rc);
486         if (rc == 0) {
487                 CERROR("%s: error loading iterator to clean PENDING\n",
488                        mdd->mdd_obd_dev->obd_name);
489                 /* Index contains no zero key? */
490                 GOTO(out_put, rc = -EIO);
491         }
492
493         do {
494                 key_sz = iops->key_size(env, it);
495                 /* filter out "." and ".." entries from PENDING dir. */
496                 if (key_sz < 8)
497                         goto next;
498
499                 rc = iops->rec(env, it, (struct dt_rec *)ent, LUDA_64BITHASH);
500                 if (rc != 0) {
501                         CERROR("%s: fail to get FID for orphan it: rc = %d\n",
502                                mdd->mdd_obd_dev->obd_name, rc);
503                         goto next;
504                 }
505
506                 fid_le_to_cpu(&fid, &ent->lde_fid);
507                 if (!fid_is_sane(&fid)) {
508                         CERROR("%s: bad FID "DFID" cleaning PENDING\n",
509                                mdd->mdd_obd_dev->obd_name, PFID(&fid));
510                         goto next;
511                 }
512
513                 /* kill orphan object */
514                 cookie = iops->store(env, it);
515                 iops->put(env, it);
516                 rc = orph_key_test_and_del(env, mdd, &fid,
517                                            (struct dt_key *)ent->lde_name);
518
519                 /* after index delete reset iterator */
520                 if (rc == 0)
521                         rc = iops->get(env, it, (const void *)"");
522                 else
523                         rc = iops->load(env, it, cookie);
524 next:
525                 rc = iops->next(env, it);
526         } while (rc == 0);
527
528         GOTO(out_put, rc = 0);
529 out_put:
530         iops->put(env, it);
531         iops->fini(env, it);
532
533 out:
534         return rc;
535 }
536
537 /**
538  * open the PENDING directory for device \a mdd
539  *
540  * The PENDING directory persistently tracks files and directories that were
541  * unlinked from the namespace (nlink == 0) but are still held open by clients.
542  * Those inodes shouldn't be deleted if the MDS crashes, because the clients
543  * would not be able to recover and reopen those files.  Instead, these inodes
544  * are linked into the PENDING directory on disk, and only deleted if all
545  * clients close them, or the MDS finishes client recovery without any client
546  * reopening them (i.e. former clients didn't join recovery).
547  *  \param d   mdd device being started.
548  *
549  *  \retval 0  success
550  *  \retval  -ve index operation error.
551  *
552  */
553 int orph_index_init(const struct lu_env *env, struct mdd_device *mdd)
554 {
555         struct lu_fid fid;
556         struct dt_object *d;
557         int rc = 0;
558         ENTRY;
559
560         d = dt_store_open(env, mdd->mdd_child, "", orph_index_name, &fid);
561         if (!IS_ERR(d)) {
562                 mdd->mdd_orphans = d;
563                 if (!dt_try_as_dir(env, d)) {
564                         rc = -ENOTDIR;
565                         CERROR("\"%s\" is not an index! : rc = %d\n",
566                                         orph_index_name, rc);
567                 }
568         } else {
569                 CERROR("cannot find \"%s\" obj %d\n",
570                        orph_index_name, (int)PTR_ERR(d));
571                 rc = PTR_ERR(d);
572         }
573
574         RETURN(rc);
575 }
576
577 void orph_index_fini(const struct lu_env *env, struct mdd_device *mdd)
578 {
579         ENTRY;
580         if (mdd->mdd_orphans != NULL) {
581                 lu_object_put(env, &mdd->mdd_orphans->do_lu);
582                 mdd->mdd_orphans = NULL;
583         }
584         EXIT;
585 }
586
587 /**
588  *  Iterate orphan index to cleanup orphan objects after recovery is done.
589  *  \param d   mdd device in recovery.
590  */
591 int __mdd_orphan_cleanup(const struct lu_env *env, struct mdd_device *d)
592 {
593         return orph_index_iterate(env, d);
594 }
595
596 /**
597  *  add an orphan \a obj to the orphan index.
598  *  \param obj file or directory.
599  *  \param th  transaction for index insert.
600  *
601  *  \pre obj nlink == 0 && obj->mod_count != 0
602  *
603  *  \retval 0  success
604  *  \retval  -ve index operation error.
605  */
606 int __mdd_orphan_add(const struct lu_env *env,
607                      struct mdd_object *obj, struct thandle *th)
608 {
609         return orph_index_insert(env, obj, ORPH_OP_UNLINK, th);
610 }
611
612 /**
613  *  delete an orphan \a obj from orphan index.
614  *  \param obj file or directory.
615  *  \param th  transaction for index deletion and object destruction.
616  *
617  *  \pre obj->mod_count == 0 && ORPHAN_OBJ is set for obj.
618  *
619  *  \retval 0  success
620  *  \retval  -ve index operation error.
621  */
622 int __mdd_orphan_del(const struct lu_env *env,
623                      struct mdd_object *obj, struct thandle *th)
624 {
625         return orph_index_delete(env, obj, ORPH_OP_UNLINK, th);
626 }