Whamcloud - gitweb
b=20500
[fs/lustre-release.git] / lustre / mdd / mdd_orphans.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_orphans.c
37  *
38  * Orphan handling code
39  *
40  * Author: Mike Pershin <tappro@clusterfs.com>
41  *         Pravin B Shelar <pravin.shelar@sun.com>
42  */
43
44 #ifndef EXPORT_SYMTAB
45 # define EXPORT_SYMTAB
46 #endif
47 #define DEBUG_SUBSYSTEM S_MDS
48
49 #include <obd.h>
50 #include <obd_class.h>
51 #include <lustre_ver.h>
52 #include <obd_support.h>
53 #include <lustre_fid.h>
54 #include "mdd_internal.h"
55
56 const char orph_index_name[] = "PENDING";
57
58 enum {
59         ORPH_OP_UNLINK,
60         ORPH_OP_TRUNCATE
61 };
62
63 #define ORPHAN_FILE_NAME_FORMAT         "%016llx:%08x:%08x:%2x"
64 #define ORPHAN_FILE_NAME_FORMAT_18      "%llx:%08x"
65
66 static struct dt_key* orph_key_fill(const struct lu_env *env,
67                                     const struct lu_fid *lf, __u32 op)
68 {
69         char *key = mdd_env_info(env)->mti_orph_key;
70         int rc;
71
72         LASSERT(key);
73         rc = snprintf(key, NAME_MAX + 1, ORPHAN_FILE_NAME_FORMAT, fid_seq(lf),
74                       fid_oid(lf), fid_ver(lf), op);
75         if (rc > 0)
76                 return (struct dt_key*) key;
77         else
78                 return ERR_PTR(rc);
79 }
80
81 static struct dt_key* orph_key_fill_18(const struct lu_env *env,
82                                        const struct lu_fid *lf)
83 {
84         char *key = mdd_env_info(env)->mti_orph_key;
85         int rc;
86
87         LASSERT(key);
88         rc = snprintf(key, NAME_MAX + 1, ORPHAN_FILE_NAME_FORMAT_18, fid_seq(lf),
89                       fid_oid(lf));
90         if (rc > 0)
91                 return (struct dt_key*) key;
92         else
93                 return ERR_PTR(rc);
94 }
95
96 static int orphan_key_to_fid(char *key, struct lu_fid *lf)
97 {
98         int rc = 0;
99         unsigned int op;
100
101         rc = sscanf(key, ORPHAN_FILE_NAME_FORMAT, &lf->f_seq, &lf->f_oid,
102                     &lf->f_ver, &op);
103         if (rc == 4)
104                 return 0;
105
106         /* build igif */
107         rc = sscanf(key, ORPHAN_FILE_NAME_FORMAT_18,
108                     &lf->f_seq, &lf->f_oid);
109         if (rc == 2) {
110                 lf->f_ver = 0;
111                 return 0;
112         }
113
114         CERROR("can not parse orphan file name %s\n",key);
115         return -EINVAL;
116 }
117
118 static inline void mdd_orphan_write_lock(const struct lu_env *env,
119                                     struct mdd_device *mdd)
120 {
121
122         struct dt_object        *dor    = mdd->mdd_orphans;
123         dor->do_ops->do_write_lock(env, dor, MOR_TGT_ORPHAN);
124 }
125
126 static inline void mdd_orphan_write_unlock(const struct lu_env *env,
127                                            struct mdd_device *mdd)
128 {
129
130         struct dt_object        *dor    = mdd->mdd_orphans;
131         dor->do_ops->do_write_unlock(env, dor);
132 }
133
134 static inline int mdd_orphan_insert_obj(const struct lu_env *env,
135                                         struct mdd_device *mdd,
136                                         struct mdd_object *obj,
137                                         __u32 op,
138                                         struct thandle *th)
139 {
140         struct dt_object        *dor    = mdd->mdd_orphans;
141         const struct lu_fid     *lf     = mdo2fid(obj);
142         struct dt_key           *key    = orph_key_fill(env, lf, op);
143         ENTRY;
144
145         return  dor->do_index_ops->dio_insert(env, dor,
146                                               (struct dt_rec *)lf,
147                                               key, th,
148                                               BYPASS_CAPA, 1);
149 }
150
151 static inline int mdd_orphan_delete_obj(const struct lu_env *env,
152                                         struct mdd_device  *mdd ,
153                                         struct dt_key *key,
154                                         struct thandle *th)
155 {
156         struct dt_object        *dor    = mdd->mdd_orphans;
157
158         return  dor->do_index_ops->dio_delete(env, dor,
159                                               key, th,
160                                               BYPASS_CAPA);
161 }
162
163 static inline void mdd_orphan_ref_add(const struct lu_env *env,
164                                  struct mdd_device *mdd,
165                                  struct thandle *th)
166 {
167         struct dt_object        *dor    = mdd->mdd_orphans;
168         dor->do_ops->do_ref_add(env, dor, th);
169 }
170
171 static inline void mdd_orphan_ref_del(const struct lu_env *env,
172                                  struct mdd_device *mdd,
173                                  struct thandle *th)
174 {
175         struct dt_object        *dor    = mdd->mdd_orphans;
176         dor->do_ops->do_ref_del(env, dor, th);
177 }
178
179
180 static int orph_index_insert(const struct lu_env *env,
181                              struct mdd_object *obj,
182                              __u32 op,
183                              struct thandle *th)
184 {
185         struct mdd_device       *mdd    = mdo2mdd(&obj->mod_obj);
186         struct dt_object        *dor    = mdd->mdd_orphans;
187         const struct lu_fid     *lf_dor = lu_object_fid(&dor->do_lu);
188         struct dt_object        *next   = mdd_object_child(obj);
189         const struct dt_key     *dotdot = (const struct dt_key *) "..";
190         int rc;
191         ENTRY;
192
193         LASSERT(mdd_write_locked(env, obj) != 0);
194         LASSERT(!(obj->mod_flags & ORPHAN_OBJ));
195         LASSERT(obj->mod_count > 0);
196
197         mdd_orphan_write_lock(env, mdd);
198
199         rc = mdd_orphan_insert_obj(env, mdd, obj, op, th);
200         if (rc)
201                 GOTO(out, rc);
202
203         mdo_ref_add(env, obj, th);
204         if (!S_ISDIR(mdd_object_type(obj)))
205                 goto out;
206
207         mdo_ref_add(env, obj, th);
208         mdd_orphan_ref_add(env, mdd, th);
209
210         /* try best to fixup directory, dont return errors
211          * from here */
212         if (!dt_try_as_dir(env, next))
213                 goto out;
214         next->do_index_ops->dio_delete(env, next,
215                                        dotdot, th, BYPASS_CAPA);
216
217         next->do_index_ops->dio_insert(env, next,
218                                        (struct dt_rec *) lf_dor,
219                                        dotdot, th, BYPASS_CAPA, 1);
220
221 out:
222         if (rc == 0)
223                 obj->mod_flags |= ORPHAN_OBJ;
224
225         mdd_orphan_write_unlock(env, mdd);
226
227         RETURN(rc);
228 }
229
230 /**
231  * destroy osd object on mdd and associated ost objects.
232  *
233  * \param obj orphan object
234  * \param mdd used for sending llog msg to osts
235  *
236  * \retval  0   success
237  * \retval -ve  error
238  */
239 static int orphan_object_kill(const struct lu_env *env,
240                               struct mdd_object *obj,
241                               struct mdd_device *mdd,
242                               struct thandle *th)
243 {
244         struct lu_attr *la = &mdd_env_info(env)->mti_la;
245         int rc = 0;
246         ENTRY;
247
248         /* No need to lock this object as its recovery phase, and
249          * no other thread can access it. But we need to lock it
250          * as its precondition for osd api we using. */
251
252         mdo_ref_del(env, obj, th);
253         if (S_ISDIR(mdd_object_type(obj))) {
254                 mdo_ref_del(env, obj, th);
255                 mdd_orphan_ref_del(env, mdd, th);
256         } else {
257                 /* regular file , cleanup linked ost objects */
258                 rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
259                 if (rc == 0)
260                         rc = mdd_lov_destroy(env, mdd, obj, la);
261         }
262         RETURN(rc);
263 }
264
265 static int orph_index_delete(const struct lu_env *env,
266                              struct mdd_object *obj,
267                              __u32 op,
268                              struct thandle *th)
269 {
270         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
271         struct dt_object *dor = mdd->mdd_orphans;
272         struct dt_key *key;
273         int rc;
274
275         ENTRY;
276
277         LASSERT(mdd_write_locked(env, obj) != 0);
278         LASSERT(obj->mod_flags & ORPHAN_OBJ);
279         LASSERT(obj->mod_count == 0);
280
281         LASSERT(dor);
282
283         key = orph_key_fill(env, mdo2fid(obj), op);
284         mdd_orphan_write_lock(env, mdd);
285
286         rc = mdd_orphan_delete_obj(env, mdd, key, th);
287
288         if (rc == -ENOENT) {
289                 key = orph_key_fill_18(env, mdo2fid(obj));
290                 rc = mdd_orphan_delete_obj(env, mdd, key, th);
291         }
292
293         if (!rc) {
294                 /* lov objects will be destroyed by caller */
295                 mdo_ref_del(env, obj, th);
296                 if (S_ISDIR(mdd_object_type(obj))) {
297                         mdo_ref_del(env, obj, th);
298                         mdd_orphan_ref_del(env, mdd, th);
299                 }
300                 obj->mod_flags &= ~ORPHAN_OBJ;
301         } else {
302                 CERROR("could not delete object: rc = %d\n",rc);
303         }
304
305         mdd_orphan_write_unlock(env, mdd);
306         RETURN(rc);
307 }
308
309
310 static int orphan_object_destroy(const struct lu_env *env,
311                                  struct mdd_object *obj,
312                                  struct dt_key *key)
313 {
314         struct thandle *th = NULL;
315         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
316         int rc = 0;
317         ENTRY;
318
319         mdd_txn_param_build(env, mdd, MDD_TXN_UNLINK_OP);
320         th = mdd_trans_start(env, mdd);
321         if (IS_ERR(th)) {
322                 CERROR("Cannot get thandle\n");
323                 RETURN(-ENOMEM);
324         }
325
326         mdd_write_lock(env, obj, MOR_TGT_CHILD);
327         if (likely(obj->mod_count == 0)) {
328                 mdd_orphan_write_lock(env, mdd);
329                 rc = mdd_orphan_delete_obj(env, mdd, key, th);
330                 if (!rc)
331                         orphan_object_kill(env, obj, mdd, th);
332                 else
333                         CERROR("could not delete object: rc = %d\n",rc);
334                 mdd_orphan_write_unlock(env, mdd);
335         }
336         mdd_write_unlock(env, obj);
337         mdd_trans_stop(env, mdd, 0, th);
338
339         RETURN(rc);
340 }
341
342 static int orph_key_test_and_del(const struct lu_env *env,
343                                  struct mdd_device *mdd,
344                                  struct lu_fid *lf,
345                                  struct dt_key *key)
346 {
347         struct mdd_object *mdo;
348         int rc;
349
350         mdo = mdd_object_find(env, mdd, lf);
351
352         if (IS_ERR(mdo))
353                 return PTR_ERR(mdo);
354
355         rc = -EBUSY;
356         if (mdo->mod_count == 0) {
357                 CWARN("Found orphan! Delete it\n");
358                 rc = orphan_object_destroy(env, mdo, key);
359         } else {
360                 mdd_write_lock(env, mdo, MOR_TGT_CHILD);
361                 if (likely(mdo->mod_count > 0)) {
362                         CDEBUG(D_HA, "Found orphan, open count = %d\n",
363                                mdo->mod_count);
364                         mdo->mod_flags |= ORPHAN_OBJ;
365                 }
366                 mdd_write_unlock(env, mdo);
367         }
368
369         mdd_object_put(env, mdo);
370         return rc;
371 }
372
373 static int orph_index_iterate(const struct lu_env *env,
374                               struct mdd_device *mdd)
375 {
376         struct dt_object *dor = mdd->mdd_orphans;
377         char             *mti_key = mdd_env_info(env)->mti_orph_key;
378         const struct dt_it_ops *iops;
379         struct dt_it     *it;
380         char             *key;
381         struct lu_fid     fid;
382         int               result = 0;
383         int               key_sz = 0;
384         int               rc;
385         __u64             cookie;
386         ENTRY;
387
388         /* In recovery phase, do not need for any lock here */
389
390         iops = &dor->do_index_ops->dio_it;
391         it = iops->init(env, dor, BYPASS_CAPA);
392         if (it != NULL) {
393                 result = iops->load(env, it, 0);
394                 if (result > 0) {
395                         /* main cycle */
396                         do {
397
398                                 key = (void *)iops->key(env, it);
399                                 if (IS_ERR(key)) {
400                                         CERROR("key failed when clean pending.\n");
401                                         goto next;
402                                 }
403                                 key_sz = iops->key_size(env, it);
404
405                                 /* filter out "." and ".." entries from
406                                  * PENDING dir. */
407                                 if (key_sz < 8)
408                                         goto next;
409
410                                 memcpy(mti_key, key, key_sz);
411                                 mti_key[key_sz] = 0;
412
413                                 if (orphan_key_to_fid(mti_key, &fid))
414                                         goto next;
415                                 if (!fid_is_sane(&fid)) {
416                                         CERROR("fid is not sane when clean pending.\n");
417                                         goto next;
418                                 }
419
420                                 /* kill orphan object */
421                                 cookie =  iops->store(env, it);
422                                 iops->put(env, it);
423                                 rc = orph_key_test_and_del(env, mdd, &fid,
424                                                 (struct dt_key *)mti_key);
425
426                                 /* after index delete reset iterator */
427                                 if (!rc)
428                                         result = iops->get(env, it,
429                                                            (const void *)"");
430                                 else
431                                         result = iops->load(env, it, cookie);
432 next:
433                                 result = iops->next(env, it);
434                         } while (result == 0);
435                         result = 0;
436                 } else if (result == 0) {
437                         CERROR("Input/Output for clean pending.\n");
438                         /* Index contains no zero key? */
439                         result = -EIO;
440                 }
441                 iops->put(env, it);
442                 iops->fini(env, it);
443         } else {
444                 CERROR("not enough memory for clean pending.\n");
445                 result = -ENOMEM;
446         }
447
448         RETURN(result);
449 }
450
451 int orph_index_init(const struct lu_env *env, struct mdd_device *mdd)
452 {
453         struct lu_fid fid;
454         struct dt_object *d;
455         int rc = 0;
456         ENTRY;
457
458         d = dt_store_open(env, mdd->mdd_child, "", orph_index_name, &fid);
459         if (!IS_ERR(d)) {
460                 mdd->mdd_orphans = d;
461                 if (!dt_try_as_dir(env, d)) {
462                         rc = -ENOTDIR;
463                         CERROR("\"%s\" is not an index! : rc = %d\n",
464                                         orph_index_name, rc);
465                 }
466         } else {
467                 CERROR("cannot find \"%s\" obj %d\n",
468                        orph_index_name, (int)PTR_ERR(d));
469                 rc = PTR_ERR(d);
470         }
471
472         RETURN(rc);
473 }
474
475 void orph_index_fini(const struct lu_env *env, struct mdd_device *mdd)
476 {
477         ENTRY;
478         if (mdd->mdd_orphans != NULL) {
479                 lu_object_put(env, &mdd->mdd_orphans->do_lu);
480                 mdd->mdd_orphans = NULL;
481         }
482         EXIT;
483 }
484
485 /**
486  *  Iterate orphan index to cleanup orphan objects in case of recovery.
487  *  \param d   mdd device in recovery.
488  *
489  */
490
491 int __mdd_orphan_cleanup(const struct lu_env *env, struct mdd_device *d)
492 {
493         return orph_index_iterate(env, d);
494 }
495
496 /**
497  *  delete an orphan \a obj from orphan index.
498  *  \param obj file or directory.
499  *  \param th  transaction for index insert.
500  *
501  *  \pre obj nlink == 0 && obj->mod_count != 0
502  *
503  *  \retval 0  success
504  *  \retval  -ve index operation error.
505  */
506
507 int __mdd_orphan_add(const struct lu_env *env,
508                      struct mdd_object *obj, struct thandle *th)
509 {
510         return orph_index_insert(env, obj, ORPH_OP_UNLINK, th);
511 }
512
513 /**
514  *  delete an orphan \a obj from orphan index.
515  *  \param obj file or directory.
516  *  \param th  transaction for index deletion and object destruction.
517  *
518  *  \pre obj->mod_count == 0 && ORPHAN_OBJ is set for obj.
519  *
520  *  \retval 0  success
521  *  \retval  -ve index operation error.
522  */
523
524 int __mdd_orphan_del(const struct lu_env *env,
525                      struct mdd_object *obj, struct thandle *th)
526 {
527         return orph_index_delete(env, obj, ORPH_OP_UNLINK, th);
528 }