Whamcloud - gitweb
e821f175fd36a1b7df877932f4bf393dc57809a3
[fs/lustre-release.git] / lustre / mdd / mdd_orphans.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/mdd/mdd_orphans.c
33  *
34  * Orphan handling code
35  *
36  * Author: Mike Pershin <tappro@clusterfs.com>
37  *         Pravin B Shelar <pravin.shelar@sun.com>
38  */
39
40 #define DEBUG_SUBSYSTEM S_MDS
41
42 #include <obd.h>
43 #include <obd_class.h>
44 #include <lustre_ver.h>
45 #include <obd_support.h>
46 #include <lustre_fid.h>
47 #include "mdd_internal.h"
48
49 const char orph_index_name[] = "PENDING";
50 static const char dotdot[] = "..";
51
52 enum {
53         ORPH_OP_UNLINK,
54         ORPH_OP_TRUNCATE
55 };
56
57 #define ORPHAN_FILE_NAME_FORMAT         "%016llx:%08x:%08x:%2x"
58 #define ORPHAN_FILE_NAME_FORMAT_18      "%llx:%08x"
59
60 static struct dt_key* orph_key_fill(const struct lu_env *env,
61                                     const struct lu_fid *lf, __u32 op)
62 {
63         char *key = mdd_env_info(env)->mti_key;
64         int rc;
65
66         LASSERT(key);
67         rc = snprintf(key, NAME_MAX + 1, ORPHAN_FILE_NAME_FORMAT,
68                       (long long unsigned int)fid_seq(lf),
69                       fid_oid(lf), fid_ver(lf), op);
70         if (rc > 0)
71                 return (struct dt_key*) key;
72         else
73                 return ERR_PTR(rc);
74 }
75
76 static struct dt_key* orph_key_fill_18(const struct lu_env *env,
77                                        const struct lu_fid *lf)
78 {
79         char *key = mdd_env_info(env)->mti_key;
80         int rc;
81
82         LASSERT(key);
83         rc = snprintf(key, NAME_MAX + 1, ORPHAN_FILE_NAME_FORMAT_18,
84                       (unsigned long long)fid_seq(lf), fid_oid(lf));
85         if (rc > 0)
86                 return (struct dt_key*) key;
87         else
88                 return ERR_PTR(rc);
89 }
90
91 static inline void mdd_orphan_write_lock(const struct lu_env *env,
92                                          struct mdd_device *mdd)
93 {
94         struct dt_object *dor = mdd->mdd_orphans;
95         dt_write_lock(env, dor, MOR_TGT_ORPHAN);
96 }
97
98 static inline void mdd_orphan_write_unlock(const struct lu_env *env,
99                                            struct mdd_device *mdd)
100 {
101         struct dt_object *dor = mdd->mdd_orphans;
102         dt_write_unlock(env, dor);
103 }
104
105 static inline int mdd_orphan_insert_obj(const struct lu_env *env,
106                                         struct mdd_device *mdd,
107                                         struct mdd_object *obj,
108                                         __u32 op, struct thandle *th)
109 {
110         struct dt_insert_rec    *rec    = &mdd_env_info(env)->mti_dt_rec;
111         struct dt_object        *dor    = mdd->mdd_orphans;
112         const struct lu_fid     *lf     = mdo2fid(obj);
113         struct dt_key           *key    = orph_key_fill(env, lf, op);
114
115         rec->rec_fid = lf;
116         rec->rec_type = mdd_object_type(obj);
117
118         return dt_insert(env, dor, (const struct dt_rec *)rec, key, th, 1);
119 }
120
121 static inline int mdd_orphan_delete_obj(const struct lu_env *env,
122                                         struct mdd_device  *mdd ,
123                                         struct dt_key *key,
124                                         struct thandle *th)
125 {
126         struct dt_object *dor = mdd->mdd_orphans;
127
128         return dt_delete(env, dor, key, th);
129 }
130
131 static inline int mdd_orphan_ref_add(const struct lu_env *env,
132                                      struct mdd_device *mdd,
133                                      struct thandle *th)
134 {
135         struct dt_object *dor = mdd->mdd_orphans;
136         return dt_ref_add(env, dor, th);
137 }
138
139 static inline int mdd_orphan_ref_del(const struct lu_env *env,
140                                      struct mdd_device *mdd,
141                                      struct thandle *th)
142 {
143         struct dt_object *dor = mdd->mdd_orphans;
144         return dt_ref_del(env, dor, th);
145 }
146
147
148 int orph_declare_index_insert(const struct lu_env *env,
149                               struct mdd_object *obj,
150                               umode_t mode, struct thandle *th)
151 {
152         struct dt_insert_rec    *rec = &mdd_env_info(env)->mti_dt_rec;
153         struct mdd_device       *mdd = mdo2mdd(&obj->mod_obj);
154         struct dt_key           *key;
155         int                     rc;
156
157         key = orph_key_fill(env, mdo2fid(obj), ORPH_OP_UNLINK);
158
159         rec->rec_fid = mdo2fid(obj);
160         rec->rec_type = mode;
161         rc = dt_declare_insert(env, mdd->mdd_orphans,
162                                (const struct dt_rec *)rec, key, th);
163         if (rc != 0)
164                 return rc;
165
166         rc = mdo_declare_ref_add(env, obj, th);
167         if (rc)
168                 return rc;
169
170         if (!S_ISDIR(mode))
171                 return 0;
172
173         rc = mdo_declare_ref_add(env, obj, th);
174         if (rc)
175                 return rc;
176
177         rc = dt_declare_ref_add(env, mdd->mdd_orphans, th);
178         if (rc)
179                 return rc;
180
181         rc = mdo_declare_index_delete(env, obj, dotdot, th);
182         if (rc)
183                 return rc;
184
185         rc = mdo_declare_index_insert(env, obj,
186                                       lu_object_fid(&mdd->mdd_orphans->do_lu),
187                                       S_IFDIR, dotdot, th);
188
189         return rc;
190 }
191
192 static int orph_index_insert(const struct lu_env *env,
193                              struct mdd_object *obj,
194                              __u32 op, struct thandle *th)
195 {
196         struct mdd_device       *mdd    = mdo2mdd(&obj->mod_obj);
197         struct dt_object        *dor    = mdd->mdd_orphans;
198         const struct lu_fid     *lf_dor = lu_object_fid(&dor->do_lu);
199         struct dt_object        *next   = mdd_object_child(obj);
200         struct dt_insert_rec    *rec    = &mdd_env_info(env)->mti_dt_rec;
201         int                      rc;
202         ENTRY;
203
204         LASSERT(mdd_write_locked(env, obj) != 0);
205         LASSERT(!(obj->mod_flags & ORPHAN_OBJ));
206
207         mdd_orphan_write_lock(env, mdd);
208
209         rc = mdd_orphan_insert_obj(env, mdd, obj, op, th);
210         if (rc)
211                 GOTO(out, rc);
212
213         mdo_ref_add(env, obj, th);
214         if (!S_ISDIR(mdd_object_type(obj)))
215                 GOTO(out, rc = 0);
216
217         mdo_ref_add(env, obj, th);
218         mdd_orphan_ref_add(env, mdd, th);
219
220         /* try best to fixup directory, dont return errors
221          * from here */
222         if (!dt_try_as_dir(env, next))
223                 GOTO(out, rc = 0);
224
225         dt_delete(env, next, (const struct dt_key *)dotdot, th);
226
227         rec->rec_fid = lf_dor;
228         rec->rec_type = S_IFDIR;
229         dt_insert(env, next, (const struct dt_rec *)rec,
230                   (const struct dt_key *)dotdot, th, 1);
231
232 out:
233         if (rc == 0)
234                 obj->mod_flags |= ORPHAN_OBJ;
235
236         mdd_orphan_write_unlock(env, mdd);
237
238         RETURN(rc);
239 }
240
241 int orph_declare_index_delete(const struct lu_env *env,
242                               struct mdd_object *obj,
243
244                               struct thandle *th)
245 {
246         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
247         struct dt_key     *key;
248         int                rc;
249
250         key = orph_key_fill(env, mdo2fid(obj), ORPH_OP_UNLINK);
251
252         rc = dt_declare_delete(env, mdd->mdd_orphans, key, th);
253         if (rc)
254                 return rc;
255
256         rc = mdo_declare_ref_del(env, obj, th);
257         if (rc)
258                 return rc;
259
260         if (S_ISDIR(mdd_object_type(obj))) {
261                 rc = mdo_declare_ref_del(env, obj, th);
262                 if (rc)
263                         return rc;
264
265                 rc = dt_declare_ref_del(env, mdd->mdd_orphans, th);
266         }
267
268         return rc;
269 }
270
271 static int orph_index_delete(const struct lu_env *env,
272                              struct mdd_object *obj,
273                              __u32 op,
274                              struct thandle *th)
275 {
276         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
277         struct dt_object *dor = mdd->mdd_orphans;
278         struct dt_key *key;
279         int rc;
280
281         ENTRY;
282
283         LASSERT(mdd_write_locked(env, obj) != 0);
284         LASSERT(obj->mod_flags & ORPHAN_OBJ);
285         LASSERT(obj->mod_count == 0);
286
287         LASSERT(dor);
288
289         key = orph_key_fill(env, mdo2fid(obj), op);
290         mdd_orphan_write_lock(env, mdd);
291
292         rc = mdd_orphan_delete_obj(env, mdd, key, th);
293
294         if (rc == -ENOENT) {
295                 key = orph_key_fill_18(env, mdo2fid(obj));
296                 rc = mdd_orphan_delete_obj(env, mdd, key, th);
297         }
298
299         if (!rc) {
300                 /* lov objects will be destroyed by caller */
301                 mdo_ref_del(env, obj, th);
302                 if (S_ISDIR(mdd_object_type(obj))) {
303                         mdo_ref_del(env, obj, th);
304                         mdd_orphan_ref_del(env, mdd, th);
305                 }
306                 obj->mod_flags &= ~ORPHAN_OBJ;
307         } else {
308                 CERROR("could not delete object: rc = %d\n",rc);
309         }
310
311         mdd_orphan_write_unlock(env, mdd);
312         RETURN(rc);
313 }
314
315
316 static int orphan_destroy(const struct lu_env *env, struct mdd_object *obj,
317                           struct dt_key *key)
318 {
319         struct thandle *th = NULL;
320         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
321         int rc = 0;
322         ENTRY;
323
324         th = mdd_trans_create(env, mdd);
325         if (IS_ERR(th)) {
326                 CERROR("Cannot get thandle\n");
327                 RETURN(PTR_ERR(th));
328         }
329
330         rc = orph_declare_index_delete(env, obj, th);
331         if (rc)
332                 GOTO(stop, rc);
333
334         rc = mdo_declare_destroy(env, obj, th);
335         if (rc)
336                 GOTO(stop, rc);
337
338         rc = mdd_trans_start(env, mdd, th);
339         if (rc)
340                 GOTO(stop, rc);
341
342         mdd_write_lock(env, obj, MOR_TGT_CHILD);
343         if (likely(obj->mod_count == 0)) {
344                 mdd_orphan_write_lock(env, mdd);
345                 rc = mdd_orphan_delete_obj(env, mdd, key, th);
346                 if (rc == 0) {
347                         mdo_ref_del(env, obj, th);
348                         if (S_ISDIR(mdd_object_type(obj))) {
349                                 mdo_ref_del(env, obj, th);
350                                 mdd_orphan_ref_del(env, mdd, th);
351                         }
352                         rc = mdo_destroy(env, obj, th);
353                 } else
354                         CERROR("could not delete object: rc = %d\n", rc);
355                 mdd_orphan_write_unlock(env, mdd);
356         }
357         mdd_write_unlock(env, obj);
358
359 stop:
360         rc = mdd_trans_stop(env, mdd, 0, th);
361
362         RETURN(rc);
363 }
364
365 /**
366  * Delete unused orphan with FID \a lf from PENDING directory
367  *
368  * \param mdd  MDD device finishing recovery
369  * \param lf   FID of file or directory to delete
370  * \param key  cookie for this entry in index iterator
371  *
372  * \retval 0   success
373  * \retval -ve error
374  */
375 static int orph_key_test_and_del(const struct lu_env *env,
376                                  struct mdd_device *mdd, struct lu_fid *lf,
377                                  struct dt_key *key)
378 {
379         struct mdd_object *mdo;
380         int rc;
381
382         mdo = mdd_object_find(env, mdd, lf);
383
384         if (IS_ERR(mdo))
385                 return PTR_ERR(mdo);
386
387         rc = -EBUSY;
388         if (mdo->mod_count == 0) {
389                 CDEBUG(D_HA, "Found orphan "DFID", delete it\n", PFID(lf));
390                 rc = orphan_destroy(env, mdo, key);
391                 if (rc) /* so replay-single.sh test_37 works */
392                         CERROR("%s: error unlinking orphan "DFID" from "
393                                "PENDING: rc = %d\n",
394                                mdd2obd_dev(mdd)->obd_name, PFID(lf), rc);
395         } else {
396                 mdd_write_lock(env, mdo, MOR_TGT_CHILD);
397                 if (likely(mdo->mod_count > 0)) {
398                         CDEBUG(D_HA, "Found orphan "DFID" count %d, skip it\n",
399                                PFID(lf), mdo->mod_count);
400                         mdo->mod_flags |= ORPHAN_OBJ;
401                 }
402                 mdd_write_unlock(env, mdo);
403         }
404
405         mdd_object_put(env, mdo);
406         return rc;
407 }
408
409 /**
410  * delete unreferenced files and directories in the PENDING directory
411  *
412  * Files that remain in PENDING after client->MDS recovery has completed
413  * have to be referenced (opened) by some client during recovery, or they
414  * will be deleted here (for clients that did not complete recovery).
415  *
416  * \param thread  info about orphan cleanup thread
417  *
418  * \retval 0   success
419  * \retval -ve error
420  */
421 static int orph_index_iterate(const struct lu_env *env,
422                               struct mdd_generic_thread *thread)
423 {
424         struct mdd_device *mdd = (struct mdd_device *)thread->mgt_data;
425         struct dt_object *dor = mdd->mdd_orphans;
426         struct lu_dirent *ent = &mdd_env_info(env)->mti_ent;
427         const struct dt_it_ops *iops;
428         struct dt_it     *it;
429         struct lu_fid     fid;
430         int               key_sz = 0;
431         int               rc;
432         __u64             cookie;
433         ENTRY;
434
435         iops = &dor->do_index_ops->dio_it;
436         it = iops->init(env, dor, LUDA_64BITHASH);
437         if (IS_ERR(it)) {
438                 rc = PTR_ERR(it);
439                 CERROR("%s: cannot clean PENDING: rc = %d\n",
440                        mdd2obd_dev(mdd)->obd_name, rc);
441                 GOTO(out, rc);
442         }
443
444         rc = iops->load(env, it, 0);
445         if (rc < 0)
446                 GOTO(out_put, rc);
447         if (rc == 0) {
448                 CERROR("%s: error loading iterator to clean PENDING\n",
449                        mdd2obd_dev(mdd)->obd_name);
450                 /* Index contains no zero key? */
451                 GOTO(out_put, rc = -EIO);
452         }
453
454         do {
455                 if (thread->mgt_abort)
456                         break;
457
458                 key_sz = iops->key_size(env, it);
459                 /* filter out "." and ".." entries from PENDING dir. */
460                 if (key_sz < 8)
461                         goto next;
462
463                 rc = iops->rec(env, it, (struct dt_rec *)ent, LUDA_64BITHASH);
464                 if (rc != 0) {
465                         CERROR("%s: fail to get FID for orphan it: rc = %d\n",
466                                mdd2obd_dev(mdd)->obd_name, rc);
467                         goto next;
468                 }
469
470                 fid_le_to_cpu(&fid, &ent->lde_fid);
471                 if (!fid_is_sane(&fid)) {
472                         CERROR("%s: bad FID "DFID" cleaning PENDING\n",
473                                mdd2obd_dev(mdd)->obd_name, PFID(&fid));
474                         goto next;
475                 }
476
477                 /* kill orphan object */
478                 cookie = iops->store(env, it);
479                 iops->put(env, it);
480                 rc = orph_key_test_and_del(env, mdd, &fid,
481                                            (struct dt_key *)ent->lde_name);
482
483                 /* after index delete reset iterator */
484                 if (rc == 0)
485                         rc = iops->get(env, it, (const void *)"");
486                 else
487                         rc = iops->load(env, it, cookie);
488 next:
489                 rc = iops->next(env, it);
490         } while (rc == 0);
491
492         GOTO(out_put, rc = 0);
493 out_put:
494         iops->put(env, it);
495         iops->fini(env, it);
496
497 out:
498         return rc;
499 }
500
501 /**
502  * open the PENDING directory for device \a mdd
503  *
504  * The PENDING directory persistently tracks files and directories that were
505  * unlinked from the namespace (nlink == 0) but are still held open by clients.
506  * Those inodes shouldn't be deleted if the MDS crashes, because the clients
507  * would not be able to recover and reopen those files.  Instead, these inodes
508  * are linked into the PENDING directory on disk, and only deleted if all
509  * clients close them, or the MDS finishes client recovery without any client
510  * reopening them (i.e. former clients didn't join recovery).
511  *  \param d   mdd device being started.
512  *
513  *  \retval 0  success
514  *  \retval  -ve index operation error.
515  *
516  */
517 int orph_index_init(const struct lu_env *env, struct mdd_device *mdd)
518 {
519         struct lu_fid            fid;
520         struct dt_object        *d;
521         int                      rc = 0;
522
523         ENTRY;
524
525         /* create PENDING dir */
526         fid_zero(&fid);
527         rc = mdd_local_file_create(env, mdd, &mdd->mdd_local_root_fid,
528                                    orph_index_name, S_IFDIR | S_IRUGO |
529                                    S_IWUSR | S_IXUGO, &fid);
530         if (rc < 0)
531                 RETURN(rc);
532
533         d = dt_locate(env, mdd->mdd_child, &fid);
534         if (IS_ERR(d))
535                 RETURN(PTR_ERR(d));
536         LASSERT(lu_object_exists(&d->do_lu));
537         if (!dt_try_as_dir(env, d)) {
538                 CERROR("%s: \"%s\" is not an index: rc = %d\n",
539                        mdd2obd_dev(mdd)->obd_name, orph_index_name, rc);
540                 dt_object_put(env, d);
541                 RETURN(-ENOTDIR);
542         }
543         mdd->mdd_orphans = d;
544         RETURN(0);
545 }
546
547 void orph_index_fini(const struct lu_env *env, struct mdd_device *mdd)
548 {
549         ENTRY;
550         if (mdd->mdd_orphans != NULL) {
551                 dt_object_put(env, mdd->mdd_orphans);
552                 mdd->mdd_orphans = NULL;
553         }
554         EXIT;
555 }
556
557 static int __mdd_orphan_cleanup(void *args)
558 {
559         struct mdd_generic_thread *thread = (struct mdd_generic_thread *)args;
560         struct lu_env             *env = NULL;
561         int                        rc;
562         ENTRY;
563
564         complete(&thread->mgt_started);
565
566         OBD_ALLOC_PTR(env);
567         if (env == NULL)
568                 GOTO(out, rc = -ENOMEM);
569
570         rc = lu_env_init(env, LCT_MD_THREAD);
571         if (rc)
572                 GOTO(out, rc);
573
574         rc = orph_index_iterate(env, thread);
575
576         lu_env_fini(env);
577         GOTO(out, rc);
578 out:
579         if (env)
580                 OBD_FREE_PTR(env);
581         complete(&thread->mgt_finished);
582         return rc;
583 }
584
585 /**
586  *  Iterate orphan index to cleanup orphan objects after recovery is done.
587  *  \param d   mdd device in recovery.
588  */
589 int mdd_orphan_cleanup(const struct lu_env *env, struct mdd_device *d)
590 {
591         int rc = -ENOMEM;
592         char *name = NULL;
593
594         OBD_ALLOC(name, MTI_NAME_MAXLEN);
595         if (name == NULL)
596                 goto out;
597
598         snprintf(name, MTI_NAME_MAXLEN, "orph_cleanup_%s",
599                  mdd2obd_dev(d)->obd_name);
600
601         rc = mdd_generic_thread_start(&d->mdd_orph_cleanup_thread,
602                                       __mdd_orphan_cleanup, (void *)d, name);
603 out:
604         if (rc)
605                 CERROR("%s: start orphan cleanup thread failed:%d\n",
606                        mdd2obd_dev(d)->obd_name, rc);
607         if (name)
608                 OBD_FREE(name, MTI_NAME_MAXLEN);
609         return rc;
610 }
611
612 /**
613  *  add an orphan \a obj to the orphan index.
614  *  \param obj file or directory.
615  *  \param th  transaction for index insert.
616  *
617  *  \pre obj nlink == 0 && obj->mod_count != 0
618  *
619  *  \retval 0  success
620  *  \retval  -ve index operation error.
621  */
622 int __mdd_orphan_add(const struct lu_env *env,
623                      struct mdd_object *obj, struct thandle *th)
624 {
625         return orph_index_insert(env, obj, ORPH_OP_UNLINK, th);
626 }
627
628 /**
629  *  delete an orphan \a obj from orphan index.
630  *  \param obj file or directory.
631  *  \param th  transaction for index deletion and object destruction.
632  *
633  *  \pre obj->mod_count == 0 && ORPHAN_OBJ is set for obj.
634  *
635  *  \retval 0  success
636  *  \retval  -ve index operation error.
637  */
638 int __mdd_orphan_del(const struct lu_env *env,
639                      struct mdd_object *obj, struct thandle *th)
640 {
641         return orph_index_delete(env, obj, ORPH_OP_UNLINK, th);
642 }