Whamcloud - gitweb
LU-1303 osd: zfs-osd should initialize local objects on MDS
[fs/lustre-release.git] / lustre / osd-zfs / osd_oi.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  */
30 /*
31  * Copyright (c) 2011, 2012 Whamcloud, Inc.
32  * Use is subject to license terms.
33  */
34 /*
35  * This file is part of Lustre, http://www.lustre.org/
36  * Lustre is a trademark of Sun Microsystems, Inc.
37  *
38  * lustre/osd-zfs/osd_oi.c
39  * OI functions to map fid to dnode
40  *
41  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
42  * Author: Mike Pershin <tappro@whamcloud.com>
43  */
44
45 #ifndef EXPORT_SYMTAB
46 # define EXPORT_SYMTAB
47 #endif
48 #define DEBUG_SUBSYSTEM S_OSD
49
50 #include <lustre_ver.h>
51 #include <libcfs/libcfs.h>
52 #include <obd_support.h>
53 #include <lustre_net.h>
54 #include <obd.h>
55 #include <obd_class.h>
56 #include <lustre_disk.h>
57 #include <lustre_fid.h>
58
59 #include "osd_internal.h"
60
61 #include <sys/dnode.h>
62 #include <sys/dbuf.h>
63 #include <sys/spa.h>
64 #include <sys/stat.h>
65 #include <sys/zap.h>
66 #include <sys/spa_impl.h>
67 #include <sys/zfs_znode.h>
68 #include <sys/dmu_tx.h>
69 #include <sys/dmu_objset.h>
70 #include <sys/dsl_prop.h>
71 #include <sys/sa_impl.h>
72 #include <sys/txg.h>
73
74 static char *oi_tag = "osd_mount, oi";
75
76 #define OSD_OI_FID_NR         (1UL << 7)
77 #define OSD_OI_FID_NR_MAX     (1UL << OSD_OI_FID_OID_BITS_MAX)
78 unsigned int osd_oi_count = OSD_OI_FID_NR;
79
80
81 /*
82  * zfs osd maintains names for known fids in the name hierarchy
83  * so that one can mount filesystem with regular ZFS stack and
84  * access files
85  */
86 struct named_oid {
87         unsigned long    oid;
88         char            *name;
89 };
90
91 static const struct named_oid oids[] = {
92         { OFD_LAST_RECV_OID,            LAST_RCVD },
93         { OFD_LAST_GROUP_OID,           "LAST_GROUP" },
94         { LLOG_CATALOGS_OID,            "CATALOGS" },
95         { MGS_CONFIGS_OID,              NULL /*MOUNT_CONFIGS_DIR*/ },
96         { FID_SEQ_SRV_OID,              NULL /* "seq_srv" */},
97         { FID_SEQ_CTL_OID,              NULL /*"seq_ctl"*/ },
98         { MDD_CAPA_KEYS_OID,            NULL /*CAPA_KEYS*/ },
99         { FLD_INDEX_OID,                NULL /* "fld" */ },
100         { MDD_LOV_OBJ_OID,              LOV_OBJID },
101         { MDT_LAST_RECV_OID,            LAST_RCVD },
102         { OFD_HEALTH_CHECK_OID,         HEALTH_CHECK },
103         { OFD_GROUP0_LAST_OID,          "LAST_ID" },
104         { ACCT_USER_OID,                "acct_usr_inode" },
105         { ACCT_GROUP_OID,               "acct_grp_inode" },
106         { MDD_ROOT_INDEX_OID,           NULL },
107         { MDD_ORPHAN_OID,               NULL },
108         { 0,                            NULL }
109 };
110
111 static char *oid2name(const unsigned long oid)
112 {
113         int i = 0;
114
115         while (oids[i].oid) {
116                 if (oids[i].oid == oid)
117                         return oids[i].name;
118                 i++;
119         }
120         return NULL;
121 }
122
123 /*
124  * objects w/o a natural reference (unlike a file on a MDS)
125  * are put under a special hierarchy /O/<seq>/d0..dXX
126  * this function returns a directory specific fid belongs to
127  */
128 static uint64_t
129 osd_get_idx_for_ost_obj(const struct lu_env *env, struct osd_device *osd,
130                         const struct lu_fid *fid, char *buf)
131 {
132         unsigned long   b;
133         int             rc;
134
135         rc = fid_ostid_pack(fid, &osd_oti_get(env)->oti_ostid);
136         LASSERT(rc == 0); /* we should not get here with IGIF */
137         b = osd_oti_get(env)->oti_ostid.oi_id % OSD_OST_MAP_SIZE;
138         LASSERT(osd->od_ost_compat_dirs[b]);
139
140         sprintf(buf, LPU64, osd_oti_get(env)->oti_ostid.oi_id);
141
142         return osd->od_ost_compat_dirs[b];
143 }
144
145 /* XXX: f_ver is not counted, but may differ too */
146 static void osd_fid2str(char *buf, const struct lu_fid *fid)
147 {
148         sprintf(buf, DFID_NOBRACE, PFID(fid));
149 }
150
151 /*
152  * Determine the zap object id which is being used as the OI for the
153  * given fid.  The lowest N bits in the sequence ID are used as the
154  * index key.  On failure 0 is returned which zfs treats internally
155  * as an invalid object id.
156  */
157 static uint64_t
158 osd_get_idx_for_fid(struct osd_device *osd, const struct lu_fid *fid,
159                     char *buf)
160 {
161         struct osd_oi *oi;
162
163         LASSERT(osd->od_oi_table != NULL);
164         oi = osd->od_oi_table[fid_seq(fid) & (osd->od_oi_count - 1)];
165         osd_fid2str(buf, fid);
166
167         return oi->oi_zapid;
168 }
169
170 uint64_t osd_get_name_n_idx(const struct lu_env *env, struct osd_device *osd,
171                             const struct lu_fid *fid, char *buf)
172 {
173         uint64_t zapid;
174
175         LASSERT(fid);
176         LASSERT(buf);
177
178         if (fid_is_idif(fid)) {
179                 zapid = osd_get_idx_for_ost_obj(env, osd, fid, buf);
180         } else if (unlikely(fid_seq(fid) == FID_SEQ_LOCAL_FILE)) {
181                 /* special objects with fixed known fids get their name */
182                 char *name = oid2name(fid_oid(fid));
183
184                 if (name) {
185                         zapid = osd->od_root;
186                         strcpy(buf, name);
187                         if (fid_oid(fid) == OFD_GROUP0_LAST_OID)
188                                 zapid = osd->od_ost_compat_grp0;
189                         else if (fid_is_acct(fid))
190                                 zapid = MASTER_NODE_OBJ;
191                 } else {
192                         zapid = osd_get_idx_for_fid(osd, fid, buf);
193                 }
194         } else {
195                 zapid = osd_get_idx_for_fid(osd, fid, buf);
196         }
197
198         return zapid;
199 }
200
201 static inline int fid_is_fs_root(const struct lu_fid *fid)
202 {
203         /* Map root inode to special local object FID */
204         return fid_seq(fid) == FID_SEQ_LOCAL_FILE &&
205                 fid_oid(fid) == OSD_FS_ROOT_OID;
206 }
207
208 int osd_fid_lookup(const struct lu_env *env, struct osd_device *dev,
209                    const struct lu_fid *fid, uint64_t *oid)
210 {
211         struct osd_thread_info  *info = osd_oti_get(env);
212         char                    *buf = info->oti_buf;
213         uint64_t                zapid;
214         int                     rc = 0;
215         ENTRY;
216
217         if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT))
218                 RETURN(-ENOENT);
219
220         if (unlikely(fid_is_acct(fid))) {
221                 if (fid_oid(fid) == ACCT_USER_OID)
222                         *oid = dev->od_iusr_oid;
223                 else
224                         *oid = dev->od_igrp_oid;
225         } else if (unlikely(fid_is_fs_root(fid))) {
226                 *oid = dev->od_root;
227         } else {
228                 zapid = osd_get_name_n_idx(env, dev, fid, buf);
229
230                 rc = -zap_lookup(dev->od_objset.os, zapid, buf,
231                                 8, 1, &info->oti_zde);
232                 if (rc)
233                         RETURN(rc);
234                 *oid = info->oti_zde.lzd_reg.zde_dnode;
235         }
236
237         RETURN(rc);
238 }
239
240 /**
241  * Lookup an existing OI by the given name.
242  */
243 static int
244 osd_oi_lookup(const struct lu_env *env, struct osd_device *o,
245               uint64_t parent, const char *name, struct osd_oi *oi)
246 {
247         struct zpl_direntry     *zde = &osd_oti_get(env)->oti_zde.lzd_reg;
248         int                      rc;
249
250         rc = -zap_lookup(o->od_objset.os, parent, name, 8, 1, (void *)zde);
251         if (rc)
252                 return rc;
253
254         strncpy(oi->oi_name, name, OSD_OI_NAME_SIZE - 1);
255         oi->oi_zapid = zde->zde_dnode;
256
257         return rc;
258 }
259
260 /**
261  * Create a new OI with the given name.
262  */
263 static int
264 osd_oi_create(const struct lu_env *env, struct osd_device *o,
265               uint64_t parent, const char *name, uint64_t *child)
266 {
267         struct zpl_direntry     *zde = &osd_oti_get(env)->oti_zde.lzd_reg;
268         struct lu_attr          *la = &osd_oti_get(env)->oti_la;
269         dmu_buf_t               *db;
270         dmu_tx_t                *tx;
271         int                      rc;
272
273         /* verify it doesn't already exist */
274         rc = -zap_lookup(o->od_objset.os, parent, name, 8, 1, (void *)zde);
275         if (rc == 0)
276                 return -EEXIST;
277
278         /* create fid-to-dnode index */
279         tx = dmu_tx_create(o->od_objset.os);
280         if (tx == NULL)
281                 return -ENOMEM;
282
283         dmu_tx_hold_zap(tx, DMU_NEW_OBJECT, 1, NULL);
284         dmu_tx_hold_bonus(tx, parent);
285         dmu_tx_hold_zap(tx, parent, TRUE, name);
286         LASSERT(tx->tx_objset->os_sa);
287         dmu_tx_hold_sa_create(tx, ZFS_SA_BASE_ATTR_SIZE);
288
289         rc = -dmu_tx_assign(tx, TXG_WAIT);
290         if (rc) {
291                 dmu_tx_abort(tx);
292                 return rc;
293         }
294
295         la->la_valid = LA_MODE | LA_UID | LA_GID;
296         la->la_mode = S_IFDIR | S_IRUGO | S_IWUSR | S_IXUGO;
297         la->la_uid = la->la_gid = 0;
298         __osd_zap_create(env, &o->od_objset, &db, tx, la, oi_tag, 0);
299
300         zde->zde_dnode = db->db_object;
301         zde->zde_pad = 0;
302         zde->zde_type = IFTODT(S_IFDIR);
303
304         rc = -zap_add(o->od_objset.os, parent, name, 8, 1, (void *)zde, tx);
305
306         dmu_tx_commit(tx);
307
308         *child = db->db_object;
309         sa_buf_rele(db, oi_tag);
310
311         return rc;
312 }
313
314 static int
315 osd_oi_find_or_create(const struct lu_env *env, struct osd_device *o,
316                       uint64_t parent, const char *name, uint64_t *child)
317 {
318         struct osd_oi   oi;
319         int             rc;
320
321         rc = osd_oi_lookup(env, o, parent, name, &oi);
322         if (rc == 0) {
323                 *child = oi.oi_zapid;
324         } else if (rc == -ENOENT) {
325                 rc = osd_oi_create(env, o, parent, name, child);
326         }
327
328         return rc;
329 }
330
331 /**
332  * Close an entry in a specific slot.
333  */
334 static void
335 osd_oi_remove_table(const struct lu_env *env, struct osd_device *o, int key)
336 {
337         struct osd_oi *oi;
338
339         LASSERT(key < o->od_oi_count);
340
341         oi = o->od_oi_table[key];
342         if (oi) {
343                 OBD_FREE_PTR(oi);
344                 o->od_oi_table[key] = NULL;
345         }
346 }
347
348 /**
349  * Allocate and open a new entry in the specified unused slot.
350  */
351 static int
352 osd_oi_add_table(const struct lu_env *env, struct osd_device *o,
353                  char *name, int key)
354 {
355         struct osd_oi *oi;
356         int rc;
357
358         LASSERT(key < o->od_oi_count);
359         LASSERT(o->od_oi_table[key] == NULL);
360
361         OBD_ALLOC_PTR(oi);
362         if (oi == NULL)
363                 return -ENOMEM;
364
365         rc = osd_oi_lookup(env, o, o->od_root, name, oi);
366         if (rc) {
367                 OBD_FREE_PTR(oi);
368                 return rc;
369         }
370
371         o->od_oi_table[key] = oi;
372
373         return 0;
374 }
375
376 /**
377  * Depopulate the OI table.
378  */
379 static void
380 osd_oi_close_table(const struct lu_env *env, struct osd_device *o)
381 {
382         int i;
383
384         for (i = 0; i < o->od_oi_count; i++)
385                 osd_oi_remove_table(env, o, i);
386 }
387
388 /**
389  * Populate the OI table based.
390  */
391 static int
392 osd_oi_open_table(const struct lu_env *env, struct osd_device *o, int count)
393 {
394         char name[16];
395         int  i, rc = 0;
396         ENTRY;
397
398         for (i = 0; i < count; i++) {
399                 sprintf(name, "%s.%d", DMU_OSD_OI_NAME_BASE, i);
400                 rc = osd_oi_add_table(env, o, name, i);
401                 if (rc) {
402                         osd_oi_close_table(env, o);
403                         break;
404                 }
405         }
406
407         RETURN(rc);
408 }
409
410 /**
411  * Determine if the type and number of OIs used by this file system.
412  */
413 static int
414 osd_oi_probe(const struct lu_env *env, struct osd_device *o, int *count)
415 {
416         uint64_t        root_oid = o->od_root;
417         struct osd_oi   oi;
418         char            name[16];
419         int             rc;
420         ENTRY;
421
422         /*
423          * Check for multiple OIs and determine the count.  There is no
424          * gap handling, if an OI is missing the wrong size can be returned.
425          * The only safeguard is that we know the number of OIs must be a
426          * power of two and this is checked for basic sanity.
427          */
428         for (*count = 0; *count < OSD_OI_FID_NR_MAX; (*count)++) {
429                 sprintf(name, "%s.%d", DMU_OSD_OI_NAME_BASE, *count);
430                 rc = osd_oi_lookup(env, o, root_oid, name, &oi);
431                 if (rc == 0)
432                         continue;
433
434                 if (rc == -ENOENT) {
435                         if (*count == 0)
436                                 break;
437
438                         if ((*count & (*count - 1)) != 0)
439                                 RETURN(-EDOM);
440
441                         RETURN(0);
442                 }
443
444                 RETURN(rc);
445         }
446
447         /*
448          * No OIs exist, this must be a new filesystem.
449          */
450         *count = 0;
451
452         RETURN(0);
453 }
454
455 /**
456  * Create /O subdirectory to map legacy OST objects for compatibility.
457  */
458 static int
459 osd_oi_init_compat(const struct lu_env *env, struct osd_device *o)
460 {
461         char            *key = osd_oti_get(env)->oti_buf;
462         uint64_t         odb, sdb;
463         int              i, rc;
464         ENTRY;
465
466         rc = osd_oi_find_or_create(env, o, o->od_root, "O", &sdb);
467         if (rc)
468                 RETURN(rc);
469
470         /* create /O/0 subdirectory to map legacy OST objects */
471         rc = osd_oi_find_or_create(env, o, sdb, "0", &odb);
472         if (rc)
473                 RETURN(rc);
474
475         o->od_ost_compat_grp0 = odb;
476
477         for (i = 0; i < OSD_OST_MAP_SIZE; i++) {
478                 sprintf(key, "d%d", i);
479                 rc = osd_oi_find_or_create(env, o, odb, key, &sdb);
480                 if (rc)
481                         RETURN(rc);
482
483                 o->od_ost_compat_dirs[i] = sdb;
484         }
485
486         /* Create on-disk indexes to maintain per-UID/GID inode usage.
487          * Those new indexes are created in the top-level ZAP outside the
488          * namespace in order not to confuse ZPL which might interpret those
489          * indexes as directories and assume the values are object IDs */
490         rc = osd_oi_find_or_create(env, o, MASTER_NODE_OBJ,
491                         oid2name(ACCT_USER_OID), &odb);
492         if (rc)
493                 RETURN(rc);
494         o->od_iusr_oid = odb;
495
496         rc = osd_oi_find_or_create(env, o, MASTER_NODE_OBJ,
497                         oid2name(ACCT_GROUP_OID), &odb);
498         if (rc)
499                 RETURN(rc);
500         o->od_igrp_oid = odb;
501
502         RETURN(rc);
503 }
504
505 /**
506  * Initialize the OIs by either opening or creating them as needed.
507  */
508 int osd_oi_init(const struct lu_env *env, struct osd_device *o)
509 {
510         char    *key = osd_oti_get(env)->oti_buf;
511         int      i, rc, count = 0;
512         ENTRY;
513
514         rc = osd_oi_probe(env, o, &count);
515         if (rc)
516                 RETURN(rc);
517
518         if (count == 0) {
519                 uint64_t odb, sdb;
520
521                 count = osd_oi_count;
522                 odb = o->od_root;
523
524                 for (i = 0; i < count; i++) {
525                         sprintf(key, "%s.%d", DMU_OSD_OI_NAME_BASE, i);
526                         rc = osd_oi_find_or_create(env, o, odb, key, &sdb);
527                         if (rc)
528                                 RETURN(rc);
529                 }
530         }
531
532         rc = osd_oi_init_compat(env, o);
533         if (rc)
534                 RETURN(rc);
535
536         LASSERT((count & (count - 1)) == 0);
537         o->od_oi_count = count;
538         OBD_ALLOC(o->od_oi_table, sizeof(struct osd_oi *) * count);
539         if (o->od_oi_table == NULL)
540                 RETURN(-ENOMEM);
541
542         rc = osd_oi_open_table(env, o, count);
543         if (rc) {
544                 OBD_FREE(o->od_oi_table, sizeof(struct osd_oi *) * count);
545                 o->od_oi_table = NULL;
546         }
547
548         RETURN(rc);
549 }
550
551 void osd_oi_fini(const struct lu_env *env, struct osd_device *o)
552 {
553         ENTRY;
554
555         if (o->od_oi_table != NULL) {
556                 (void) osd_oi_close_table(env, o);
557                 OBD_FREE(o->od_oi_table,
558                          sizeof(struct osd_oi *) * o->od_oi_count);
559                 o->od_oi_table = NULL;
560                 o->od_oi_count = 0;
561         }
562
563         EXIT;
564 }
565
566 int osd_options_init(void)
567 {
568         /* osd_oi_count - Default number of OIs, 128 works well for ZFS */
569         if (osd_oi_count == 0 || osd_oi_count > OSD_OI_FID_NR_MAX)
570                 osd_oi_count = OSD_OI_FID_NR;
571
572         if ((osd_oi_count & (osd_oi_count - 1)) != 0) {
573                 LCONSOLE_WARN("Round up osd_oi_count %d to power2 %d\n",
574                         osd_oi_count, size_roundup_power2(osd_oi_count));
575                 osd_oi_count = size_roundup_power2(osd_oi_count);
576         }
577
578         return 0;
579 }
580
581