Whamcloud - gitweb
ac6744f30fe8a9095d7892a9d6c35b7b0ea3d7ba
[fs/lustre-release.git] / lustre / osd-ldiskfs / osd_oi.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  *
32  * Copyright (c) 2012, Whamcloud, Inc.
33  */
34 /*
35  * This file is part of Lustre, http://www.lustre.org/
36  * Lustre is a trademark of Sun Microsystems, Inc.
37  *
38  * lustre/osd/osd_oi.c
39  *
40  * Object Index.
41  *
42  * Author: Nikita Danilov <nikita@clusterfs.com>
43  */
44
45 /*
46  * oi uses two mechanisms to implement fid->cookie mapping:
47  *
48  *     - persistent index, where cookie is a record and fid is a key, and
49  *
50  *     - algorithmic mapping for "igif" fids.
51  *
52  */
53
54 #ifndef EXPORT_SYMTAB
55 # define EXPORT_SYMTAB
56 #endif
57 #define DEBUG_SUBSYSTEM S_MDS
58
59 #include <linux/module.h>
60
61 /* LUSTRE_VERSION_CODE */
62 #include <lustre_ver.h>
63 /*
64  * struct OBD_{ALLOC,FREE}*()
65  * OBD_FAIL_CHECK
66  */
67 #include <obd.h>
68 #include <obd_support.h>
69
70 /* fid_cpu_to_be() */
71 #include <lustre_fid.h>
72
73 #include "osd_oi.h"
74 /* osd_lookup(), struct osd_thread_info */
75 #include "osd_internal.h"
76 #include "osd_igif.h"
77 #include "dt_object.h"
78
79 #define OSD_OI_FID_NR         (1UL << OSD_OI_FID_OID_BITS)
80 #define OSD_OI_FID_NR_MAX     (1UL << OSD_OI_FID_OID_BITS_MAX)
81
82 static unsigned int osd_oi_count = OSD_OI_FID_NR;
83 CFS_MODULE_PARM(osd_oi_count, "i", int, 0444,
84                 "Number of Object Index containers to be created, "
85                 "it's only valid for new filesystem.");
86
87 /** to serialize concurrent OI index initialization */
88 static cfs_mutex_t oi_init_lock;
89
90 static struct dt_index_features oi_feat = {
91         .dif_flags       = DT_IND_UPDATE,
92         .dif_recsize_min = sizeof(struct osd_inode_id),
93         .dif_recsize_max = sizeof(struct osd_inode_id),
94         .dif_ptrsize     = 4
95 };
96
97 #define OSD_OI_NAME_BASE        "oi.16"
98
99 /**
100  * Open an OI(Ojbect Index) container.
101  *
102  * \param       name    Name of OI container
103  * \param       objp    Pointer of returned OI
104  *
105  * \retval      0       success
106  * \retval      -ve     failure
107  */
108 static int
109 osd_oi_open(struct osd_thread_info *info,
110             struct dt_device *dev, char *name, struct dt_object **objp)
111 {
112         const struct lu_env *env = info->oti_env;
113         struct dt_object    *obj;
114         int                  rc;
115
116         obj = dt_store_open(env, dev, "", name, &info->oti_fid);
117         if (IS_ERR(obj))
118                 return PTR_ERR(obj);
119
120         oi_feat.dif_keysize_min = sizeof(info->oti_fid);
121         oi_feat.dif_keysize_max = sizeof(info->oti_fid);
122
123         rc = obj->do_ops->do_index_try(env, obj, &oi_feat);
124         if (rc != 0) {
125                 lu_object_put(info->oti_env, &obj->do_lu);
126                 CERROR("%s: wrong index %s: rc = %d\n",
127                        dev->dd_lu_dev.ld_obd->obd_name, name, rc);
128                 return rc;
129         }
130
131         *objp = obj;
132         return 0;
133 }
134
135
136 static void
137 osd_oi_table_put(struct osd_thread_info *info,
138                  struct osd_oi *oi_table, unsigned oi_count)
139 {
140         int     i;
141
142         for (i = 0; i < oi_count; i++) {
143                 LASSERT(oi_table[i].oi_dir != NULL);
144
145                 lu_object_put(info->oti_env, &oi_table[i].oi_dir->do_lu);
146                 oi_table[i].oi_dir = NULL;
147         }
148 }
149
150 /**
151  * Open OI(Object Index) table.
152  * If \a oi_count is zero, which means caller doesn't know how many OIs there
153  * will be, this function can either return 0 for new filesystem, or number
154  * of OIs on existed filesystem.
155  *
156  * If \a oi_count is non-zero, which means caller does know number of OIs on
157  * filesystem, this function should return the exactly same number on
158  * success, or error code in failure.
159  *
160  * \param     oi_count  Number of expected OI containers
161  * \param     try_all   Try to open all OIs even see failures
162  *
163  * \retval    +ve       number of opened OI containers
164  * \retval      0       no OI containers found
165  * \retval    -ve       failure
166  */
167 static int
168 osd_oi_table_open(struct osd_thread_info *info, struct dt_device *dev,
169                   struct osd_oi *oi_table, unsigned oi_count, int try_all)
170 {
171         int     count = 0;
172         int     rc = 0;
173         int     i;
174
175         /* NB: oi_count != 0 means that we have already created/known all OIs
176          * and have known exact number of OIs. */
177         LASSERT(oi_count <= OSD_OI_FID_NR_MAX);
178
179         for (i = 0; i < (oi_count != 0 ? oi_count : OSD_OI_FID_NR_MAX); i++) {
180                 char name[12];
181
182                 sprintf(name, "%s.%d", OSD_OI_NAME_BASE, i);
183                 rc = osd_oi_open(info, dev, name, &oi_table[i].oi_dir);
184                 if (rc == 0) {
185                         count++;
186                         continue;
187                 }
188
189                 if (try_all)
190                         continue;
191
192                 if (rc == -ENOENT && oi_count == 0)
193                         return count;
194
195                 CERROR("%s: can't open %s: rc = %d\n",
196                        dev->dd_lu_dev.ld_obd->obd_name, name, rc);
197
198                 if (oi_count > 0) {
199                         CERROR("%s: expect to open total %d OI files.\n",
200                                dev->dd_lu_dev.ld_obd->obd_name, oi_count);
201                 }
202
203                 break;
204         }
205
206         if (try_all)
207                 return count;
208
209         if (rc < 0) {
210                 osd_oi_table_put(info, oi_table, count);
211                 return rc;
212         }
213
214         return count;
215 }
216
217 static int osd_oi_table_create(struct osd_thread_info *info,
218                                struct dt_device *dev,
219                                struct md_device *mdev, int oi_count)
220 {
221         const struct lu_env *env;
222         struct md_object *mdo;
223         int i;
224
225         env = info->oti_env;
226         for (i = 0; i < oi_count; ++i) {
227                 char name[12];
228
229                 sprintf(name, "%s.%d", OSD_OI_NAME_BASE, i);
230
231                 lu_local_obj_fid(&info->oti_fid, OSD_OI_FID_OID_FIRST + i);
232                 oi_feat.dif_keysize_min = sizeof(info->oti_fid);
233                 oi_feat.dif_keysize_max = sizeof(info->oti_fid);
234
235                 mdo = llo_store_create_index(env, mdev, dev, "", name,
236                                              &info->oti_fid, &oi_feat);
237                 if (IS_ERR(mdo)) {
238                         CERROR("Failed to create OI[%d] on %s: %d\n",
239                                i, dev->dd_lu_dev.ld_obd->obd_name,
240                                (int)PTR_ERR(mdo));
241                         RETURN(PTR_ERR(mdo));
242                 }
243
244                 lu_object_put(env, &mdo->mo_lu);
245         }
246         return 0;
247 }
248
249 int osd_oi_init(struct osd_thread_info *info,
250                 struct osd_oi **oi_table,
251                 struct dt_device *dev,
252                 struct md_device *mdev)
253 {
254         struct osd_oi *oi;
255         int rc;
256
257         OBD_ALLOC(oi, sizeof(*oi) * OSD_OI_FID_NR_MAX);
258         if (oi == NULL)
259                 return -ENOMEM;
260
261         cfs_mutex_lock(&oi_init_lock);
262
263         rc = osd_oi_table_open(info, dev, oi, 0, 0);
264         if (rc != 0)
265                 goto out;
266
267         rc = osd_oi_open(info, dev, OSD_OI_NAME_BASE, &oi[0].oi_dir);
268         if (rc == 0) { /* found single OI from old filesystem */
269                 rc = 1;
270                 goto out;
271         }
272
273         if (rc != -ENOENT) {
274                 CERROR("%s: can't open %s: rc = %d\n",
275                        dev->dd_lu_dev.ld_obd->obd_name, OSD_OI_NAME_BASE, rc);
276                 goto out;
277         }
278
279         /* create OI objects */
280         rc = osd_oi_table_create(info, dev, mdev, osd_oi_count);
281         if (rc != 0)
282                 goto out;
283
284         rc = osd_oi_table_open(info, dev, oi, osd_oi_count, 0);
285         LASSERT(rc == osd_oi_count || rc < 0);
286
287  out:
288         if (rc < 0) {
289                 OBD_FREE(oi, sizeof(*oi) * OSD_OI_FID_NR_MAX);
290         } else {
291                 LASSERT((rc & (rc - 1)) == 0);
292                 *oi_table = oi;
293         }
294
295         cfs_mutex_unlock(&oi_init_lock);
296         return rc;
297 }
298
299 void osd_oi_fini(struct osd_thread_info *info,
300                  struct osd_oi **oi_table, unsigned oi_count)
301 {
302         struct osd_oi *oi = *oi_table;
303
304         osd_oi_table_put(info, oi, oi_count);
305
306         OBD_FREE(oi, sizeof(*oi) * OSD_OI_FID_NR_MAX);
307         *oi_table = NULL;
308 }
309
310 int osd_oi_lookup(struct osd_thread_info *info, struct osd_oi *oi,
311                   const struct lu_fid *fid, struct osd_inode_id *id)
312 {
313         struct lu_fid *oi_fid = &info->oti_fid;
314         int rc;
315
316         if (osd_fid_is_igif(fid)) {
317                 lu_igif_to_id(fid, id);
318                 rc = 0;
319         } else {
320                 struct dt_object    *idx;
321                 const struct dt_key *key;
322
323                 if (!fid_is_norm(fid))
324                         return -ENOENT;
325
326                 idx = oi->oi_dir;
327                 fid_cpu_to_be(oi_fid, fid);
328                 key = (struct dt_key *) oi_fid;
329                 rc = idx->do_index_ops->dio_lookup(info->oti_env, idx,
330                                                    (struct dt_rec *)id, key,
331                                                    BYPASS_CAPA);
332                 if (rc > 0) {
333                         id->oii_ino = be32_to_cpu(id->oii_ino);
334                         id->oii_gen = be32_to_cpu(id->oii_gen);
335                         rc = 0;
336                 } else if (rc == 0)
337                         rc = -ENOENT;
338         }
339         return rc;
340 }
341
342 int osd_oi_insert(struct osd_thread_info *info, struct osd_oi *oi,
343                   const struct lu_fid *fid, const struct osd_inode_id *id0,
344                   struct thandle *th, int ignore_quota)
345 {
346         struct lu_fid *oi_fid = &info->oti_fid;
347         struct dt_object    *idx;
348         struct osd_inode_id *id;
349         const struct dt_key *key;
350
351         if (!fid_is_norm(fid))
352                 return 0;
353
354         idx = oi->oi_dir;
355         fid_cpu_to_be(oi_fid, fid);
356         key = (struct dt_key *) oi_fid;
357
358         id  = &info->oti_id;
359         id->oii_ino = cpu_to_be32(id0->oii_ino);
360         id->oii_gen = cpu_to_be32(id0->oii_gen);
361         return idx->do_index_ops->dio_insert(info->oti_env, idx,
362                                              (struct dt_rec *)id,
363                                              key, th, BYPASS_CAPA,
364                                              ignore_quota);
365 }
366
367 int osd_oi_delete(struct osd_thread_info *info,
368                   struct osd_oi *oi, const struct lu_fid *fid,
369                   struct thandle *th)
370 {
371         struct lu_fid *oi_fid = &info->oti_fid;
372         struct dt_object    *idx;
373         const struct dt_key *key;
374
375         if (!fid_is_norm(fid))
376                 return 0;
377
378         idx = oi->oi_dir;
379         fid_cpu_to_be(oi_fid, fid);
380         key = (struct dt_key *) oi_fid;
381         return idx->do_index_ops->dio_delete(info->oti_env, idx,
382                                              key, th, BYPASS_CAPA);
383 }
384
385 int osd_oi_mod_init()
386 {
387         if (osd_oi_count == 0 || osd_oi_count > OSD_OI_FID_NR_MAX)
388                 osd_oi_count = OSD_OI_FID_NR;
389
390         if ((osd_oi_count & (osd_oi_count - 1)) != 0) {
391                 LCONSOLE_WARN("Round up oi_count %d to power2 %d\n",
392                               osd_oi_count, size_roundup_power2(osd_oi_count));
393                 osd_oi_count = size_roundup_power2(osd_oi_count);
394         }
395
396         cfs_mutex_init(&oi_init_lock);
397         return 0;
398 }