Whamcloud - gitweb
LU-1347 build: remove the vim/emacs modelines
[fs/lustre-release.git] / lustre / mdd / mdd_orphans.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_orphans.c
37  *
38  * Orphan handling code
39  *
40  * Author: Mike Pershin <tappro@clusterfs.com>
41  *         Pravin B Shelar <pravin.shelar@sun.com>
42  */
43
44 #ifndef EXPORT_SYMTAB
45 # define EXPORT_SYMTAB
46 #endif
47 #define DEBUG_SUBSYSTEM S_MDS
48
49 #include <obd.h>
50 #include <obd_class.h>
51 #include <lustre_ver.h>
52 #include <obd_support.h>
53 #include <lustre_fid.h>
54 #include "mdd_internal.h"
55
56 const char orph_index_name[] = "PENDING";
57 const char *dotdot = "..";
58
59 enum {
60         ORPH_OP_UNLINK,
61         ORPH_OP_TRUNCATE
62 };
63
64 #define ORPHAN_FILE_NAME_FORMAT         "%016llx:%08x:%08x:%2x"
65 #define ORPHAN_FILE_NAME_FORMAT_18      "%llx:%08x"
66
67 static struct dt_key* orph_key_fill(const struct lu_env *env,
68                                     const struct lu_fid *lf, __u32 op)
69 {
70         char *key = mdd_env_info(env)->mti_orph_key;
71         int rc;
72
73         LASSERT(key);
74         rc = snprintf(key, NAME_MAX + 1, ORPHAN_FILE_NAME_FORMAT,
75                       (long long unsigned int)fid_seq(lf),
76                       fid_oid(lf), fid_ver(lf), op);
77         if (rc > 0)
78                 return (struct dt_key*) key;
79         else
80                 return ERR_PTR(rc);
81 }
82
83 static struct dt_key* orph_key_fill_18(const struct lu_env *env,
84                                        const struct lu_fid *lf)
85 {
86         char *key = mdd_env_info(env)->mti_orph_key;
87         int rc;
88
89         LASSERT(key);
90         rc = snprintf(key, NAME_MAX + 1, ORPHAN_FILE_NAME_FORMAT_18,
91                       (unsigned long long)fid_seq(lf), fid_oid(lf));
92         if (rc > 0)
93                 return (struct dt_key*) key;
94         else
95                 return ERR_PTR(rc);
96 }
97
98 static int orphan_key_to_fid(char *key, struct lu_fid *lf)
99 {
100         int rc = 0;
101         unsigned int op;
102
103         rc = sscanf(key, ORPHAN_FILE_NAME_FORMAT,
104                     (long long unsigned int *)&lf->f_seq, &lf->f_oid,
105                     &lf->f_ver, &op);
106         if (rc == 4)
107                 return 0;
108
109         /* build igif */
110         rc = sscanf(key, ORPHAN_FILE_NAME_FORMAT_18,
111                     (long long unsigned int *)&lf->f_seq, &lf->f_oid);
112         if (rc == 2) {
113                 lf->f_ver = 0;
114                 return 0;
115         }
116
117         CERROR("can not parse orphan file name %s\n", key);
118         return -EINVAL;
119 }
120
121 static inline void mdd_orphan_write_lock(const struct lu_env *env,
122                                     struct mdd_device *mdd)
123 {
124
125         struct dt_object        *dor    = mdd->mdd_orphans;
126         dor->do_ops->do_write_lock(env, dor, MOR_TGT_ORPHAN);
127 }
128
129 static inline void mdd_orphan_write_unlock(const struct lu_env *env,
130                                            struct mdd_device *mdd)
131 {
132
133         struct dt_object        *dor    = mdd->mdd_orphans;
134         dor->do_ops->do_write_unlock(env, dor);
135 }
136
137 static inline int mdd_orphan_insert_obj(const struct lu_env *env,
138                                         struct mdd_device *mdd,
139                                         struct mdd_object *obj,
140                                         __u32 op,
141                                         struct thandle *th)
142 {
143         struct dt_object        *dor    = mdd->mdd_orphans;
144         const struct lu_fid     *lf     = mdo2fid(obj);
145         struct dt_key           *key    = orph_key_fill(env, lf, op);
146         ENTRY;
147
148         return  dor->do_index_ops->dio_insert(env, dor,
149                                               (struct dt_rec *)lf,
150                                               key, th,
151                                               BYPASS_CAPA, 1);
152 }
153
154 static inline int mdd_orphan_delete_obj(const struct lu_env *env,
155                                         struct mdd_device  *mdd ,
156                                         struct dt_key *key,
157                                         struct thandle *th)
158 {
159         struct dt_object        *dor    = mdd->mdd_orphans;
160
161         return  dor->do_index_ops->dio_delete(env, dor,
162                                               key, th,
163                                               BYPASS_CAPA);
164 }
165
166 static inline void mdd_orphan_ref_add(const struct lu_env *env,
167                                  struct mdd_device *mdd,
168                                  struct thandle *th)
169 {
170         struct dt_object        *dor    = mdd->mdd_orphans;
171         dor->do_ops->do_ref_add(env, dor, th);
172 }
173
174 static inline void mdd_orphan_ref_del(const struct lu_env *env,
175                                  struct mdd_device *mdd,
176                                  struct thandle *th)
177 {
178         struct dt_object        *dor    = mdd->mdd_orphans;
179         dor->do_ops->do_ref_del(env, dor, th);
180 }
181
182
183 int orph_declare_index_insert(const struct lu_env *env,
184                               struct mdd_object *obj,
185                               struct thandle *th)
186 {
187         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
188         int                rc;
189
190         rc = dt_declare_insert(env, mdd->mdd_orphans, NULL, NULL, th);
191         if (rc)
192                 return rc;
193
194         rc = mdo_declare_ref_add(env, obj, th);
195         if (rc)
196                 return rc;
197
198         if (!S_ISDIR(mdd_object_type(obj)))
199                 return 0;
200
201         rc = mdo_declare_ref_add(env, obj, th);
202         if (rc)
203                 return rc;
204
205         rc = dt_declare_ref_add(env, mdd->mdd_orphans, th);
206         if (rc)
207                 return rc;
208
209         rc = mdo_declare_index_delete(env, obj, dotdot, th);
210         if (rc)
211                 return rc;
212
213         rc = mdo_declare_index_insert(env, obj, NULL, dotdot, th);
214
215         return rc;
216 }
217
218 static int orph_index_insert(const struct lu_env *env,
219                              struct mdd_object *obj,
220                              __u32 op,
221                              struct thandle *th)
222 {
223         struct mdd_device       *mdd    = mdo2mdd(&obj->mod_obj);
224         struct dt_object        *dor    = mdd->mdd_orphans;
225         const struct lu_fid     *lf_dor = lu_object_fid(&dor->do_lu);
226         struct dt_object        *next   = mdd_object_child(obj);
227         int rc;
228         ENTRY;
229
230         LASSERT(mdd_write_locked(env, obj) != 0);
231         LASSERT(!(obj->mod_flags & ORPHAN_OBJ));
232         LASSERT(obj->mod_count > 0);
233
234         mdd_orphan_write_lock(env, mdd);
235
236         rc = mdd_orphan_insert_obj(env, mdd, obj, op, th);
237         if (rc)
238                 GOTO(out, rc);
239
240         mdo_ref_add(env, obj, th);
241         if (!S_ISDIR(mdd_object_type(obj)))
242                 goto out;
243
244         mdo_ref_add(env, obj, th);
245         mdd_orphan_ref_add(env, mdd, th);
246
247         /* try best to fixup directory, dont return errors
248          * from here */
249         if (!dt_try_as_dir(env, next))
250                 goto out;
251         next->do_index_ops->dio_delete(env, next,
252                                        (const struct dt_key *)dotdot,
253                                        th, BYPASS_CAPA);
254
255         next->do_index_ops->dio_insert(env, next,
256                                        (struct dt_rec *)lf_dor,
257                                        (const struct dt_key *)dotdot,
258                                        th, BYPASS_CAPA, 1);
259
260 out:
261         if (rc == 0)
262                 obj->mod_flags |= ORPHAN_OBJ;
263
264         mdd_orphan_write_unlock(env, mdd);
265
266         RETURN(rc);
267 }
268
269 /**
270  * Destroy OSD object on MDD and associated OST objects.
271  *
272  * \param obj orphan object
273  * \param mdd used for sending llog msg to osts
274  *
275  * \retval  0   success
276  * \retval -ve  error
277  */
278 static int orphan_object_kill(const struct lu_env *env,
279                               struct mdd_object *obj,
280                               struct mdd_device *mdd,
281                               struct thandle *th)
282 {
283         struct lu_attr *la = &mdd_env_info(env)->mti_la;
284         int rc = 0;
285         ENTRY;
286
287         /* No need to lock this object as its recovery phase, and
288          * no other thread can access it. But we need to lock it
289          * as its precondition for osd api we using. */
290
291         mdo_ref_del(env, obj, th);
292         if (S_ISDIR(mdd_object_type(obj))) {
293                 mdo_ref_del(env, obj, th);
294                 mdd_orphan_ref_del(env, mdd, th);
295         } else {
296                 /* regular file , cleanup linked ost objects */
297                 rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
298                 if (rc == 0)
299                         rc = mdd_lov_destroy(env, mdd, obj, la);
300         }
301         mdo_destroy(env, obj, th);
302         RETURN(rc);
303 }
304
305 int orph_declare_index_delete(const struct lu_env *env,
306                               struct mdd_object *obj,
307                               struct thandle *th)
308 {
309         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
310         int                rc;
311
312         rc = dt_declare_delete(env, mdd->mdd_orphans, NULL, th);
313         if (rc)
314                 return rc;
315
316         rc = mdo_declare_ref_del(env, obj, th);
317         if (rc)
318                 return rc;
319
320         if (S_ISDIR(mdd_object_type(obj))) {
321                 rc = mdo_declare_ref_del(env, obj, th);
322                 if (rc)
323                         return rc;
324
325                 rc = dt_declare_ref_del(env, mdd->mdd_orphans, th);
326         }
327
328         return rc;
329 }
330
331 static int orph_index_delete(const struct lu_env *env,
332                              struct mdd_object *obj,
333                              __u32 op,
334                              struct thandle *th)
335 {
336         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
337         struct dt_object *dor = mdd->mdd_orphans;
338         struct dt_key *key;
339         int rc;
340
341         ENTRY;
342
343         LASSERT(mdd_write_locked(env, obj) != 0);
344         LASSERT(obj->mod_flags & ORPHAN_OBJ);
345         LASSERT(obj->mod_count == 0);
346
347         LASSERT(dor);
348
349         key = orph_key_fill(env, mdo2fid(obj), op);
350         mdd_orphan_write_lock(env, mdd);
351
352         rc = mdd_orphan_delete_obj(env, mdd, key, th);
353
354         if (rc == -ENOENT) {
355                 key = orph_key_fill_18(env, mdo2fid(obj));
356                 rc = mdd_orphan_delete_obj(env, mdd, key, th);
357         }
358
359         if (!rc) {
360                 /* lov objects will be destroyed by caller */
361                 mdo_ref_del(env, obj, th);
362                 if (S_ISDIR(mdd_object_type(obj))) {
363                         mdo_ref_del(env, obj, th);
364                         mdd_orphan_ref_del(env, mdd, th);
365                 }
366                 obj->mod_flags &= ~ORPHAN_OBJ;
367         } else {
368                 CERROR("could not delete object: rc = %d\n",rc);
369         }
370
371         mdd_orphan_write_unlock(env, mdd);
372         RETURN(rc);
373 }
374
375
376 static int orphan_object_destroy(const struct lu_env *env,
377                                  struct mdd_object *obj,
378                                  struct dt_key *key)
379 {
380         struct thandle *th = NULL;
381         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
382         struct md_attr *ma = &mdd_env_info(env)->mti_ma;
383         int rc = 0;
384         ENTRY;
385
386         /* init ma */
387         ma->ma_lmm_size = mdd_lov_mdsize(env, mdd);
388         ma->ma_lmm = mdd_max_lmm_get(env, mdd);
389         ma->ma_cookie_size = mdd_lov_cookiesize(env, mdd);
390         ma->ma_cookie = mdd_max_cookie_get(env, mdd);
391         ma->ma_need = MA_INODE | MA_LOV | MA_COOKIE;
392         ma->ma_valid = 0;
393
394         th = mdd_trans_create(env, mdd);
395         if (IS_ERR(th)) {
396                 CERROR("Cannot get thandle\n");
397                 RETURN(-ENOMEM);
398         }
399         rc = orph_declare_index_delete(env, obj, th);
400         if (rc)
401                 GOTO(stop, rc);
402
403         rc = mdd_declare_object_kill(env, obj, ma, th);
404         if (rc)
405                 GOTO(stop, rc);
406
407         rc = mdd_trans_start(env, mdd, th);
408         if (rc)
409                 GOTO(stop, rc);
410
411         mdd_write_lock(env, obj, MOR_TGT_CHILD);
412         if (likely(obj->mod_count == 0)) {
413                 mdd_orphan_write_lock(env, mdd);
414                 rc = mdd_orphan_delete_obj(env, mdd, key, th);
415                 if (rc == 0)
416                         orphan_object_kill(env, obj, mdd, th);
417                 else
418                         CERROR("could not delete object: rc = %d\n",rc);
419                 mdd_orphan_write_unlock(env, mdd);
420         }
421         mdd_write_unlock(env, obj);
422
423 stop:
424         mdd_trans_stop(env, mdd, 0, th);
425
426         RETURN(rc);
427 }
428
429 /**
430  * Delete unused orphan with FID \a lf from PENDING directory
431  *
432  * \param mdd  MDD device finishing recovery
433  * \param lf   FID of file or directory to delete
434  * \param key  cookie for this entry in index iterator
435  *
436  * \retval 0   success
437  * \retval -ve error
438  */
439 static int orph_key_test_and_del(const struct lu_env *env,
440                                  struct mdd_device *mdd,
441                                  struct lu_fid *lf,
442                                  struct dt_key *key)
443 {
444         struct mdd_object *mdo;
445         int rc;
446
447         mdo = mdd_object_find(env, mdd, lf);
448
449         if (IS_ERR(mdo))
450                 return PTR_ERR(mdo);
451
452         rc = -EBUSY;
453         if (mdo->mod_count == 0) {
454                 CDEBUG(D_HA, "Found orphan "DFID", delete it\n", PFID(lf));
455                 rc = orphan_object_destroy(env, mdo, key);
456                 if (rc) /* so replay-single.sh test_37 works */
457                         CERROR("%s: error unlinking orphan "DFID" from "
458                                "PENDING: rc = %d\n",
459                                mdd->mdd_obd_dev->obd_name, PFID(lf), rc);
460         } else {
461                 mdd_write_lock(env, mdo, MOR_TGT_CHILD);
462                 if (likely(mdo->mod_count > 0)) {
463                         CDEBUG(D_HA, "Found orphan "DFID" count %d, skip it\n",
464                                PFID(lf), mdo->mod_count);
465                         mdo->mod_flags |= ORPHAN_OBJ;
466                 }
467                 mdd_write_unlock(env, mdo);
468         }
469
470         mdd_object_put(env, mdo);
471         return rc;
472 }
473
474 /**
475  * delete unreferenced files and directories in the PENDING directory
476  *
477  * Files that remain in PENDING after client->MDS recovery has completed
478  * have to be referenced (opened) by some client during recovery, or they
479  * will be deleted here (for clients that did not complete recovery).
480  *
481  * \param mdd  MDD device finishing recovery
482  *
483  * \retval 0   success
484  * \retval -ve error
485  */
486 static int orph_index_iterate(const struct lu_env *env,
487                               struct mdd_device *mdd)
488 {
489         struct dt_object *dor = mdd->mdd_orphans;
490         char             *mti_key = mdd_env_info(env)->mti_orph_key;
491         const struct dt_it_ops *iops;
492         struct dt_it     *it;
493         char             *key;
494         struct lu_fid     fid;
495         int               result = 0;
496         int               key_sz = 0;
497         int               rc;
498         __u64             cookie;
499         ENTRY;
500
501         /* In recovery phase, do not need for any lock here */
502         iops = &dor->do_index_ops->dio_it;
503         it = iops->init(env, dor, LUDA_64BITHASH, BYPASS_CAPA);
504         if (IS_ERR(it)) {
505                 rc = PTR_ERR(it);
506                 CERROR("%s: cannot clean PENDING: rc = %d\n",
507                        mdd->mdd_obd_dev->obd_name, rc);
508                 GOTO(out, rc);
509         }
510
511         rc = iops->load(env, it, 0);
512         if (rc < 0)
513                 GOTO(out_put, rc);
514         if (rc == 0) {
515                 CERROR("%s: error loading iterator to clean PENDING\n",
516                        mdd->mdd_obd_dev->obd_name);
517                 /* Index contains no zero key? */
518                 GOTO(out_put, rc = -EIO);
519         }
520
521         do {
522                 key = (void *)iops->key(env, it);
523                 if (IS_ERR(key)) {
524                         CERROR("%s: key failed when clean PENDING: rc = %ld\n",
525                                mdd->mdd_obd_dev->obd_name, PTR_ERR(key));
526                         goto next;
527                 }
528                 key_sz = iops->key_size(env, it);
529
530                 /* filter out "." and ".." entries from PENDING dir. */
531                 if (key_sz < 8)
532                         goto next;
533
534                 memcpy(mti_key, key, key_sz);
535                 mti_key[key_sz] = 0;
536
537                 if (orphan_key_to_fid(mti_key, &fid))
538                         goto next;
539                 if (!fid_is_sane(&fid)) {
540                         CERROR("%s: bad FID "DFID" cleaning PENDING\n",
541                                mdd->mdd_obd_dev->obd_name, PFID(&fid));
542                         goto next;
543                 }
544
545                 /* kill orphan object */
546                 cookie = iops->store(env, it);
547                 iops->put(env, it);
548                 rc = orph_key_test_and_del(env, mdd, &fid,
549                                           (struct dt_key *)mti_key);
550
551                 /* after index delete reset iterator */
552                 if (rc == 0)
553                         result = iops->get(env, it, (const void *)"");
554                 else
555                         result = iops->load(env, it, cookie);
556 next:
557                 result = iops->next(env, it);
558         } while (result == 0);
559
560         GOTO(out_put, rc = 0);
561 out_put:
562         iops->put(env, it);
563         iops->fini(env, it);
564
565 out:
566         return rc;
567 }
568
569 /**
570  * open the PENDING directory for device \a mdd
571  *
572  * The PENDING directory persistently tracks files and directories that were
573  * unlinked from the namespace (nlink == 0) but are still held open by clients.
574  * Those inodes shouldn't be deleted if the MDS crashes, because the clients
575  * would not be able to recover and reopen those files.  Instead, these inodes
576  * are linked into the PENDING directory on disk, and only deleted if all
577  * clients close them, or the MDS finishes client recovery without any client
578  * reopening them (i.e. former clients didn't join recovery).
579  *  \param d   mdd device being started.
580  *
581  *  \retval 0  success
582  *  \retval  -ve index operation error.
583  *
584  */
585 int orph_index_init(const struct lu_env *env, struct mdd_device *mdd)
586 {
587         struct lu_fid fid;
588         struct dt_object *d;
589         int rc = 0;
590         ENTRY;
591
592         d = dt_store_open(env, mdd->mdd_child, "", orph_index_name, &fid);
593         if (!IS_ERR(d)) {
594                 mdd->mdd_orphans = d;
595                 if (!dt_try_as_dir(env, d)) {
596                         rc = -ENOTDIR;
597                         CERROR("\"%s\" is not an index! : rc = %d\n",
598                                         orph_index_name, rc);
599                 }
600         } else {
601                 CERROR("cannot find \"%s\" obj %d\n",
602                        orph_index_name, (int)PTR_ERR(d));
603                 rc = PTR_ERR(d);
604         }
605
606         RETURN(rc);
607 }
608
609 void orph_index_fini(const struct lu_env *env, struct mdd_device *mdd)
610 {
611         ENTRY;
612         if (mdd->mdd_orphans != NULL) {
613                 lu_object_put(env, &mdd->mdd_orphans->do_lu);
614                 mdd->mdd_orphans = NULL;
615         }
616         EXIT;
617 }
618
619 /**
620  *  Iterate orphan index to cleanup orphan objects after recovery is done.
621  *  \param d   mdd device in recovery.
622  */
623 int __mdd_orphan_cleanup(const struct lu_env *env, struct mdd_device *d)
624 {
625         return orph_index_iterate(env, d);
626 }
627
628 /**
629  *  add an orphan \a obj to the orphan index.
630  *  \param obj file or directory.
631  *  \param th  transaction for index insert.
632  *
633  *  \pre obj nlink == 0 && obj->mod_count != 0
634  *
635  *  \retval 0  success
636  *  \retval  -ve index operation error.
637  */
638 int __mdd_orphan_add(const struct lu_env *env,
639                      struct mdd_object *obj, struct thandle *th)
640 {
641         return orph_index_insert(env, obj, ORPH_OP_UNLINK, th);
642 }
643
644 /**
645  *  delete an orphan \a obj from orphan index.
646  *  \param obj file or directory.
647  *  \param th  transaction for index deletion and object destruction.
648  *
649  *  \pre obj->mod_count == 0 && ORPHAN_OBJ is set for obj.
650  *
651  *  \retval 0  success
652  *  \retval  -ve index operation error.
653  */
654 int __mdd_orphan_del(const struct lu_env *env,
655                      struct mdd_object *obj, struct thandle *th)
656 {
657         return orph_index_delete(env, obj, ORPH_OP_UNLINK, th);
658 }