Whamcloud - gitweb
LU-137 obdclass: add dt_object_put() and use it
[fs/lustre-release.git] / lustre / mdd / mdd_orphans.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/mdd/mdd_orphans.c
33  *
34  * Orphan handling code
35  *
36  * Author: Mike Pershin <tappro@clusterfs.com>
37  *         Pravin B Shelar <pravin.shelar@sun.com>
38  */
39
40 #define DEBUG_SUBSYSTEM S_MDS
41
42 #include <obd.h>
43 #include <obd_class.h>
44 #include <lustre_ver.h>
45 #include <obd_support.h>
46 #include <lustre_fid.h>
47 #include "mdd_internal.h"
48
49 const char orph_index_name[] = "PENDING";
50 static const char dotdot[] = "..";
51
52 enum {
53         ORPH_OP_UNLINK,
54         ORPH_OP_TRUNCATE
55 };
56
57 #define ORPHAN_FILE_NAME_FORMAT         "%016llx:%08x:%08x:%2x"
58 #define ORPHAN_FILE_NAME_FORMAT_18      "%llx:%08x"
59
60 static struct dt_key* orph_key_fill(const struct lu_env *env,
61                                     const struct lu_fid *lf, __u32 op)
62 {
63         char *key = mdd_env_info(env)->mti_key;
64         int rc;
65
66         LASSERT(key);
67         rc = snprintf(key, NAME_MAX + 1, ORPHAN_FILE_NAME_FORMAT,
68                       (long long unsigned int)fid_seq(lf),
69                       fid_oid(lf), fid_ver(lf), op);
70         if (rc > 0)
71                 return (struct dt_key*) key;
72         else
73                 return ERR_PTR(rc);
74 }
75
76 static struct dt_key* orph_key_fill_18(const struct lu_env *env,
77                                        const struct lu_fid *lf)
78 {
79         char *key = mdd_env_info(env)->mti_key;
80         int rc;
81
82         LASSERT(key);
83         rc = snprintf(key, NAME_MAX + 1, ORPHAN_FILE_NAME_FORMAT_18,
84                       (unsigned long long)fid_seq(lf), fid_oid(lf));
85         if (rc > 0)
86                 return (struct dt_key*) key;
87         else
88                 return ERR_PTR(rc);
89 }
90
91 static inline void mdd_orphan_write_lock(const struct lu_env *env,
92                                          struct mdd_device *mdd)
93 {
94         struct dt_object *dor = mdd->mdd_orphans;
95         dt_write_lock(env, dor, MOR_TGT_ORPHAN);
96 }
97
98 static inline void mdd_orphan_write_unlock(const struct lu_env *env,
99                                            struct mdd_device *mdd)
100 {
101         struct dt_object *dor = mdd->mdd_orphans;
102         dt_write_unlock(env, dor);
103 }
104
105 static inline int mdd_orphan_insert_obj(const struct lu_env *env,
106                                         struct mdd_device *mdd,
107                                         struct mdd_object *obj,
108                                         __u32 op, struct thandle *th)
109 {
110         struct dt_insert_rec    *rec    = &mdd_env_info(env)->mti_dt_rec;
111         struct dt_object        *dor    = mdd->mdd_orphans;
112         const struct lu_fid     *lf     = mdo2fid(obj);
113         struct dt_key           *key    = orph_key_fill(env, lf, op);
114
115         rec->rec_fid = lf;
116         rec->rec_type = mdd_object_type(obj);
117
118         return dt_insert(env, dor, (const struct dt_rec *)rec, key, th, 1);
119 }
120
121 static inline int mdd_orphan_delete_obj(const struct lu_env *env,
122                                         struct mdd_device  *mdd ,
123                                         struct dt_key *key,
124                                         struct thandle *th)
125 {
126         struct dt_object *dor = mdd->mdd_orphans;
127
128         return dt_delete(env, dor, key, th);
129 }
130
131 static inline int mdd_orphan_ref_add(const struct lu_env *env,
132                                      struct mdd_device *mdd,
133                                      struct thandle *th)
134 {
135         struct dt_object *dor = mdd->mdd_orphans;
136         return dt_ref_add(env, dor, th);
137 }
138
139 static inline int mdd_orphan_ref_del(const struct lu_env *env,
140                                      struct mdd_device *mdd,
141                                      struct thandle *th)
142 {
143         struct dt_object *dor = mdd->mdd_orphans;
144         return dt_ref_del(env, dor, th);
145 }
146
147
148 int orph_declare_index_insert(const struct lu_env *env,
149                               struct mdd_object *obj,
150                               umode_t mode, struct thandle *th)
151 {
152         struct dt_insert_rec    *rec = &mdd_env_info(env)->mti_dt_rec;
153         struct mdd_device       *mdd = mdo2mdd(&obj->mod_obj);
154         struct dt_key           *key;
155         int                     rc;
156
157         key = orph_key_fill(env, mdo2fid(obj), ORPH_OP_UNLINK);
158
159         rec->rec_fid = mdo2fid(obj);
160         rec->rec_type = mode;
161         rc = dt_declare_insert(env, mdd->mdd_orphans,
162                                (const struct dt_rec *)rec, key, th);
163         if (rc != 0)
164                 return rc;
165
166         rc = mdo_declare_ref_add(env, obj, th);
167         if (rc)
168                 return rc;
169
170         if (!S_ISDIR(mode))
171                 return 0;
172
173         rc = mdo_declare_ref_add(env, obj, th);
174         if (rc)
175                 return rc;
176
177         rc = dt_declare_ref_add(env, mdd->mdd_orphans, th);
178         if (rc)
179                 return rc;
180
181         rc = mdo_declare_index_delete(env, obj, dotdot, th);
182         if (rc)
183                 return rc;
184
185         rc = mdo_declare_index_insert(env, obj,
186                                       lu_object_fid(&mdd->mdd_orphans->do_lu),
187                                       S_IFDIR, dotdot, th);
188
189         return rc;
190 }
191
192 static int orph_index_insert(const struct lu_env *env,
193                              struct mdd_object *obj,
194                              __u32 op, struct thandle *th)
195 {
196         struct mdd_device       *mdd    = mdo2mdd(&obj->mod_obj);
197         struct dt_object        *dor    = mdd->mdd_orphans;
198         const struct lu_fid     *lf_dor = lu_object_fid(&dor->do_lu);
199         struct dt_object        *next   = mdd_object_child(obj);
200         struct dt_insert_rec    *rec    = &mdd_env_info(env)->mti_dt_rec;
201         int                      rc;
202         ENTRY;
203
204         LASSERT(mdd_write_locked(env, obj) != 0);
205         LASSERT(!(obj->mod_flags & ORPHAN_OBJ));
206
207         mdd_orphan_write_lock(env, mdd);
208
209         rc = mdd_orphan_insert_obj(env, mdd, obj, op, th);
210         if (rc)
211                 GOTO(out, rc);
212
213         mdo_ref_add(env, obj, th);
214         if (!S_ISDIR(mdd_object_type(obj)))
215                 GOTO(out, rc = 0);
216
217         mdo_ref_add(env, obj, th);
218         mdd_orphan_ref_add(env, mdd, th);
219
220         /* try best to fixup directory, dont return errors
221          * from here */
222         if (!dt_try_as_dir(env, next))
223                 GOTO(out, rc = 0);
224
225         dt_delete(env, next, (const struct dt_key *)dotdot, th);
226
227         rec->rec_fid = lf_dor;
228         rec->rec_type = S_IFDIR;
229         dt_insert(env, next, (const struct dt_rec *)rec,
230                   (const struct dt_key *)dotdot, th, 1);
231
232 out:
233         if (rc == 0)
234                 obj->mod_flags |= ORPHAN_OBJ;
235
236         mdd_orphan_write_unlock(env, mdd);
237
238         RETURN(rc);
239 }
240
241 int orph_declare_index_delete(const struct lu_env *env,
242                               struct mdd_object *obj,
243
244                               struct thandle *th)
245 {
246         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
247         struct dt_key     *key;
248         int                rc;
249
250         key = orph_key_fill(env, mdo2fid(obj), ORPH_OP_UNLINK);
251
252         rc = dt_declare_delete(env, mdd->mdd_orphans, key, th);
253         if (rc)
254                 return rc;
255
256         rc = mdo_declare_ref_del(env, obj, th);
257         if (rc)
258                 return rc;
259
260         if (S_ISDIR(mdd_object_type(obj))) {
261                 rc = mdo_declare_ref_del(env, obj, th);
262                 if (rc)
263                         return rc;
264
265                 rc = dt_declare_ref_del(env, mdd->mdd_orphans, th);
266         }
267
268         return rc;
269 }
270
271 static int orph_index_delete(const struct lu_env *env,
272                              struct mdd_object *obj,
273                              __u32 op,
274                              struct thandle *th)
275 {
276         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
277         struct dt_object *dor = mdd->mdd_orphans;
278         struct dt_key *key;
279         int rc;
280
281         ENTRY;
282
283         LASSERT(mdd_write_locked(env, obj) != 0);
284         LASSERT(obj->mod_flags & ORPHAN_OBJ);
285         LASSERT(obj->mod_count == 0);
286
287         LASSERT(dor);
288
289         key = orph_key_fill(env, mdo2fid(obj), op);
290         mdd_orphan_write_lock(env, mdd);
291
292         rc = mdd_orphan_delete_obj(env, mdd, key, th);
293
294         if (rc == -ENOENT) {
295                 key = orph_key_fill_18(env, mdo2fid(obj));
296                 rc = mdd_orphan_delete_obj(env, mdd, key, th);
297         }
298
299         if (!rc) {
300                 /* lov objects will be destroyed by caller */
301                 mdo_ref_del(env, obj, th);
302                 if (S_ISDIR(mdd_object_type(obj))) {
303                         mdo_ref_del(env, obj, th);
304                         mdd_orphan_ref_del(env, mdd, th);
305                 }
306                 obj->mod_flags &= ~ORPHAN_OBJ;
307         } else {
308                 CERROR("could not delete object: rc = %d\n",rc);
309         }
310
311         mdd_orphan_write_unlock(env, mdd);
312         RETURN(rc);
313 }
314
315
316 static int orphan_object_destroy(const struct lu_env *env,
317                                  struct mdd_object *obj,
318                                  struct dt_key *key)
319 {
320         struct thandle *th = NULL;
321         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
322         int rc = 0;
323         ENTRY;
324
325         th = mdd_trans_create(env, mdd);
326         if (IS_ERR(th)) {
327                 CERROR("Cannot get thandle\n");
328                 RETURN(PTR_ERR(th));
329         }
330
331         rc = orph_declare_index_delete(env, obj, th);
332         if (rc)
333                 GOTO(stop, rc);
334
335         rc = mdo_declare_destroy(env, obj, th);
336         if (rc)
337                 GOTO(stop, rc);
338
339         rc = mdd_trans_start(env, mdd, th);
340         if (rc)
341                 GOTO(stop, rc);
342
343         mdd_write_lock(env, obj, MOR_TGT_CHILD);
344         if (likely(obj->mod_count == 0)) {
345                 mdd_orphan_write_lock(env, mdd);
346                 rc = mdd_orphan_delete_obj(env, mdd, key, th);
347                 if (rc == 0) {
348                         mdo_ref_del(env, obj, th);
349                         if (S_ISDIR(mdd_object_type(obj))) {
350                                 mdo_ref_del(env, obj, th);
351                                 mdd_orphan_ref_del(env, mdd, th);
352                         }
353                         rc = mdo_destroy(env, obj, th);
354                 } else
355                         CERROR("could not delete object: rc = %d\n", rc);
356                 mdd_orphan_write_unlock(env, mdd);
357         }
358         mdd_write_unlock(env, obj);
359
360 stop:
361         rc = mdd_trans_stop(env, mdd, 0, th);
362
363         RETURN(rc);
364 }
365
366 /**
367  * Delete unused orphan with FID \a lf from PENDING directory
368  *
369  * \param mdd  MDD device finishing recovery
370  * \param lf   FID of file or directory to delete
371  * \param key  cookie for this entry in index iterator
372  *
373  * \retval 0   success
374  * \retval -ve error
375  */
376 static int orph_key_test_and_del(const struct lu_env *env,
377                                  struct mdd_device *mdd,
378                                  struct lu_fid *lf,
379                                  struct dt_key *key)
380 {
381         struct mdd_object *mdo;
382         int rc;
383
384         mdo = mdd_object_find(env, mdd, lf);
385
386         if (IS_ERR(mdo))
387                 return PTR_ERR(mdo);
388
389         rc = -EBUSY;
390         if (mdo->mod_count == 0) {
391                 CDEBUG(D_HA, "Found orphan "DFID", delete it\n", PFID(lf));
392                 rc = orphan_object_destroy(env, mdo, key);
393                 if (rc) /* so replay-single.sh test_37 works */
394                         CERROR("%s: error unlinking orphan "DFID" from "
395                                "PENDING: rc = %d\n",
396                                mdd2obd_dev(mdd)->obd_name, PFID(lf), rc);
397         } else {
398                 mdd_write_lock(env, mdo, MOR_TGT_CHILD);
399                 if (likely(mdo->mod_count > 0)) {
400                         CDEBUG(D_HA, "Found orphan "DFID" count %d, skip it\n",
401                                PFID(lf), mdo->mod_count);
402                         mdo->mod_flags |= ORPHAN_OBJ;
403                 }
404                 mdd_write_unlock(env, mdo);
405         }
406
407         mdd_object_put(env, mdo);
408         return rc;
409 }
410
411 /**
412  * delete unreferenced files and directories in the PENDING directory
413  *
414  * Files that remain in PENDING after client->MDS recovery has completed
415  * have to be referenced (opened) by some client during recovery, or they
416  * will be deleted here (for clients that did not complete recovery).
417  *
418  * \param thread  info about orphan cleanup thread
419  *
420  * \retval 0   success
421  * \retval -ve error
422  */
423 static int orph_index_iterate(const struct lu_env *env,
424                               struct mdd_generic_thread *thread)
425 {
426         struct mdd_device *mdd = (struct mdd_device *)thread->mgt_data;
427         struct dt_object *dor = mdd->mdd_orphans;
428         struct lu_dirent *ent = &mdd_env_info(env)->mti_ent;
429         const struct dt_it_ops *iops;
430         struct dt_it     *it;
431         struct lu_fid     fid;
432         int               key_sz = 0;
433         int               rc;
434         __u64             cookie;
435         ENTRY;
436
437         iops = &dor->do_index_ops->dio_it;
438         it = iops->init(env, dor, LUDA_64BITHASH);
439         if (IS_ERR(it)) {
440                 rc = PTR_ERR(it);
441                 CERROR("%s: cannot clean PENDING: rc = %d\n",
442                        mdd2obd_dev(mdd)->obd_name, rc);
443                 GOTO(out, rc);
444         }
445
446         rc = iops->load(env, it, 0);
447         if (rc < 0)
448                 GOTO(out_put, rc);
449         if (rc == 0) {
450                 CERROR("%s: error loading iterator to clean PENDING\n",
451                        mdd2obd_dev(mdd)->obd_name);
452                 /* Index contains no zero key? */
453                 GOTO(out_put, rc = -EIO);
454         }
455
456         do {
457                 if (thread->mgt_abort)
458                         break;
459
460                 key_sz = iops->key_size(env, it);
461                 /* filter out "." and ".." entries from PENDING dir. */
462                 if (key_sz < 8)
463                         goto next;
464
465                 rc = iops->rec(env, it, (struct dt_rec *)ent, LUDA_64BITHASH);
466                 if (rc != 0) {
467                         CERROR("%s: fail to get FID for orphan it: rc = %d\n",
468                                mdd2obd_dev(mdd)->obd_name, rc);
469                         goto next;
470                 }
471
472                 fid_le_to_cpu(&fid, &ent->lde_fid);
473                 if (!fid_is_sane(&fid)) {
474                         CERROR("%s: bad FID "DFID" cleaning PENDING\n",
475                                mdd2obd_dev(mdd)->obd_name, PFID(&fid));
476                         goto next;
477                 }
478
479                 /* kill orphan object */
480                 cookie = iops->store(env, it);
481                 iops->put(env, it);
482                 rc = orph_key_test_and_del(env, mdd, &fid,
483                                            (struct dt_key *)ent->lde_name);
484
485                 /* after index delete reset iterator */
486                 if (rc == 0)
487                         rc = iops->get(env, it, (const void *)"");
488                 else
489                         rc = iops->load(env, it, cookie);
490 next:
491                 rc = iops->next(env, it);
492         } while (rc == 0);
493
494         GOTO(out_put, rc = 0);
495 out_put:
496         iops->put(env, it);
497         iops->fini(env, it);
498
499 out:
500         return rc;
501 }
502
503 /**
504  * open the PENDING directory for device \a mdd
505  *
506  * The PENDING directory persistently tracks files and directories that were
507  * unlinked from the namespace (nlink == 0) but are still held open by clients.
508  * Those inodes shouldn't be deleted if the MDS crashes, because the clients
509  * would not be able to recover and reopen those files.  Instead, these inodes
510  * are linked into the PENDING directory on disk, and only deleted if all
511  * clients close them, or the MDS finishes client recovery without any client
512  * reopening them (i.e. former clients didn't join recovery).
513  *  \param d   mdd device being started.
514  *
515  *  \retval 0  success
516  *  \retval  -ve index operation error.
517  *
518  */
519 int orph_index_init(const struct lu_env *env, struct mdd_device *mdd)
520 {
521         struct lu_fid            fid;
522         struct dt_object        *d;
523         int                      rc = 0;
524
525         ENTRY;
526
527         /* create PENDING dir */
528         fid_zero(&fid);
529         rc = mdd_local_file_create(env, mdd, &mdd->mdd_local_root_fid,
530                                    orph_index_name, S_IFDIR | S_IRUGO |
531                                    S_IWUSR | S_IXUGO, &fid);
532         if (rc < 0)
533                 RETURN(rc);
534
535         d = dt_locate(env, mdd->mdd_child, &fid);
536         if (IS_ERR(d))
537                 RETURN(PTR_ERR(d));
538         LASSERT(lu_object_exists(&d->do_lu));
539         if (!dt_try_as_dir(env, d)) {
540                 CERROR("%s: \"%s\" is not an index: rc = %d\n",
541                        mdd2obd_dev(mdd)->obd_name, orph_index_name, rc);
542                 dt_object_put(env, d);
543                 RETURN(-ENOTDIR);
544         }
545         mdd->mdd_orphans = d;
546         RETURN(0);
547 }
548
549 void orph_index_fini(const struct lu_env *env, struct mdd_device *mdd)
550 {
551         ENTRY;
552         if (mdd->mdd_orphans != NULL) {
553                 dt_object_put(env, mdd->mdd_orphans);
554                 mdd->mdd_orphans = NULL;
555         }
556         EXIT;
557 }
558
559 static int __mdd_orphan_cleanup(void *args)
560 {
561         struct mdd_generic_thread *thread = (struct mdd_generic_thread *)args;
562         struct lu_env             *env = NULL;
563         int                        rc;
564         ENTRY;
565
566         complete(&thread->mgt_started);
567
568         OBD_ALLOC_PTR(env);
569         if (env == NULL)
570                 GOTO(out, rc = -ENOMEM);
571
572         rc = lu_env_init(env, LCT_MD_THREAD);
573         if (rc)
574                 GOTO(out, rc);
575
576         rc = orph_index_iterate(env, thread);
577
578         lu_env_fini(env);
579         GOTO(out, rc);
580 out:
581         if (env)
582                 OBD_FREE_PTR(env);
583         complete(&thread->mgt_finished);
584         return rc;
585 }
586
587 /**
588  *  Iterate orphan index to cleanup orphan objects after recovery is done.
589  *  \param d   mdd device in recovery.
590  */
591 int mdd_orphan_cleanup(const struct lu_env *env, struct mdd_device *d)
592 {
593         int rc = -ENOMEM;
594         char *name = NULL;
595
596         OBD_ALLOC(name, MTI_NAME_MAXLEN);
597         if (name == NULL)
598                 goto out;
599
600         snprintf(name, MTI_NAME_MAXLEN, "orph_cleanup_%s",
601                  mdd2obd_dev(d)->obd_name);
602
603         rc = mdd_generic_thread_start(&d->mdd_orph_cleanup_thread,
604                                       __mdd_orphan_cleanup, (void *)d, name);
605 out:
606         if (rc)
607                 CERROR("%s: start orphan cleanup thread failed:%d\n",
608                        mdd2obd_dev(d)->obd_name, rc);
609         if (name)
610                 OBD_FREE(name, MTI_NAME_MAXLEN);
611         return rc;
612 }
613
614 /**
615  *  add an orphan \a obj to the orphan index.
616  *  \param obj file or directory.
617  *  \param th  transaction for index insert.
618  *
619  *  \pre obj nlink == 0 && obj->mod_count != 0
620  *
621  *  \retval 0  success
622  *  \retval  -ve index operation error.
623  */
624 int __mdd_orphan_add(const struct lu_env *env,
625                      struct mdd_object *obj, struct thandle *th)
626 {
627         return orph_index_insert(env, obj, ORPH_OP_UNLINK, th);
628 }
629
630 /**
631  *  delete an orphan \a obj from orphan index.
632  *  \param obj file or directory.
633  *  \param th  transaction for index deletion and object destruction.
634  *
635  *  \pre obj->mod_count == 0 && ORPHAN_OBJ is set for obj.
636  *
637  *  \retval 0  success
638  *  \retval  -ve index operation error.
639  */
640 int __mdd_orphan_del(const struct lu_env *env,
641                      struct mdd_object *obj, struct thandle *th)
642 {
643         return orph_index_delete(env, obj, ORPH_OP_UNLINK, th);
644 }