Whamcloud - gitweb
proper locking for the mod_count, remove assert for transaction while write_lock
[fs/lustre-release.git] / lustre / mdd / mdd_orphans.c
1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  mdd/mdd_orphans.c
5  *
6  *  Orphan handling code
7  *
8  *  Copyright (C) 2006 Cluster File Systems, Inc.
9  *   Author: Mike Pershin <tappro@clusterfs.com>
10  *
11  *   This file is part of the Lustre file system, http://www.lustre.org
12  *   Lustre is a trademark of Cluster File Systems, Inc.
13  *
14  *   You may have signed or agreed to another license before downloading
15  *   this software.  If so, you are bound by the terms and conditions
16  *   of that agreement, and the following does not apply to you.  See the
17  *   LICENSE file included with this distribution for more information.
18  *
19  *   If you did not agree to a different license, then this copy of Lustre
20  *   is open source software; you can redistribute it and/or modify it
21  *   under the terms of version 2 of the GNU General Public License as
22  *   published by the Free Software Foundation.
23  *
24  *   In either case, Lustre is distributed in the hope that it will be
25  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
26  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
27  *   license text for more details.
28  */
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_MDS
33
34 #include <obd.h>
35 #include <obd_class.h>
36 #include <lustre_ver.h>
37 #include <obd_support.h>
38
39 #include "mdd_internal.h"
40
41 const char orph_index_name[] = "orphans";
42
43 static const struct dt_index_features orph_index_features = {
44         .dif_flags       = DT_IND_UPDATE,
45         .dif_keysize_min = sizeof(struct orph_key),
46         .dif_keysize_max = sizeof(struct orph_key),
47         .dif_recsize_min = sizeof(loff_t),
48         .dif_recsize_max = sizeof(loff_t)
49 };
50
51 enum {
52         ORPH_OP_UNLINK,
53         ORPH_OP_TRUNCATE
54 };
55
56 static struct orph_key *orph_key_fill(const struct lu_context *ctx,
57                                       const struct lu_fid *lf, __u32 op)
58 {
59         struct orph_key *key = &mdd_ctx_info(ctx)->mti_orph_key;
60         LASSERT(key);
61         key->ok_fid.f_seq = cpu_to_be64(fid_seq(lf));
62         key->ok_fid.f_oid = cpu_to_be32(fid_oid(lf));
63         key->ok_fid.f_ver = cpu_to_be32(fid_ver(lf));
64         key->ok_op = cpu_to_be32(op);
65         return key;
66 }
67
68 static int orph_index_insert(const struct lu_context *ctx, 
69                              struct mdd_object *obj, __u32 op,
70                              loff_t *offset, struct thandle *th)
71 {
72         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
73         struct dt_object *dor = mdd->mdd_orphans;
74         struct orph_key *key = orph_key_fill(ctx, mdo2fid(obj), op);
75         int rc;
76         ENTRY;
77
78         rc = dor->do_index_ops->dio_insert(ctx, dor, (struct dt_rec *)offset,
79                                            (struct dt_key *)key, th);
80         RETURN(rc);
81 }
82
83 static int orph_index_delete(const struct lu_context *ctx, 
84                              struct mdd_object *obj, __u32 op,
85                              struct thandle *th)
86 {
87         struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
88         struct dt_object *dor = mdd->mdd_orphans;
89         struct orph_key *key = orph_key_fill(ctx, mdo2fid(obj), op);
90         int rc;
91         ENTRY;
92
93         rc = dor->do_index_ops->dio_delete(ctx, dor,
94                                            (struct dt_key *)key, th);
95         RETURN(rc);
96
97 }
98
99 static inline struct orph_key *orph_key_empty(const struct lu_context *ctx,
100                                               __u32 op)
101 {
102         struct orph_key *key = &mdd_ctx_info(ctx)->mti_orph_key;
103         LASSERT(key);
104         key->ok_fid.f_seq = 0;
105         key->ok_fid.f_oid = 0;
106         key->ok_fid.f_ver = 0;
107         key->ok_op = cpu_to_be32(op);
108         return key;
109 }
110
111 static void orph_key_test_and_del(const struct lu_context *ctx,
112                                   struct mdd_device *mdd,
113                                   const struct orph_key *key)
114 {
115         struct mdd_object *mdo;
116
117         mdo = mdd_object_find(ctx, mdd, &key->ok_fid);
118         if (IS_ERR(mdo))
119                 CERROR("Invalid orphan!\n");
120         else {
121                 mdd_write_lock(ctx, mdo);
122                 if (mdo->mod_count == 0) {
123                         /* non-opened orphan, let's delete it */
124                         struct md_attr *ma = &mdd_ctx_info(ctx)->mti_ma;
125                         __mdd_object_kill(ctx, mdo, ma);
126                         /* TODO: now handle OST objects */
127                         //mdd_ost_objects_destroy(ctx, ma);
128                         /* TODO: destroy index entry */
129                 }
130                 mdd_write_unlock(ctx, mdo);
131                 mdd_object_put(ctx, mdo);
132         }        
133 }
134
135 static int orph_index_iterate(const struct lu_context *ctx,
136                               struct mdd_device *mdd)
137 {
138         struct dt_object *dt_obj = mdd->mdd_orphans;
139         struct dt_it     *it;
140         struct dt_it_ops *iops;
141         struct orph_key  *key = orph_key_empty(ctx, 0);
142         int result;
143         ENTRY;
144
145         iops = &dt_obj->do_index_ops->dio_it;
146         it = iops->init(ctx, dt_obj, 1);
147         if (it != NULL) {
148                 result = iops->get(ctx, it, (const void *)key);
149                 if (result > 0) {
150                         int i;
151                         /* main cycle */
152                         for (result = 0, i = 0; result == +1; ++i) {
153                                 key = (void *)iops->key(ctx, it);
154                                 orph_key_test_and_del(ctx, mdd, key);
155                                 result = iops->next(ctx, it);
156                         }
157                         iops->put(ctx, it);
158                 } else if (result == 0)
159                         /* Index contains no zero key? */
160                         result = -EIO;
161                 iops->fini(ctx, it);
162         } else
163                 result = -ENOMEM;
164
165         RETURN(result);
166 }
167
168 int orph_index_init(const struct lu_context *ctx, struct mdd_device *mdd)
169 {
170         struct lu_fid fid;
171         struct dt_object *d;
172         int rc;
173         ENTRY;
174
175         d = dt_store_open(ctx, mdd->mdd_child, orph_index_name, &fid);
176         if (!IS_ERR(d)) {
177                 mdd->mdd_orphans = d;
178                 rc = d->do_ops->do_index_try(ctx, d, &orph_index_features);
179                 if (rc == 0)
180                         LASSERT(d->do_index_ops != NULL);
181                 else
182                         CERROR("\"%s\" is not an index!\n", orph_index_name);
183         } else {
184                 CERROR("cannot find \"%s\" obj %d\n",
185                        orph_index_name, (int)PTR_ERR(d));
186                 rc = PTR_ERR(d);
187         }
188
189         RETURN(rc);
190 }
191
192 void orph_index_fini(const struct lu_context *ctx, struct mdd_device *mdd)
193 {
194         ENTRY;
195         if (mdd->mdd_orphans != NULL) {
196                 if (!IS_ERR(mdd->mdd_orphans))
197                         lu_object_put(ctx, &mdd->mdd_orphans->do_lu);
198                 mdd->mdd_orphans = NULL;
199         }
200         EXIT;
201 }
202
203 int __mdd_orphan_add(const struct lu_context *ctx,
204                             struct mdd_object *obj,
205                             struct thandle *th)
206 {
207         loff_t offset = 0;
208         return orph_index_insert(ctx, obj, ORPH_OP_UNLINK, &offset, th);
209 }
210
211 int __mdd_orphan_del(const struct lu_context *ctx,
212                             struct mdd_object *obj,
213                             struct thandle *th)
214 {
215         return orph_index_delete(ctx, obj, ORPH_OP_UNLINK, th);
216 }
217
218