Whamcloud - gitweb
963493d2dee36fbb77b22c20b790cc42d4afafad
[fs/lustre-release.git] / lustre / mdd / mdd_orphans.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_orphans.c
37  *
38  * Orphan handling code
39  *
40  * Author: Mike Pershin <tappro@clusterfs.com>
41  *         Pravin B Shelar <pravin.shelar@sun.com>
42  */
43
44 #define DEBUG_SUBSYSTEM S_MDS
45
46 #include <obd.h>
47 #include <obd_class.h>
48 #include <lustre_ver.h>
49 #include <obd_support.h>
50 #include <lustre_fid.h>
51 #include "mdd_internal.h"
52
53 const char orph_index_name[] = "PENDING";
54 const char *dotdot = "..";
55
56 enum {
57         ORPH_OP_UNLINK,
58         ORPH_OP_TRUNCATE
59 };
60
61 #define ORPHAN_FILE_NAME_FORMAT         "%016llx:%08x:%08x:%2x"
62 #define ORPHAN_FILE_NAME_FORMAT_18      "%llx:%08x"
63
64 static struct dt_key* orph_key_fill(const struct lu_env *env,
65                                     const struct lu_fid *lf, __u32 op)
66 {
67         char *key = mdd_env_info(env)->mti_orph_key;
68         int rc;
69
70         LASSERT(key);
71         rc = snprintf(key, NAME_MAX + 1, ORPHAN_FILE_NAME_FORMAT,
72                       (long long unsigned int)fid_seq(lf),
73                       fid_oid(lf), fid_ver(lf), op);
74         if (rc > 0)
75                 return (struct dt_key*) key;
76         else
77                 return ERR_PTR(rc);
78 }
79
80 static struct dt_key* orph_key_fill_18(const struct lu_env *env,
81                                        const struct lu_fid *lf)
82 {
83         char *key = mdd_env_info(env)->mti_orph_key;
84         int rc;
85
86         LASSERT(key);
87         rc = snprintf(key, NAME_MAX + 1, ORPHAN_FILE_NAME_FORMAT_18,
88                       (unsigned long long)fid_seq(lf), fid_oid(lf));
89         if (rc > 0)
90                 return (struct dt_key*) key;
91         else
92                 return ERR_PTR(rc);
93 }
94
95 static int orphan_key_to_fid(char *key, struct lu_fid *lf)
96 {
97         int rc = 0;
98         unsigned int op;
99
100         rc = sscanf(key, ORPHAN_FILE_NAME_FORMAT,
101                     (long long unsigned int *)&lf->f_seq, &lf->f_oid,
102                     &lf->f_ver, &op);
103         if (rc == 4)
104                 return 0;
105
106         /* build igif */
107         rc = sscanf(key, ORPHAN_FILE_NAME_FORMAT_18,
108                     (long long unsigned int *)&lf->f_seq, &lf->f_oid);
109         if (rc == 2) {
110                 lf->f_ver = 0;
111                 return 0;
112         }
113
114         CERROR("can not parse orphan file name %s\n", key);
115         return -EINVAL;
116 }
117
118 static inline void mdd_orphan_write_lock(const struct lu_env *env,
119                                     struct mdd_device *mdd)
120 {
121
122         struct dt_object        *dor    = mdd->mdd_orphans;
123         dor->do_ops->do_write_lock(env, dor, MOR_TGT_ORPHAN);
124 }
125
126 static inline void mdd_orphan_write_unlock(const struct lu_env *env,
127                                            struct mdd_device *mdd)
128 {
129
130         struct dt_object        *dor    = mdd->mdd_orphans;
131         dor->do_ops->do_write_unlock(env, dor);
132 }
133
134 static inline int mdd_orphan_insert_obj(const struct lu_env *env,
135                                         struct mdd_device *mdd,
136                                         struct mdd_object *obj,
137                                         __u32 op,
138                                         struct thandle *th)
139 {
140         struct dt_object        *dor    = mdd->mdd_orphans;
141         const struct lu_fid     *lf     = mdo2fid(obj);
142         struct dt_key           *key    = orph_key_fill(env, lf, op);
143         ENTRY;
144
145         return  dor->do_index_ops->dio_insert(env, dor,
146                                               (struct dt_rec *)lf,
147                                               key, th,
148                                               BYPASS_CAPA, 1);
149 }
150
151 static inline int mdd_orphan_delete_obj(const struct lu_env *env,
152                                         struct mdd_device  *mdd ,
153                                         struct dt_key *key,
154                                         struct thandle *th)
155 {
156         struct dt_object        *dor    = mdd->mdd_orphans;
157
158         return  dor->do_index_ops->dio_delete(env, dor,
159                                               key, th,
160                                               BYPASS_CAPA);
161 }
162
163 static inline void mdd_orphan_ref_add(const struct lu_env *env,
164                                  struct mdd_device *mdd,
165                                  struct thandle *th)
166 {
167         struct dt_object        *dor    = mdd->mdd_orphans;
168         dor->do_ops->do_ref_add(env, dor, th);
169 }
170
171 static inline void mdd_orphan_ref_del(const struct lu_env *env,
172                                  struct mdd_device *mdd,
173                                  struct thandle *th)
174 {
175         struct dt_object        *dor    = mdd->mdd_orphans;
176         dor->do_ops->do_ref_del(env, dor, th);
177 }
178
179
180 int orph_declare_index_insert(const struct lu_env *env,
181                               struct mdd_object *obj,
182                               struct thandle *th)
183 {
184         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
185         int                rc;
186
187         rc = dt_declare_insert(env, mdd->mdd_orphans, NULL, NULL, th);
188         if (rc)
189                 return rc;
190
191         rc = mdo_declare_ref_add(env, obj, th);
192         if (rc)
193                 return rc;
194
195         if (!S_ISDIR(mdd_object_type(obj)))
196                 return 0;
197
198         rc = mdo_declare_ref_add(env, obj, th);
199         if (rc)
200                 return rc;
201
202         rc = dt_declare_ref_add(env, mdd->mdd_orphans, th);
203         if (rc)
204                 return rc;
205
206         rc = mdo_declare_index_delete(env, obj, dotdot, th);
207         if (rc)
208                 return rc;
209
210         rc = mdo_declare_index_insert(env, obj, NULL, dotdot, th);
211
212         return rc;
213 }
214
215 static int orph_index_insert(const struct lu_env *env,
216                              struct mdd_object *obj,
217                              __u32 op,
218                              struct thandle *th)
219 {
220         struct mdd_device       *mdd    = mdo2mdd(&obj->mod_obj);
221         struct dt_object        *dor    = mdd->mdd_orphans;
222         const struct lu_fid     *lf_dor = lu_object_fid(&dor->do_lu);
223         struct dt_object        *next   = mdd_object_child(obj);
224         int rc;
225         ENTRY;
226
227         LASSERT(mdd_write_locked(env, obj) != 0);
228         LASSERT(!(obj->mod_flags & ORPHAN_OBJ));
229         LASSERT(obj->mod_count > 0);
230
231         mdd_orphan_write_lock(env, mdd);
232
233         rc = mdd_orphan_insert_obj(env, mdd, obj, op, th);
234         if (rc)
235                 GOTO(out, rc);
236
237         mdo_ref_add(env, obj, th);
238         if (!S_ISDIR(mdd_object_type(obj)))
239                 goto out;
240
241         mdo_ref_add(env, obj, th);
242         mdd_orphan_ref_add(env, mdd, th);
243
244         /* try best to fixup directory, dont return errors
245          * from here */
246         if (!dt_try_as_dir(env, next))
247                 goto out;
248         next->do_index_ops->dio_delete(env, next,
249                                        (const struct dt_key *)dotdot,
250                                        th, BYPASS_CAPA);
251
252         next->do_index_ops->dio_insert(env, next,
253                                        (struct dt_rec *)lf_dor,
254                                        (const struct dt_key *)dotdot,
255                                        th, BYPASS_CAPA, 1);
256
257 out:
258         if (rc == 0)
259                 obj->mod_flags |= ORPHAN_OBJ;
260
261         mdd_orphan_write_unlock(env, mdd);
262
263         RETURN(rc);
264 }
265
266 /**
267  * Destroy OSD object on MDD and associated OST objects.
268  *
269  * \param obj orphan object
270  * \param mdd used for sending llog msg to osts
271  *
272  * \retval  0   success
273  * \retval -ve  error
274  */
275 static int orphan_object_kill(const struct lu_env *env,
276                               struct mdd_object *obj,
277                               struct mdd_device *mdd,
278                               struct thandle *th)
279 {
280         struct lu_attr *la = &mdd_env_info(env)->mti_la;
281         int rc = 0;
282         ENTRY;
283
284         /* No need to lock this object as its recovery phase, and
285          * no other thread can access it. But we need to lock it
286          * as its precondition for osd api we using. */
287
288         mdo_ref_del(env, obj, th);
289         if (S_ISDIR(mdd_object_type(obj))) {
290                 mdo_ref_del(env, obj, th);
291                 mdd_orphan_ref_del(env, mdd, th);
292         } else {
293                 /* regular file , cleanup linked ost objects */
294                 rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
295                 if (rc == 0)
296                         rc = mdd_lov_destroy(env, mdd, obj, la);
297         }
298         mdo_destroy(env, obj, th);
299         RETURN(rc);
300 }
301
302 int orph_declare_index_delete(const struct lu_env *env,
303                               struct mdd_object *obj,
304                               struct thandle *th)
305 {
306         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
307         int                rc;
308
309         rc = dt_declare_delete(env, mdd->mdd_orphans, NULL, th);
310         if (rc)
311                 return rc;
312
313         rc = mdo_declare_ref_del(env, obj, th);
314         if (rc)
315                 return rc;
316
317         if (S_ISDIR(mdd_object_type(obj))) {
318                 rc = mdo_declare_ref_del(env, obj, th);
319                 if (rc)
320                         return rc;
321
322                 rc = dt_declare_ref_del(env, mdd->mdd_orphans, th);
323         }
324
325         return rc;
326 }
327
328 static int orph_index_delete(const struct lu_env *env,
329                              struct mdd_object *obj,
330                              __u32 op,
331                              struct thandle *th)
332 {
333         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
334         struct dt_object *dor = mdd->mdd_orphans;
335         struct dt_key *key;
336         int rc;
337
338         ENTRY;
339
340         LASSERT(mdd_write_locked(env, obj) != 0);
341         LASSERT(obj->mod_flags & ORPHAN_OBJ);
342         LASSERT(obj->mod_count == 0);
343
344         LASSERT(dor);
345
346         key = orph_key_fill(env, mdo2fid(obj), op);
347         mdd_orphan_write_lock(env, mdd);
348
349         rc = mdd_orphan_delete_obj(env, mdd, key, th);
350
351         if (rc == -ENOENT) {
352                 key = orph_key_fill_18(env, mdo2fid(obj));
353                 rc = mdd_orphan_delete_obj(env, mdd, key, th);
354         }
355
356         if (!rc) {
357                 /* lov objects will be destroyed by caller */
358                 mdo_ref_del(env, obj, th);
359                 if (S_ISDIR(mdd_object_type(obj))) {
360                         mdo_ref_del(env, obj, th);
361                         mdd_orphan_ref_del(env, mdd, th);
362                 }
363                 obj->mod_flags &= ~ORPHAN_OBJ;
364         } else {
365                 CERROR("could not delete object: rc = %d\n",rc);
366         }
367
368         mdd_orphan_write_unlock(env, mdd);
369         RETURN(rc);
370 }
371
372
373 static int orphan_object_destroy(const struct lu_env *env,
374                                  struct mdd_object *obj,
375                                  struct dt_key *key)
376 {
377         struct thandle *th = NULL;
378         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
379         struct md_attr *ma = &mdd_env_info(env)->mti_ma;
380         int rc = 0;
381         ENTRY;
382
383         /* init ma */
384         ma->ma_lmm_size = mdd_lov_mdsize(env, mdd);
385         ma->ma_lmm = mdd_max_lmm_get(env, mdd);
386         ma->ma_cookie_size = mdd_lov_cookiesize(env, mdd);
387         ma->ma_cookie = mdd_max_cookie_get(env, mdd);
388         ma->ma_need = MA_INODE | MA_LOV | MA_COOKIE;
389         ma->ma_valid = 0;
390
391         th = mdd_trans_create(env, mdd);
392         if (IS_ERR(th)) {
393                 CERROR("Cannot get thandle\n");
394                 RETURN(-ENOMEM);
395         }
396         rc = orph_declare_index_delete(env, obj, th);
397         if (rc)
398                 GOTO(stop, rc);
399
400         rc = mdd_declare_object_kill(env, obj, ma, th);
401         if (rc)
402                 GOTO(stop, rc);
403
404         rc = mdd_trans_start(env, mdd, th);
405         if (rc)
406                 GOTO(stop, rc);
407
408         mdd_write_lock(env, obj, MOR_TGT_CHILD);
409         if (likely(obj->mod_count == 0)) {
410                 mdd_orphan_write_lock(env, mdd);
411                 rc = mdd_orphan_delete_obj(env, mdd, key, th);
412                 if (rc == 0)
413                         orphan_object_kill(env, obj, mdd, th);
414                 else
415                         CERROR("could not delete object: rc = %d\n",rc);
416                 mdd_orphan_write_unlock(env, mdd);
417         }
418         mdd_write_unlock(env, obj);
419
420 stop:
421         mdd_trans_stop(env, mdd, 0, th);
422
423         RETURN(rc);
424 }
425
426 /**
427  * Delete unused orphan with FID \a lf from PENDING directory
428  *
429  * \param mdd  MDD device finishing recovery
430  * \param lf   FID of file or directory to delete
431  * \param key  cookie for this entry in index iterator
432  *
433  * \retval 0   success
434  * \retval -ve error
435  */
436 static int orph_key_test_and_del(const struct lu_env *env,
437                                  struct mdd_device *mdd,
438                                  struct lu_fid *lf,
439                                  struct dt_key *key)
440 {
441         struct mdd_object *mdo;
442         int rc;
443
444         mdo = mdd_object_find(env, mdd, lf);
445
446         if (IS_ERR(mdo))
447                 return PTR_ERR(mdo);
448
449         rc = -EBUSY;
450         if (mdo->mod_count == 0) {
451                 CDEBUG(D_HA, "Found orphan "DFID", delete it\n", PFID(lf));
452                 rc = orphan_object_destroy(env, mdo, key);
453                 if (rc) /* so replay-single.sh test_37 works */
454                         CERROR("%s: error unlinking orphan "DFID" from "
455                                "PENDING: rc = %d\n",
456                                mdd->mdd_obd_dev->obd_name, PFID(lf), rc);
457         } else {
458                 mdd_write_lock(env, mdo, MOR_TGT_CHILD);
459                 if (likely(mdo->mod_count > 0)) {
460                         CDEBUG(D_HA, "Found orphan "DFID" count %d, skip it\n",
461                                PFID(lf), mdo->mod_count);
462                         mdo->mod_flags |= ORPHAN_OBJ;
463                 }
464                 mdd_write_unlock(env, mdo);
465         }
466
467         mdd_object_put(env, mdo);
468         return rc;
469 }
470
471 /**
472  * delete unreferenced files and directories in the PENDING directory
473  *
474  * Files that remain in PENDING after client->MDS recovery has completed
475  * have to be referenced (opened) by some client during recovery, or they
476  * will be deleted here (for clients that did not complete recovery).
477  *
478  * \param mdd  MDD device finishing recovery
479  *
480  * \retval 0   success
481  * \retval -ve error
482  */
483 static int orph_index_iterate(const struct lu_env *env,
484                               struct mdd_device *mdd)
485 {
486         struct dt_object *dor = mdd->mdd_orphans;
487         char             *mti_key = mdd_env_info(env)->mti_orph_key;
488         const struct dt_it_ops *iops;
489         struct dt_it     *it;
490         char             *key;
491         struct lu_fid     fid;
492         int               result = 0;
493         int               key_sz = 0;
494         int               rc;
495         __u64             cookie;
496         ENTRY;
497
498         /* In recovery phase, do not need for any lock here */
499         iops = &dor->do_index_ops->dio_it;
500         it = iops->init(env, dor, LUDA_64BITHASH, BYPASS_CAPA);
501         if (IS_ERR(it)) {
502                 rc = PTR_ERR(it);
503                 CERROR("%s: cannot clean PENDING: rc = %d\n",
504                        mdd->mdd_obd_dev->obd_name, rc);
505                 GOTO(out, rc);
506         }
507
508         rc = iops->load(env, it, 0);
509         if (rc < 0)
510                 GOTO(out_put, rc);
511         if (rc == 0) {
512                 CERROR("%s: error loading iterator to clean PENDING\n",
513                        mdd->mdd_obd_dev->obd_name);
514                 /* Index contains no zero key? */
515                 GOTO(out_put, rc = -EIO);
516         }
517
518         do {
519                 key = (void *)iops->key(env, it);
520                 if (IS_ERR(key)) {
521                         CERROR("%s: key failed when clean PENDING: rc = %ld\n",
522                                mdd->mdd_obd_dev->obd_name, PTR_ERR(key));
523                         goto next;
524                 }
525                 key_sz = iops->key_size(env, it);
526
527                 /* filter out "." and ".." entries from PENDING dir. */
528                 if (key_sz < 8)
529                         goto next;
530
531                 memcpy(mti_key, key, key_sz);
532                 mti_key[key_sz] = 0;
533
534                 if (orphan_key_to_fid(mti_key, &fid))
535                         goto next;
536                 if (!fid_is_sane(&fid)) {
537                         CERROR("%s: bad FID "DFID" cleaning PENDING\n",
538                                mdd->mdd_obd_dev->obd_name, PFID(&fid));
539                         goto next;
540                 }
541
542                 /* kill orphan object */
543                 cookie = iops->store(env, it);
544                 iops->put(env, it);
545                 rc = orph_key_test_and_del(env, mdd, &fid,
546                                           (struct dt_key *)mti_key);
547
548                 /* after index delete reset iterator */
549                 if (rc == 0)
550                         result = iops->get(env, it, (const void *)"");
551                 else
552                         result = iops->load(env, it, cookie);
553 next:
554                 result = iops->next(env, it);
555         } while (result == 0);
556
557         GOTO(out_put, rc = 0);
558 out_put:
559         iops->put(env, it);
560         iops->fini(env, it);
561
562 out:
563         return rc;
564 }
565
566 /**
567  * open the PENDING directory for device \a mdd
568  *
569  * The PENDING directory persistently tracks files and directories that were
570  * unlinked from the namespace (nlink == 0) but are still held open by clients.
571  * Those inodes shouldn't be deleted if the MDS crashes, because the clients
572  * would not be able to recover and reopen those files.  Instead, these inodes
573  * are linked into the PENDING directory on disk, and only deleted if all
574  * clients close them, or the MDS finishes client recovery without any client
575  * reopening them (i.e. former clients didn't join recovery).
576  *  \param d   mdd device being started.
577  *
578  *  \retval 0  success
579  *  \retval  -ve index operation error.
580  *
581  */
582 int orph_index_init(const struct lu_env *env, struct mdd_device *mdd)
583 {
584         struct lu_fid fid;
585         struct dt_object *d;
586         int rc = 0;
587         ENTRY;
588
589         d = dt_store_open(env, mdd->mdd_child, "", orph_index_name, &fid);
590         if (!IS_ERR(d)) {
591                 mdd->mdd_orphans = d;
592                 if (!dt_try_as_dir(env, d)) {
593                         rc = -ENOTDIR;
594                         CERROR("\"%s\" is not an index! : rc = %d\n",
595                                         orph_index_name, rc);
596                 }
597         } else {
598                 CERROR("cannot find \"%s\" obj %d\n",
599                        orph_index_name, (int)PTR_ERR(d));
600                 rc = PTR_ERR(d);
601         }
602
603         RETURN(rc);
604 }
605
606 void orph_index_fini(const struct lu_env *env, struct mdd_device *mdd)
607 {
608         ENTRY;
609         if (mdd->mdd_orphans != NULL) {
610                 lu_object_put(env, &mdd->mdd_orphans->do_lu);
611                 mdd->mdd_orphans = NULL;
612         }
613         EXIT;
614 }
615
616 /**
617  *  Iterate orphan index to cleanup orphan objects after recovery is done.
618  *  \param d   mdd device in recovery.
619  */
620 int __mdd_orphan_cleanup(const struct lu_env *env, struct mdd_device *d)
621 {
622         return orph_index_iterate(env, d);
623 }
624
625 /**
626  *  add an orphan \a obj to the orphan index.
627  *  \param obj file or directory.
628  *  \param th  transaction for index insert.
629  *
630  *  \pre obj nlink == 0 && obj->mod_count != 0
631  *
632  *  \retval 0  success
633  *  \retval  -ve index operation error.
634  */
635 int __mdd_orphan_add(const struct lu_env *env,
636                      struct mdd_object *obj, struct thandle *th)
637 {
638         return orph_index_insert(env, obj, ORPH_OP_UNLINK, th);
639 }
640
641 /**
642  *  delete an orphan \a obj from orphan index.
643  *  \param obj file or directory.
644  *  \param th  transaction for index deletion and object destruction.
645  *
646  *  \pre obj->mod_count == 0 && ORPHAN_OBJ is set for obj.
647  *
648  *  \retval 0  success
649  *  \retval  -ve index operation error.
650  */
651 int __mdd_orphan_del(const struct lu_env *env,
652                      struct mdd_object *obj, struct thandle *th)
653 {
654         return orph_index_delete(env, obj, ORPH_OP_UNLINK, th);
655 }