Whamcloud - gitweb
f4ea74b6aeece3765e38253b6dcdde029c81f18d
[fs/lustre-release.git] / lustre / mdd / mdd_orphans.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_orphans.c
37  *
38  * Orphan handling code
39  *
40  * Author: Mike Pershin <tappro@clusterfs.com>
41  *         Pravin B Shelar <pravin.shelar@sun.com>
42  */
43
44 #ifndef EXPORT_SYMTAB
45 # define EXPORT_SYMTAB
46 #endif
47 #define DEBUG_SUBSYSTEM S_MDS
48
49 #include <obd.h>
50 #include <obd_class.h>
51 #include <lustre_ver.h>
52 #include <obd_support.h>
53 #include <lustre_fid.h>
54 #include "mdd_internal.h"
55
56 const char orph_index_name[] = "PENDING";
57
58 enum {
59         ORPH_OP_UNLINK,
60         ORPH_OP_TRUNCATE
61 };
62
63 #define ORPHAN_FILE_NAME_FORMAT         "%016llx:%08x:%08x:%2x"
64 #define ORPHAN_FILE_NAME_FORMAT_18      "%llx:%08x"
65
66 static struct dt_key* orph_key_fill(const struct lu_env *env,
67                                     const struct lu_fid *lf, __u32 op)
68 {
69         char *key = mdd_env_info(env)->mti_orph_key;
70         int rc;
71
72         LASSERT(key);
73         rc = snprintf(key, NAME_MAX + 1, ORPHAN_FILE_NAME_FORMAT, fid_seq(lf),
74                       fid_oid(lf), fid_ver(lf), op);
75         if (rc > 0)
76                 return (struct dt_key*) key;
77         else
78                 return ERR_PTR(rc);
79 }
80
81 static struct dt_key* orph_key_fill_18(const struct lu_env *env,
82                                        const struct lu_fid *lf)
83 {
84         char *key = mdd_env_info(env)->mti_orph_key;
85         int rc;
86
87         LASSERT(key);
88         rc = snprintf(key, NAME_MAX + 1, ORPHAN_FILE_NAME_FORMAT_18, fid_seq(lf),
89                       fid_oid(lf));
90         if (rc > 0)
91                 return (struct dt_key*) key;
92         else
93                 return ERR_PTR(rc);
94 }
95
96 static int orphan_key_to_fid(char *key, struct lu_fid *lf)
97 {
98         int rc = 0;
99         unsigned int op;
100
101         rc = sscanf(key, ORPHAN_FILE_NAME_FORMAT, &lf->f_seq, &lf->f_oid,
102                     &lf->f_ver, &op);
103         if (rc == 4)
104                 return 0;
105
106         /* build igif */
107         rc = sscanf(key, ORPHAN_FILE_NAME_FORMAT_18,
108                     &lf->f_seq, &lf->f_oid);
109         if (rc == 2) {
110                 lf->f_ver = 0;
111                 return 0;
112         }
113
114         CERROR("can not parse orphan file name %s\n",key);
115         return -EINVAL;
116 }
117
118 static inline void mdd_orphan_write_lock(const struct lu_env *env,
119                                     struct mdd_device *mdd)
120 {
121
122         struct dt_object        *dor    = mdd->mdd_orphans;
123         dor->do_ops->do_write_lock(env, dor, MOR_TGT_ORPHAN);
124 }
125
126 static inline void mdd_orphan_write_unlock(const struct lu_env *env,
127                                            struct mdd_device *mdd)
128 {
129
130         struct dt_object        *dor    = mdd->mdd_orphans;
131         dor->do_ops->do_write_unlock(env, dor);
132 }
133
134 static inline int mdd_orphan_insert_obj(const struct lu_env *env,
135                                         struct mdd_device *mdd,
136                                         struct mdd_object *obj,
137                                         __u32 op,
138                                         struct thandle *th)
139 {
140         struct dt_object        *dor    = mdd->mdd_orphans;
141         const struct lu_fid     *lf     = mdo2fid(obj);
142         struct dt_key           *key    = orph_key_fill(env, lf, op);
143         ENTRY;
144
145         return  dor->do_index_ops->dio_insert(env, dor,
146                                               __mdd_fid_rec(env, lf),
147                                               key, th,
148                                               BYPASS_CAPA, 1);
149 }
150
151 static inline int mdd_orphan_delete_obj(const struct lu_env *env,
152                                         struct mdd_device  *mdd ,
153                                         struct dt_key *key,
154                                         struct thandle *th)
155 {
156         struct dt_object        *dor    = mdd->mdd_orphans;
157
158         return  dor->do_index_ops->dio_delete(env, dor,
159                                               key, th,
160                                               BYPASS_CAPA);
161 }
162
163 static inline void mdd_orphan_ref_add(const struct lu_env *env,
164                                  struct mdd_device *mdd,
165                                  struct thandle *th)
166 {
167         struct dt_object        *dor    = mdd->mdd_orphans;
168         dor->do_ops->do_ref_add(env, dor, th);
169 }
170
171 static inline void mdd_orphan_ref_del(const struct lu_env *env,
172                                  struct mdd_device *mdd,
173                                  struct thandle *th)
174 {
175         struct dt_object        *dor    = mdd->mdd_orphans;
176         dor->do_ops->do_ref_del(env, dor, th);
177 }
178
179
180 static int orph_index_insert(const struct lu_env *env,
181                              struct mdd_object *obj,
182                              __u32 op,
183                              struct thandle *th)
184 {
185         struct mdd_device       *mdd    = mdo2mdd(&obj->mod_obj);
186         struct dt_object        *dor    = mdd->mdd_orphans;
187         const struct lu_fid     *lf_dor = lu_object_fid(&dor->do_lu);
188         struct dt_object        *next   = mdd_object_child(obj);
189         const struct dt_key     *dotdot = (const struct dt_key *) "..";
190         int rc;
191         ENTRY;
192
193         mdd_orphan_write_lock(env, mdd);
194
195         rc = mdd_orphan_insert_obj(env, mdd, obj, op, th);
196         if (rc)
197                 GOTO(out, rc);
198
199         mdo_ref_add(env, obj, th);
200         if (!S_ISDIR(mdd_object_type(obj)))
201                 goto out;
202
203         mdo_ref_add(env, obj, th);
204         mdd_orphan_ref_add(env, mdd, th);
205
206         /* try best to fixup directory, dont return errors
207          * from here */
208         if (!dt_try_as_dir(env, next))
209                 goto out;
210         next->do_index_ops->dio_delete(env, next,
211                                        dotdot, th, BYPASS_CAPA);
212
213         next->do_index_ops->dio_insert(env, next,
214                                        __mdd_fid_rec(env, lf_dor),
215                                        dotdot, th, BYPASS_CAPA, 1);
216
217 out:
218         mdd_orphan_write_unlock(env, mdd);
219
220         RETURN(rc);
221 }
222
223 /**
224  * destroy osd object on mdd and associated ost objects.
225  *
226  * \param obj orphan object
227  * \param mdd used for sending llog msg to osts
228  *
229  * \retval  0   success
230  * \retval -ve  error
231  */
232 static int orphan_object_kill(const struct lu_env *env,
233                               struct mdd_object *obj,
234                               struct mdd_device *mdd,
235                               struct thandle *th)
236 {
237         struct lu_attr *la = &mdd_env_info(env)->mti_la;
238         int rc;
239
240         /* No need to lock this object as its recovery phase, and
241          * no other thread can access it. But we need to lock it
242          * as its precondition for osd api we using. */
243
244         mdd_write_lock(env, obj, MOR_TGT_CHILD);
245         mdo_ref_del(env, obj, th);
246         if (S_ISDIR(mdd_object_type(obj))) {
247                 mdo_ref_del(env, obj, th);
248                 mdd_orphan_ref_del(env, mdd, th);
249                 mdd_write_unlock(env, obj);
250         } else {
251                 /* regular file , cleanup linked ost objects */
252                 rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
253                 mdd_write_unlock(env, obj);
254                 if (rc)
255                         RETURN(rc);
256
257                 mdd_lov_destroy(env, mdd, obj, la);
258         }
259         return 0;
260 }
261
262 static int orph_index_delete(const struct lu_env *env,
263                              struct mdd_object *obj,
264                              __u32 op,
265                              struct thandle *th)
266 {
267         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
268         struct dt_object *dor = mdd->mdd_orphans;
269         struct dt_key *key;
270         int rc;
271
272         ENTRY;
273
274         LASSERT(dor);
275
276         key = orph_key_fill(env, mdo2fid(obj), op);
277         mdd_orphan_write_lock(env, mdd);
278
279         rc = mdd_orphan_delete_obj(env, mdd, key, th);
280
281         if (rc == -ENOENT) {
282                 key = orph_key_fill_18(env, mdo2fid(obj));
283                 rc = mdd_orphan_delete_obj(env, mdd, key, th);
284         }
285
286         if (!rc) {
287                 /* lov objects will be destroyed by caller */
288                 mdo_ref_del(env, obj, th);
289                 if (S_ISDIR(mdd_object_type(obj))) {
290                         mdo_ref_del(env, obj, th);
291                         mdd_orphan_ref_del(env, mdd, th);
292                 }
293         } else
294                 CERROR("could not delete object: rc = %d\n",rc);
295
296         obj->mod_flags &= ~ORPHAN_OBJ;
297         mdd_orphan_write_unlock(env, mdd);
298         RETURN(rc);
299 }
300
301
302 static int orphan_object_destroy(const struct lu_env *env,
303                                  struct mdd_object *obj,
304                                  struct dt_key *key)
305 {
306         struct thandle *th = NULL;
307         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
308         int rc;
309         ENTRY;
310
311         mdd_txn_param_build(env, mdd, MDD_TXN_UNLINK_OP);
312         th = mdd_trans_start(env, mdd);
313         if (IS_ERR(th)) {
314                 CERROR("Cannot get thandle\n");
315                 RETURN(-ENOMEM);
316         }
317
318         mdd_orphan_write_lock(env, mdd);
319         rc = mdd_orphan_delete_obj(env, mdd, key, th);
320         if (!rc)
321                 orphan_object_kill(env, obj, mdd, th);
322         else
323                 CERROR("could not delete object: rc = %d\n",rc);
324
325         mdd_orphan_write_unlock(env, mdd);
326         mdd_trans_stop(env, mdd, 0, th);
327
328         RETURN(rc);
329 }
330
331 static int orph_key_test_and_del(const struct lu_env *env,
332                                  struct mdd_device *mdd,
333                                  struct lu_fid *lf,
334                                  struct dt_key *key)
335 {
336         struct mdd_object *mdo;
337         int rc;
338
339         mdo = mdd_object_find(env, mdd, lf);
340
341         if (IS_ERR(mdo))
342                 return PTR_ERR(mdo);
343
344         rc = -EBUSY;
345         if (mdo->mod_count == 0) {
346                 CWARN("Found orphan! Delete it\n");
347                 rc = orphan_object_destroy(env, mdo, key);
348         } else {
349                 CDEBUG(D_HA, "Found orphan, open count = %d\n", mdo->mod_count);
350                 mdo->mod_flags |= ORPHAN_OBJ;
351         }
352
353         mdd_object_put(env, mdo);
354         return rc;
355 }
356
357 static int orph_index_iterate(const struct lu_env *env,
358                               struct mdd_device *mdd)
359 {
360         struct dt_object *dor = mdd->mdd_orphans;
361         char             *mti_key = mdd_env_info(env)->mti_orph_key;
362         const struct dt_it_ops *iops;
363         struct dt_it     *it;
364         char             *key;
365         struct lu_fid     fid;
366         int               result = 0;
367         int               key_sz = 0;
368         int               rc;
369         __u64             cookie;
370         ENTRY;
371
372         /* In recovery phase, do not need for any lock here */
373
374         iops = &dor->do_index_ops->dio_it;
375         it = iops->init(env, dor, BYPASS_CAPA);
376         if (it != NULL) {
377                 result = iops->load(env, it, 0);
378                 if (result > 0) {
379                         /* main cycle */
380                         do {
381
382                                 key = (void *)iops->key(env, it);
383                                 if (IS_ERR(key))
384                                         goto next;
385                                 key_sz = iops->key_size(env, it);
386
387                                 /* filter out "." and ".." entries from
388                                  * PENDING dir. */
389                                 if (key_sz < 8)
390                                         goto next;
391
392                                 memcpy(mti_key, key, key_sz);
393                                 mti_key[key_sz] = 0;
394
395                                 if (orphan_key_to_fid(mti_key, &fid))
396                                         goto next;
397                                 if (!fid_is_sane(&fid))
398                                         goto next;
399
400                                 /* kill orphan object */
401                                 cookie =  iops->store(env, it);
402                                 iops->put(env, it);
403                                 rc = orph_key_test_and_del(env, mdd, &fid,
404                                                 (struct dt_key *)mti_key);
405
406                                 /* after index delete reset iterator */
407                                 if (!rc)
408                                         result = iops->get(env, it,
409                                                            (const void *)"");
410                                 else
411                                         result = iops->load(env, it, cookie);
412 next:
413                                 result = iops->next(env, it);
414                         } while (result == 0);
415                         result = 0;
416                 } else if (result == 0)
417                         /* Index contains no zero key? */
418                         result = -EIO;
419                 iops->put(env, it);
420                 iops->fini(env, it);
421         } else
422                 result = -ENOMEM;
423
424         RETURN(result);
425 }
426
427 int orph_index_init(const struct lu_env *env, struct mdd_device *mdd)
428 {
429         struct lu_fid fid;
430         struct dt_object *d;
431         int rc = 0;
432         ENTRY;
433
434         d = dt_store_open(env, mdd->mdd_child, "", orph_index_name, &fid);
435         if (!IS_ERR(d)) {
436                 mdd->mdd_orphans = d;
437                 if (!dt_try_as_dir(env, d)) {
438                         rc = -ENOTDIR;
439                         CERROR("\"%s\" is not an index! : rc = %d\n",
440                                         orph_index_name, rc);
441                 }
442         } else {
443                 CERROR("cannot find \"%s\" obj %d\n",
444                        orph_index_name, (int)PTR_ERR(d));
445                 rc = PTR_ERR(d);
446         }
447
448         RETURN(rc);
449 }
450
451 void orph_index_fini(const struct lu_env *env, struct mdd_device *mdd)
452 {
453         ENTRY;
454         if (mdd->mdd_orphans != NULL) {
455                 lu_object_put(env, &mdd->mdd_orphans->do_lu);
456                 mdd->mdd_orphans = NULL;
457         }
458         EXIT;
459 }
460
461 /**
462  *  Iterate orphan index to cleanup orphan objects in case of recovery.
463  *  \param d   mdd device in recovery.
464  *
465  */
466
467 int __mdd_orphan_cleanup(const struct lu_env *env, struct mdd_device *d)
468 {
469         return orph_index_iterate(env, d);
470 }
471
472 /**
473  *  delete an orphan \a obj from orphan index.
474  *  \param obj file or directory.
475  *  \param th  transaction for index insert.
476  *
477  *  \pre obj nlink == 0 && obj->mod_count != 0
478  *
479  *  \retval 0  success
480  *  \retva  -ve index operation error.
481  */
482
483 int __mdd_orphan_add(const struct lu_env *env,
484                      struct mdd_object *obj, struct thandle *th)
485 {
486         return orph_index_insert(env, obj, ORPH_OP_UNLINK, th);
487 }
488
489 /**
490  *  delete an orphan \a obj from orphan index.
491  *  \param obj file or directory.
492  *  \param th  transaction for index deletion and object destruction.
493  *
494  *  \pre obj->mod_count == 0 && ORPHAN_OBJ is set for obj.
495  *
496  *  \retval 0  success
497  *  \retva  -ve index operation error.
498  */
499
500 int __mdd_orphan_del(const struct lu_env *env,
501                      struct mdd_object *obj, struct thandle *th)
502 {
503         return orph_index_delete(env, obj, ORPH_OP_UNLINK, th);
504 }