Whamcloud - gitweb
LU-2449 osd: osd-zfs to handle ./.. properly
[fs/lustre-release.git] / lustre / osd-zfs / osd_index.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  */
30 /*
31  * Copyright (c) 2012, Intel Corporation.
32  * Use is subject to license terms.
33  */
34 /*
35  * This file is part of Lustre, http://www.lustre.org/
36  * Lustre is a trademark of Sun Microsystems, Inc.
37  *
38  * lustre/osd-zfs/osd_index.c
39  *
40  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
41  * Author: Mike Pershin <tappro@whamcloud.com>
42  */
43
44 #ifndef EXPORT_SYMTAB
45 # define EXPORT_SYMTAB
46 #endif
47 #define DEBUG_SUBSYSTEM S_OSD
48
49 #include <lustre_ver.h>
50 #include <libcfs/libcfs.h>
51 #include <lustre_fsfilt.h>
52 #include <obd_support.h>
53 #include <lustre_net.h>
54 #include <obd.h>
55 #include <obd_class.h>
56 #include <lustre_disk.h>
57 #include <lustre_fid.h>
58
59 #include "osd_internal.h"
60
61 #include <sys/dnode.h>
62 #include <sys/dbuf.h>
63 #include <sys/spa.h>
64 #include <sys/stat.h>
65 #include <sys/zap.h>
66 #include <sys/spa_impl.h>
67 #include <sys/zfs_znode.h>
68 #include <sys/dmu_tx.h>
69 #include <sys/dmu_objset.h>
70 #include <sys/dsl_prop.h>
71 #include <sys/sa_impl.h>
72 #include <sys/txg.h>
73
74 static struct dt_it *osd_index_it_init(const struct lu_env *env,
75                                        struct dt_object *dt,
76                                        __u32 unused,
77                                        struct lustre_capa *capa)
78 {
79         struct osd_thread_info  *info = osd_oti_get(env);
80         struct osd_zap_it       *it;
81         struct osd_object       *obj = osd_dt_obj(dt);
82         struct osd_device       *osd = osd_obj2dev(obj);
83         struct lu_object        *lo  = &dt->do_lu;
84         ENTRY;
85
86         /* XXX: check capa ? */
87
88         LASSERT(lu_object_exists(lo));
89         LASSERT(obj->oo_db);
90         LASSERT(udmu_object_is_zap(obj->oo_db));
91         LASSERT(info);
92
93         it = &info->oti_it_zap;
94
95         if (udmu_zap_cursor_init(&it->ozi_zc, &osd->od_objset,
96                                  obj->oo_db->db_object, 0))
97                 RETURN(ERR_PTR(-ENOMEM));
98
99         it->ozi_obj   = obj;
100         it->ozi_capa  = capa;
101         it->ozi_reset = 1;
102         lu_object_get(lo);
103
104         RETURN((struct dt_it *)it);
105 }
106
107 static void osd_index_it_fini(const struct lu_env *env, struct dt_it *di)
108 {
109         struct osd_zap_it *it = (struct osd_zap_it *)di;
110         struct osd_object *obj;
111         ENTRY;
112
113         LASSERT(it);
114         LASSERT(it->ozi_obj);
115
116         obj = it->ozi_obj;
117
118         udmu_zap_cursor_fini(it->ozi_zc);
119         lu_object_put(env, &obj->oo_dt.do_lu);
120
121         EXIT;
122 }
123
124
125 static void osd_index_it_put(const struct lu_env *env, struct dt_it *di)
126 {
127         /* PBS: do nothing : ref are incremented at retrive and decreamented
128          *      next/finish. */
129 }
130
131 int udmu_zap_cursor_retrieve_key(const struct lu_env *env,
132                                  zap_cursor_t *zc, char *key, int max)
133 {
134         zap_attribute_t *za = &osd_oti_get(env)->oti_za;
135         int              err;
136
137         if ((err = zap_cursor_retrieve(zc, za)))
138                 return err;
139
140         if (key)
141                 strcpy(key, za->za_name);
142
143         return 0;
144 }
145
146 /*
147  * zap_cursor_retrieve read from current record.
148  * to read bytes we need to call zap_lookup explicitly.
149  */
150 int udmu_zap_cursor_retrieve_value(const struct lu_env *env,
151                 zap_cursor_t *zc,  char *buf,
152                 int buf_size, int *bytes_read)
153 {
154         zap_attribute_t *za = &osd_oti_get(env)->oti_za;
155         int err, actual_size;
156
157         if ((err = zap_cursor_retrieve(zc, za)))
158                 return err;
159
160         if (za->za_integer_length <= 0)
161                 return (ERANGE);
162
163         actual_size = za->za_integer_length * za->za_num_integers;
164
165         if (actual_size > buf_size) {
166                 actual_size = buf_size;
167                 buf_size = actual_size / za->za_integer_length;
168         } else {
169                 buf_size = za->za_num_integers;
170         }
171
172         err = -zap_lookup(zc->zc_objset, zc->zc_zapobj,
173                         za->za_name, za->za_integer_length,
174                         buf_size, buf);
175
176         if (!err)
177                 *bytes_read = actual_size;
178
179         return err;
180 }
181
182 static inline void osd_it_append_attrs(struct lu_dirent *ent, __u32 attr,
183                                        int len, __u16 type)
184 {
185         const unsigned    align = sizeof(struct luda_type) - 1;
186         struct luda_type *lt;
187
188         /* check if file type is required */
189         if (attr & LUDA_TYPE) {
190                 len = (len + align) & ~align;
191
192                 lt = (void *)ent->lde_name + len;
193                 lt->lt_type = cpu_to_le16(CFS_DTTOIF(type));
194                 ent->lde_attrs |= LUDA_TYPE;
195         }
196
197         ent->lde_attrs = cpu_to_le32(ent->lde_attrs);
198 }
199
200 static int osd_find_parent_fid(const struct lu_env *env, struct dt_object *o,
201                                struct lu_fid *fid)
202 {
203         struct link_ea_header  *leh;
204         struct link_ea_entry   *lee;
205         struct lu_buf           buf;
206         int                     rc;
207         ENTRY;
208
209         buf.lb_buf = osd_oti_get(env)->oti_buf;
210         buf.lb_len = sizeof(osd_oti_get(env)->oti_buf);
211
212         rc = osd_xattr_get(env, o, &buf, XATTR_NAME_LINK, BYPASS_CAPA);
213         if (rc == -ERANGE) {
214                 rc = osd_xattr_get(env, o, &LU_BUF_NULL,
215                                    XATTR_NAME_LINK, BYPASS_CAPA);
216                 if (rc < 0)
217                         RETURN(rc);
218                 LASSERT(rc > 0);
219                 OBD_ALLOC(buf.lb_buf, rc);
220                 if (buf.lb_buf == NULL)
221                         RETURN(-ENOMEM);
222                 buf.lb_len = rc;
223                 rc = osd_xattr_get(env, o, &buf, XATTR_NAME_LINK, BYPASS_CAPA);
224         }
225         if (rc < 0)
226                 GOTO(out, rc);
227         if (rc < sizeof(*leh) + sizeof(*lee))
228                 GOTO(out, rc = -EINVAL);
229
230         leh = buf.lb_buf;
231         if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
232                 leh->leh_magic = LINK_EA_MAGIC;
233                 leh->leh_reccount = __swab32(leh->leh_reccount);
234                 leh->leh_len = __swab64(leh->leh_len);
235         }
236         if (leh->leh_magic != LINK_EA_MAGIC)
237                 GOTO(out, rc = -EINVAL);
238         if (leh->leh_reccount == 0)
239                 GOTO(out, rc = -ENODATA);
240
241         lee = (struct link_ea_entry *)(leh + 1);
242         fid_be_to_cpu(fid, (const struct lu_fid *)&lee->lee_parent_fid);
243         rc = 0;
244
245 out:
246         if (buf.lb_buf != osd_oti_get(env)->oti_buf)
247                 OBD_FREE(buf.lb_buf, buf.lb_len);
248         RETURN(rc);
249 }
250
251 static int osd_dir_lookup(const struct lu_env *env, struct dt_object *dt,
252                           struct dt_rec *rec, const struct dt_key *key,
253                           struct lustre_capa *capa)
254 {
255         struct osd_thread_info *oti = osd_oti_get(env);
256         struct osd_object  *obj = osd_dt_obj(dt);
257         struct osd_device  *osd = osd_obj2dev(obj);
258         char               *name = (char *)key;
259         int                 rc;
260         ENTRY;
261
262         LASSERT(udmu_object_is_zap(obj->oo_db));
263
264         if (name[0] == '.') {
265                 if (name[1] == 0) {
266                         const struct lu_fid *f = lu_object_fid(&dt->do_lu);
267                         memcpy(rec, f, sizeof(*f));
268                         RETURN(1);
269                 } else if (name[1] == '.' && name[2] == 0) {
270                         rc = osd_find_parent_fid(env, dt, (struct lu_fid *)rec);
271                         RETURN(rc == 0 ? 1 : rc);
272                 }
273         }
274
275         rc = -zap_lookup(osd->od_objset.os, obj->oo_db->db_object,
276                          (char *)key, 8, sizeof(oti->oti_zde) / 8,
277                          (void *)&oti->oti_zde);
278         memcpy(rec, &oti->oti_zde.lzd_fid, sizeof(struct lu_fid));
279
280         RETURN(rc == 0 ? 1 : rc);
281 }
282
283 static int osd_declare_dir_insert(const struct lu_env *env,
284                                   struct dt_object *dt,
285                                   const struct dt_rec *rec,
286                                   const struct dt_key *key,
287                                   struct thandle *th)
288 {
289         struct osd_object  *obj = osd_dt_obj(dt);
290         struct osd_thandle *oh;
291         ENTRY;
292
293         LASSERT(th != NULL);
294         oh = container_of0(th, struct osd_thandle, ot_super);
295
296         LASSERT(obj->oo_db);
297         LASSERT(udmu_object_is_zap(obj->oo_db));
298
299         dmu_tx_hold_bonus(oh->ot_tx, obj->oo_db->db_object);
300         dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, (char *)key);
301
302         RETURN(0);
303 }
304
305 /**
306  * Find the osd object for given fid.
307  *
308  * \param fid need to find the osd object having this fid
309  *
310  * \retval osd_object on success
311  * \retval        -ve on error
312  */
313 struct osd_object *osd_object_find(const struct lu_env *env,
314                                    struct dt_object *dt,
315                                    const struct lu_fid *fid)
316 {
317         struct lu_device         *ludev = dt->do_lu.lo_dev;
318         struct osd_object        *child = NULL;
319         struct lu_object         *luch;
320         struct lu_object         *lo;
321
322         /*
323          * at this point topdev might not exist yet
324          * (i.e. MGS is preparing profiles). so we can
325          * not rely on topdev and instead lookup with
326          * our device passed as topdev. this can't work
327          * if the object isn't cached yet (as osd doesn't
328          * allocate lu_header). IOW, the object must be
329          * in the cache, otherwise lu_object_alloc() crashes
330          * -bzzz
331          */
332         luch = lu_object_find_at(env, ludev, fid, NULL);
333         if (IS_ERR(luch))
334                 return (void *)luch;
335
336         if (lu_object_exists(luch)) {
337                 lo = lu_object_locate(luch->lo_header, ludev->ld_type);
338                 if (lo != NULL)
339                         child = osd_obj(lo);
340                 else
341                         LU_OBJECT_DEBUG(D_ERROR, env, luch,
342                                         "%s: object can't be located "DFID"\n",
343                                         osd_dev(ludev)->od_svname, PFID(fid));
344
345                 if (child == NULL) {
346                         lu_object_put(env, luch);
347                         CERROR("%s: Unable to get osd_object "DFID"\n",
348                                osd_dev(ludev)->od_svname, PFID(fid));
349                         child = ERR_PTR(-ENOENT);
350                 }
351         } else {
352                 LU_OBJECT_DEBUG(D_ERROR, env, luch,
353                                 "%s: lu_object does not exists "DFID"\n",
354                                 osd_dev(ludev)->od_svname, PFID(fid));
355                 lu_object_put(env, luch);
356                 child = ERR_PTR(-ENOENT);
357         }
358
359         return child;
360 }
361
362 /**
363  * Put the osd object once done with it.
364  *
365  * \param obj osd object that needs to be put
366  */
367 static inline void osd_object_put(const struct lu_env *env,
368                                   struct osd_object *obj)
369 {
370         lu_object_put(env, &obj->oo_dt.do_lu);
371 }
372
373 /**
374  *      Inserts (key, value) pair in \a directory object.
375  *
376  *      \param  dt      osd index object
377  *      \param  key     key for index
378  *      \param  rec     record reference
379  *      \param  th      transaction handler
380  *      \param  capa    capability descriptor
381  *      \param  ignore_quota update should not affect quota
382  *
383  *      \retval  0  success
384  *      \retval -ve failure
385  */
386 static int osd_dir_insert(const struct lu_env *env, struct dt_object *dt,
387                           const struct dt_rec *rec, const struct dt_key *key,
388                           struct thandle *th, struct lustre_capa *capa,
389                           int ignore_quota)
390 {
391         struct osd_thread_info *oti = osd_oti_get(env);
392         struct osd_object   *parent = osd_dt_obj(dt);
393         struct osd_device   *osd = osd_obj2dev(parent);
394         struct lu_fid       *fid = (struct lu_fid *)rec;
395         struct osd_thandle  *oh;
396         struct osd_object   *child;
397         __u32                attr;
398         char                *name = (char *)key;
399         int                  rc;
400         ENTRY;
401
402         LASSERT(parent->oo_db);
403         LASSERT(udmu_object_is_zap(parent->oo_db));
404
405         LASSERT(dt_object_exists(dt));
406         LASSERT(osd_invariant(parent));
407
408         LASSERT(th != NULL);
409         oh = container_of0(th, struct osd_thandle, ot_super);
410
411         child = osd_object_find(env, dt, fid);
412         if (IS_ERR(child))
413                 RETURN(PTR_ERR(child));
414
415 /*
416  * to simulate old Orion setups with ./..  stored in the directories
417  */
418 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 3, 91, 0)
419 #define OSD_ZFS_INSERT_DOTS_FOR_TESTING__
420 #endif
421
422         LASSERT(child->oo_db);
423         if (name[0] == '.') {
424                 if (name[1] == 0) {
425                         /* do not store ".", instead generate it
426                          * during iteration */
427 #ifndef OSD_ZFS_INSERT_DOTS_FOR_TESTING
428                         GOTO(out, rc = 0);
429 #endif
430                 } else if (name[1] == '.' && name[2] == 0) {
431                         /* update parent dnode in the child.
432                          * later it will be used to generate ".." */
433                         udmu_objset_t *uos = &osd->od_objset;
434                         rc = osd_object_sa_update(child,
435                                                   SA_ZPL_PARENT(uos),
436                                                   &parent->oo_db->db_object,
437                                                   8, oh);
438
439 #ifndef OSD_ZFS_INSERT_DOTS_FOR_TESTING
440                         GOTO(out, rc);
441 #endif
442                 }
443         }
444
445         CLASSERT(sizeof(oti->oti_zde.lzd_reg) == 8);
446         CLASSERT(sizeof(oti->oti_zde) % 8 == 0);
447         attr = child->oo_dt.do_lu.lo_header ->loh_attr;
448         oti->oti_zde.lzd_reg.zde_type = IFTODT(attr & S_IFMT);
449         oti->oti_zde.lzd_reg.zde_dnode = child->oo_db->db_object;
450         oti->oti_zde.lzd_fid = *fid;
451
452         /* Insert (key,oid) into ZAP */
453         rc = -zap_add(osd->od_objset.os, parent->oo_db->db_object,
454                       (char *)key, 8, sizeof(oti->oti_zde) / 8,
455                       (void *)&oti->oti_zde, oh->ot_tx);
456
457 #ifndef OSD_ZFS_INSERT_DOTS_FOR_TESTING
458 out:
459 #endif
460         osd_object_put(env, child);
461
462         RETURN(rc);
463 }
464
465 static int osd_declare_dir_delete(const struct lu_env *env,
466                                   struct dt_object *dt,
467                                   const struct dt_key *key,
468                                   struct thandle *th)
469 {
470         struct osd_object *obj = osd_dt_obj(dt);
471         struct osd_thandle *oh;
472         ENTRY;
473
474         LASSERT(dt_object_exists(dt));
475         LASSERT(osd_invariant(obj));
476
477         LASSERT(th != NULL);
478         oh = container_of0(th, struct osd_thandle, ot_super);
479
480         LASSERT(obj->oo_db);
481         LASSERT(udmu_object_is_zap(obj->oo_db));
482
483         dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, (char *)key);
484
485         RETURN(0);
486 }
487
488 static int osd_dir_delete(const struct lu_env *env, struct dt_object *dt,
489                           const struct dt_key *key, struct thandle *th,
490                           struct lustre_capa *capa)
491 {
492         struct osd_object *obj = osd_dt_obj(dt);
493         struct osd_device *osd = osd_obj2dev(obj);
494         struct osd_thandle *oh;
495         dmu_buf_t *zap_db = obj->oo_db;
496         char      *name = (char *)key;
497         int rc;
498         ENTRY;
499
500         LASSERT(obj->oo_db);
501         LASSERT(udmu_object_is_zap(obj->oo_db));
502
503         LASSERT(th != NULL);
504         oh = container_of0(th, struct osd_thandle, ot_super);
505
506 #ifndef OSD_ZFS_INSERT_DOTS_FOR_TESTING
507         /*
508          * in Orion . and .. were stored in the directory (not generated up on
509          * request as now. we preserve them for backward compatibility
510          */
511         if (name[0] == '.') {
512                 if (name[1] == 0) {
513                         RETURN(0);
514                 } else if (name[1] == '.' && name[2] == 0) {
515                         RETURN(0);
516                 }
517         }
518 #endif
519
520         /* Remove key from the ZAP */
521         rc = -zap_remove(osd->od_objset.os, zap_db->db_object,
522                          (char *) key, oh->ot_tx);
523
524 #if LUSTRE_VERSION_CODE <= OBD_OCD_VERSION(2, 4, 53, 0)
525         if (unlikely(rc == -ENOENT && name[0] == '.' &&
526             (name[1] == 0 || (name[1] == '.' && name[2] == 0))))
527                 rc = 0;
528 #endif
529         if (unlikely(rc && rc != -ENOENT))
530                 CERROR("%s: zap_remove failed: rc = %d\n", osd->od_svname, rc);
531
532         RETURN(rc);
533 }
534
535 static struct dt_it *osd_dir_it_init(const struct lu_env *env,
536                                      struct dt_object *dt,
537                                      __u32 unused,
538                                      struct lustre_capa *capa)
539 {
540         struct osd_zap_it *it;
541
542         it = (struct osd_zap_it *)osd_index_it_init(env, dt, unused, capa);
543         if (!IS_ERR(it))
544                 it->ozi_pos = 0;
545
546         RETURN((struct dt_it *)it);
547 }
548
549 /**
550  *  Move Iterator to record specified by \a key
551  *
552  *  \param  di      osd iterator
553  *  \param  key     key for index
554  *
555  *  \retval +ve  di points to record with least key not larger than key
556  *  \retval  0   di points to exact matched key
557  *  \retval -ve  failure
558  */
559 static int osd_dir_it_get(const struct lu_env *env,
560                           struct dt_it *di, const struct dt_key *key)
561 {
562         struct osd_zap_it *it = (struct osd_zap_it *)di;
563         struct osd_object *obj = it->ozi_obj;
564         struct osd_device *osd = osd_obj2dev(obj);
565         char              *name = (char *)key;
566         int                rc;
567         ENTRY;
568
569         LASSERT(it);
570         LASSERT(it->ozi_zc);
571
572         udmu_zap_cursor_fini(it->ozi_zc);
573
574         if (udmu_zap_cursor_init(&it->ozi_zc, &osd->od_objset,
575                                  obj->oo_db->db_object, 0))
576                 RETURN(-ENOMEM);
577
578         /* XXX: implementation of the API is broken at the moment */
579         LASSERT(((const char *)key)[0] == 0);
580
581         if (name[0] == 0) {
582                 it->ozi_pos = 0;
583                 RETURN(1);
584         }
585
586         if (name[0] == '.') {
587                 if (name[1] == 0) {
588                         it->ozi_pos = 1;
589                         GOTO(out, rc = 1);
590                 } else if (name[1] == '.' && name[2] == 0) {
591                         it->ozi_pos = 2;
592                         GOTO(out, rc = 1);
593                 }
594         }
595
596         /* neither . nor .. - some real record */
597         it->ozi_pos = 3;
598         rc = +1;
599
600 out:
601         RETURN(rc);
602 }
603
604 static void osd_dir_it_put(const struct lu_env *env, struct dt_it *di)
605 {
606         /* PBS: do nothing : ref are incremented at retrive and decreamented
607          *      next/finish. */
608 }
609
610 /*
611  * in Orion . and .. were stored in the directory, while ZPL
612  * and current osd-zfs generate them up on request. so, we
613  * need to ignore previously stored . and ..
614  */
615 static int osd_index_retrieve_skip_dots(struct osd_zap_it *it,
616                                         zap_attribute_t *za)
617 {
618         int rc, isdot;
619
620         do {
621                 rc = -zap_cursor_retrieve(it->ozi_zc, za);
622
623                 isdot = 0;
624                 if (unlikely(rc == 0 && za->za_name[0] == '.')) {
625                         if (za->za_name[1] == 0) {
626                                 isdot = 1;
627                         } else if (za->za_name[1] == '.' &&
628                                    za->za_name[2] == 0) {
629                                 isdot = 1;
630                         }
631                         if (unlikely(isdot))
632                                 zap_cursor_advance(it->ozi_zc);
633                 }
634         } while (unlikely(rc == 0 && isdot));
635
636         return rc;
637 }
638
639 /**
640  * to load a directory entry at a time and stored it in
641  * iterator's in-memory data structure.
642  *
643  * \param di, struct osd_it_ea, iterator's in memory structure
644  *
645  * \retval +ve, iterator reached to end
646  * \retval   0, iterator not reached to end
647  * \retval -ve, on error
648  */
649 static int osd_dir_it_next(const struct lu_env *env, struct dt_it *di)
650 {
651         struct osd_zap_it *it = (struct osd_zap_it *)di;
652         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
653         int                rc;
654
655         /* temp. storage should be enough for any key supported by ZFS */
656         CLASSERT(sizeof(za->za_name) <= sizeof(it->ozi_name));
657
658         /*
659          * the first ->next() moves the cursor to .
660          * the second ->next() moves the cursor to ..
661          * then we get to the real records and have to verify any exist
662          */
663         if (it->ozi_pos <= 2) {
664                 it->ozi_pos++;
665                 if (it->ozi_pos <=2)
666                         RETURN(0);
667         }
668
669         zap_cursor_advance(it->ozi_zc);
670
671         /*
672          * According to current API we need to return error if its last entry.
673          * zap_cursor_advance() does not return any value. So we need to call
674          * retrieve to check if there is any record.  We should make
675          * changes to Iterator API to not return status for this API
676          */
677         rc = osd_index_retrieve_skip_dots(it, za);
678
679         if (rc == -ENOENT) /* end of dir */
680                 RETURN(+1);
681
682         RETURN(rc);
683 }
684
685 static struct dt_key *osd_dir_it_key(const struct lu_env *env,
686                                      const struct dt_it *di)
687 {
688         struct osd_zap_it *it = (struct osd_zap_it *)di;
689         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
690         int                rc = 0;
691         ENTRY;
692
693         if (it->ozi_pos <= 1) {
694                 it->ozi_pos = 1;
695                 RETURN((struct dt_key *)".");
696         } else if (it->ozi_pos == 2) {
697                 RETURN((struct dt_key *)"..");
698         }
699
700         if ((rc = -zap_cursor_retrieve(it->ozi_zc, za)))
701                 RETURN(ERR_PTR(rc));
702
703         strcpy(it->ozi_name, za->za_name);
704
705 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 3, 91, 0)
706         if (za->za_name[0] == '.') {
707                 if (za->za_name[1] == 0 || (za->za_name[1] == '.' &&
708                     za->za_name[2] == 0)) {
709                         /* we should not get onto . and ..
710                          * stored in the directory. ->next() and
711                          * other methods should prevent this
712                          */
713                         LBUG();
714                 }
715         }
716 #endif
717
718         RETURN((struct dt_key *)it->ozi_name);
719 }
720
721 static int osd_dir_it_key_size(const struct lu_env *env, const struct dt_it *di)
722 {
723         struct osd_zap_it *it = (struct osd_zap_it *)di;
724         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
725         int                rc;
726         ENTRY;
727
728         if (it->ozi_pos <= 1) {
729                 it->ozi_pos = 1;
730                 RETURN(2);
731         } else if (it->ozi_pos == 2) {
732                 RETURN(3);
733         }
734
735         if ((rc = -zap_cursor_retrieve(it->ozi_zc, za)) == 0)
736                 rc = strlen(za->za_name);
737
738 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 3, 99, 0)
739         if (rc == 0 && za->za_name[0] == '.') {
740                 if (za->za_name[1] == 0 || (za->za_name[1] == '.' &&
741                     za->za_name[2] == 0)) {
742                         /* we should not get onto . and ..
743                          * stored in the directory. ->next() and
744                          * other methods should prevent this
745                          */
746                         LBUG();
747                 }
748         }
749 #endif
750         RETURN(rc);
751 }
752
753 static int osd_dir_it_rec(const struct lu_env *env, const struct dt_it *di,
754                           struct dt_rec *dtrec, __u32 attr)
755 {
756         struct osd_zap_it   *it = (struct osd_zap_it *)di;
757         struct lu_dirent    *lde = (struct lu_dirent *)dtrec;
758         struct luz_direntry *zde = &osd_oti_get(env)->oti_zde;
759         zap_attribute_t     *za = &osd_oti_get(env)->oti_za;
760         int                  rc, namelen;
761         ENTRY;
762
763         if (it->ozi_pos <= 1) {
764                 lde->lde_hash = cpu_to_le64(1);
765                 strcpy(lde->lde_name, ".");
766                 lde->lde_namelen = cpu_to_le16(1);
767                 lde->lde_fid = *lu_object_fid(&it->ozi_obj->oo_dt.do_lu);
768                 lde->lde_attrs = LUDA_FID;
769                 /* append lustre attributes */
770                 osd_it_append_attrs(lde, attr, 1, IFTODT(S_IFDIR));
771                 lde->lde_reclen = cpu_to_le16(lu_dirent_calc_size(1, attr));
772                 it->ozi_pos = 1;
773                 GOTO(out, rc = 0);
774
775         } else if (it->ozi_pos == 2) {
776                 lde->lde_hash = cpu_to_le64(2);
777                 strcpy(lde->lde_name, "..");
778                 lde->lde_namelen = cpu_to_le16(2);
779                 lde->lde_attrs = LUDA_FID;
780                 /* append lustre attributes */
781                 osd_it_append_attrs(lde, attr, 2, IFTODT(S_IFDIR));
782                 lde->lde_reclen = cpu_to_le16(lu_dirent_calc_size(2, attr));
783                 rc = osd_find_parent_fid(env, &it->ozi_obj->oo_dt, &lde->lde_fid);
784                 /*
785                  * early Orion code was not setting LinkEA, so it's possible
786                  * some setups still have objects with no LinkEA set.
787                  * but at that time .. was a real record in the directory
788                  * so we should try to lookup .. in ZAP
789                  */
790                 if (rc != -ENOENT)
791                         GOTO(out, rc);
792         }
793
794         LASSERT(lde);
795
796         lde->lde_hash = cpu_to_le64(udmu_zap_cursor_serialize(it->ozi_zc));
797
798         if ((rc = -zap_cursor_retrieve(it->ozi_zc, za)))
799                 GOTO(out, rc);
800
801         namelen = strlen(za->za_name);
802         if (namelen > NAME_MAX)
803                 GOTO(out, rc = -EOVERFLOW);
804         strcpy(lde->lde_name, za->za_name);
805         lde->lde_namelen = cpu_to_le16(namelen);
806
807         if (za->za_integer_length != 8 || za->za_num_integers < 3) {
808                 CERROR("%s: unsupported direntry format: %d %d\n",
809                        osd_obj2dev(it->ozi_obj)->od_svname,
810                        za->za_integer_length, (int)za->za_num_integers);
811
812                 GOTO(out, rc = -EIO);
813         }
814
815         rc = -zap_lookup(it->ozi_zc->zc_objset, it->ozi_zc->zc_zapobj,
816                          za->za_name, za->za_integer_length, 3, zde);
817         if (rc)
818                 GOTO(out, rc);
819
820         lde->lde_fid = zde->lzd_fid;
821         lde->lde_attrs = LUDA_FID;
822
823         /* append lustre attributes */
824         osd_it_append_attrs(lde, attr, namelen, zde->lzd_reg.zde_type);
825
826         lde->lde_reclen = cpu_to_le16(lu_dirent_calc_size(namelen, attr));
827
828 out:
829         RETURN(rc);
830 }
831
832 static __u64 osd_dir_it_store(const struct lu_env *env, const struct dt_it *di)
833 {
834         struct osd_zap_it *it = (struct osd_zap_it *)di;
835         __u64              pos;
836         ENTRY;
837
838         if (it->ozi_pos <= 2)
839                 pos = it->ozi_pos;
840         else
841                 pos = udmu_zap_cursor_serialize(it->ozi_zc);
842
843         RETURN(pos);
844 }
845
846 /*
847  * return status :
848  *  rc == 0 -> end of directory.
849  *  rc >  0 -> ok, proceed.
850  *  rc <  0 -> error.  ( EOVERFLOW  can be masked.)
851  */
852 static int osd_dir_it_load(const struct lu_env *env,
853                         const struct dt_it *di, __u64 hash)
854 {
855         struct osd_zap_it *it = (struct osd_zap_it *)di;
856         struct osd_object *obj = it->ozi_obj;
857         struct osd_device *osd = osd_obj2dev(obj);
858         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
859         int                rc;
860         ENTRY;
861
862         if (it->ozi_pos != 0) {
863                 /* the cursor wasn't at the beginning
864                  * so we should reset ZAP cursor as well */
865                 udmu_zap_cursor_fini(it->ozi_zc);
866                 if (udmu_zap_cursor_init(&it->ozi_zc, &osd->od_objset,
867                                          obj->oo_db->db_object, hash))
868                         RETURN(-ENOMEM);
869         }
870
871         if (hash <= 2) {
872                 it->ozi_pos = hash;
873                 rc = +1;
874         } else {
875                 it->ozi_pos = 3;
876                 /* to return whether the end has been reached */
877                 rc = osd_index_retrieve_skip_dots(it, za);
878                 if (rc == 0)
879                         rc = +1;
880                 else if (rc == -ENOENT)
881                         rc = 0;
882         }
883
884         RETURN(rc);
885 }
886
887 static struct dt_index_operations osd_dir_ops = {
888         .dio_lookup         = osd_dir_lookup,
889         .dio_declare_insert = osd_declare_dir_insert,
890         .dio_insert         = osd_dir_insert,
891         .dio_declare_delete = osd_declare_dir_delete,
892         .dio_delete         = osd_dir_delete,
893         .dio_it     = {
894                 .init     = osd_dir_it_init,
895                 .fini     = osd_index_it_fini,
896                 .get      = osd_dir_it_get,
897                 .put      = osd_dir_it_put,
898                 .next     = osd_dir_it_next,
899                 .key      = osd_dir_it_key,
900                 .key_size = osd_dir_it_key_size,
901                 .rec      = osd_dir_it_rec,
902                 .store    = osd_dir_it_store,
903                 .load     = osd_dir_it_load
904         }
905 };
906
907 /*
908  * Primitives for index files using binary keys.
909  * XXX: only 64-bit keys are supported for now.
910  */
911
912 static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt,
913                         struct dt_rec *rec, const struct dt_key *key,
914                         struct lustre_capa *capa)
915 {
916         struct osd_object *obj = osd_dt_obj(dt);
917         struct osd_device *osd = osd_obj2dev(obj);
918         int                rc;
919         ENTRY;
920
921         rc = -zap_lookup_uint64(osd->od_objset.os, obj->oo_db->db_object,
922                                 (const __u64 *)key, 1, 8, obj->oo_recsize,
923                                 (void *)rec);
924         RETURN(rc == 0 ? 1 : rc);
925 }
926
927 static int osd_declare_index_insert(const struct lu_env *env,
928                                     struct dt_object *dt,
929                                     const struct dt_rec *rec,
930                                     const struct dt_key *key,
931                                     struct thandle *th)
932 {
933         struct osd_object  *obj = osd_dt_obj(dt);
934         struct osd_thandle *oh;
935         ENTRY;
936
937         LASSERT(th != NULL);
938         oh = container_of0(th, struct osd_thandle, ot_super);
939
940         LASSERT(obj->oo_db);
941
942         dmu_tx_hold_bonus(oh->ot_tx, obj->oo_db->db_object);
943
944         /* It is not clear what API should be used for binary keys, so we pass
945          * a null name which has the side effect of over-reserving space,
946          * accounting for the worst case. See zap_count_write() */
947         dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, NULL);
948
949         RETURN(0);
950 }
951
952 static int osd_index_insert(const struct lu_env *env, struct dt_object *dt,
953                             const struct dt_rec *rec, const struct dt_key *key,
954                             struct thandle *th, struct lustre_capa *capa,
955                             int ignore_quota)
956 {
957         struct osd_object  *obj = osd_dt_obj(dt);
958         struct osd_device  *osd = osd_obj2dev(obj);
959         struct osd_thandle *oh;
960         int                 rc;
961         ENTRY;
962
963         LASSERT(obj->oo_db);
964         LASSERT(dt_object_exists(dt));
965         LASSERT(osd_invariant(obj));
966         LASSERT(th != NULL);
967
968         oh = container_of0(th, struct osd_thandle, ot_super);
969
970         /* Insert (key,oid) into ZAP */
971         rc = -zap_add_uint64(osd->od_objset.os, obj->oo_db->db_object,
972                              (const __u64 *)key, 1, 8, obj->oo_recsize,
973                              (void *)rec, oh->ot_tx);
974         RETURN(rc);
975 }
976
977 static int osd_declare_index_delete(const struct lu_env *env,
978                                     struct dt_object *dt,
979                                     const struct dt_key *key,
980                                     struct thandle *th)
981 {
982         struct osd_object  *obj = osd_dt_obj(dt);
983         struct osd_thandle *oh;
984         ENTRY;
985
986         LASSERT(dt_object_exists(dt));
987         LASSERT(osd_invariant(obj));
988         LASSERT(th != NULL);
989         LASSERT(obj->oo_db);
990
991         oh = container_of0(th, struct osd_thandle, ot_super);
992         dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, NULL);
993
994         RETURN(0);
995 }
996
997 static int osd_index_delete(const struct lu_env *env, struct dt_object *dt,
998                             const struct dt_key *key, struct thandle *th,
999                             struct lustre_capa *capa)
1000 {
1001         struct osd_object  *obj = osd_dt_obj(dt);
1002         struct osd_device  *osd = osd_obj2dev(obj);
1003         struct osd_thandle *oh;
1004         int                 rc;
1005         ENTRY;
1006
1007         LASSERT(obj->oo_db);
1008         LASSERT(th != NULL);
1009         oh = container_of0(th, struct osd_thandle, ot_super);
1010
1011         /* Remove binary key from the ZAP */
1012         rc = -zap_remove_uint64(osd->od_objset.os, obj->oo_db->db_object,
1013                                 (const __u64 *)key, 1, oh->ot_tx);
1014         RETURN(rc);
1015 }
1016
1017 static int osd_index_it_get(const struct lu_env *env, struct dt_it *di,
1018                             const struct dt_key *key)
1019 {
1020         struct osd_zap_it *it = (struct osd_zap_it *)di;
1021         struct osd_object *obj = it->ozi_obj;
1022         struct osd_device *osd = osd_obj2dev(obj);
1023         ENTRY;
1024
1025         LASSERT(it);
1026         LASSERT(it->ozi_zc);
1027
1028         /* XXX: API is broken at the moment */
1029         LASSERT(*((const __u64 *)key) == 0);
1030
1031         zap_cursor_fini(it->ozi_zc);
1032         memset(it->ozi_zc, 0, sizeof(*it->ozi_zc));
1033         zap_cursor_init(it->ozi_zc, osd->od_objset.os, obj->oo_db->db_object);
1034         it->ozi_reset = 1;
1035
1036         RETURN(+1);
1037 }
1038
1039 static int osd_index_it_next(const struct lu_env *env, struct dt_it *di)
1040 {
1041         struct osd_zap_it *it = (struct osd_zap_it *)di;
1042         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1043         int                rc;
1044         ENTRY;
1045
1046         if (it->ozi_reset == 0)
1047                 zap_cursor_advance(it->ozi_zc);
1048         it->ozi_reset = 0;
1049
1050         /*
1051          * According to current API we need to return error if it's last entry.
1052          * zap_cursor_advance() does not return any value. So we need to call
1053          * retrieve to check if there is any record.  We should make
1054          * changes to Iterator API to not return status for this API
1055          */
1056         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1057         if (rc == -ENOENT)
1058                 RETURN(+1);
1059
1060         RETURN((rc));
1061 }
1062
1063 static struct dt_key *osd_index_it_key(const struct lu_env *env,
1064                                        const struct dt_it *di)
1065 {
1066         struct osd_zap_it *it = (struct osd_zap_it *)di;
1067         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1068         int                rc = 0;
1069         ENTRY;
1070
1071         it->ozi_reset = 0;
1072         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1073         if (rc)
1074                 RETURN(ERR_PTR(rc));
1075
1076         /* the binary key is stored in the name */
1077         it->ozi_key = *((__u64 *)za->za_name);
1078
1079         RETURN((struct dt_key *)&it->ozi_key);
1080 }
1081
1082 static int osd_index_it_key_size(const struct lu_env *env,
1083                                 const struct dt_it *di)
1084 {
1085         /* we only support 64-bit binary keys for the time being */
1086         RETURN(sizeof(__u64));
1087 }
1088
1089 static int osd_index_it_rec(const struct lu_env *env, const struct dt_it *di,
1090                             struct dt_rec *rec, __u32 attr)
1091 {
1092         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1093         struct osd_zap_it *it = (struct osd_zap_it *)di;
1094         struct osd_object *obj = it->ozi_obj;
1095         struct osd_device *osd = osd_obj2dev(obj);
1096         int                rc;
1097         ENTRY;
1098
1099         it->ozi_reset = 0;
1100         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1101         if (rc)
1102                 RETURN(rc);
1103
1104         rc = -zap_lookup_uint64(osd->od_objset.os, obj->oo_db->db_object,
1105                                 (const __u64 *)za->za_name, 1, 8,
1106                                 obj->oo_recsize, (void *)rec);
1107         RETURN(rc);
1108 }
1109
1110 static __u64 osd_index_it_store(const struct lu_env *env,
1111                                 const struct dt_it *di)
1112 {
1113         struct osd_zap_it *it = (struct osd_zap_it *)di;
1114
1115         it->ozi_reset = 0;
1116         RETURN((__u64)zap_cursor_serialize(it->ozi_zc));
1117 }
1118
1119 static int osd_index_it_load(const struct lu_env *env, const struct dt_it *di,
1120                              __u64 hash)
1121 {
1122         struct osd_zap_it *it = (struct osd_zap_it *)di;
1123         struct osd_object *obj = it->ozi_obj;
1124         struct osd_device *osd = osd_obj2dev(obj);
1125         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1126         int                rc;
1127         ENTRY;
1128
1129         /* close the current cursor */
1130         zap_cursor_fini(it->ozi_zc);
1131
1132         /* create a new one starting at hash */
1133         memset(it->ozi_zc, 0, sizeof(*it->ozi_zc));
1134         zap_cursor_init_serialized(it->ozi_zc, osd->od_objset.os,
1135                                    obj->oo_db->db_object, hash);
1136         it->ozi_reset = 0;
1137
1138         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1139         if (rc == 0)
1140                 RETURN(+1);
1141         else if (rc == -ENOENT)
1142                 RETURN(0);
1143
1144         RETURN(rc);
1145 }
1146
1147 static struct dt_index_operations osd_index_ops = {
1148         .dio_lookup             = osd_index_lookup,
1149         .dio_declare_insert     = osd_declare_index_insert,
1150         .dio_insert             = osd_index_insert,
1151         .dio_declare_delete     = osd_declare_index_delete,
1152         .dio_delete             = osd_index_delete,
1153         .dio_it = {
1154                 .init           = osd_index_it_init,
1155                 .fini           = osd_index_it_fini,
1156                 .get            = osd_index_it_get,
1157                 .put            = osd_index_it_put,
1158                 .next           = osd_index_it_next,
1159                 .key            = osd_index_it_key,
1160                 .key_size       = osd_index_it_key_size,
1161                 .rec            = osd_index_it_rec,
1162                 .store          = osd_index_it_store,
1163                 .load           = osd_index_it_load
1164         }
1165 };
1166
1167 int osd_index_try(const struct lu_env *env, struct dt_object *dt,
1168                 const struct dt_index_features *feat)
1169 {
1170         struct osd_object *obj = osd_dt_obj(dt);
1171         ENTRY;
1172
1173         LASSERT(dt_object_exists(dt));
1174
1175         /*
1176          * XXX: implement support for fixed-size keys sorted with natural
1177          *      numerical way (not using internal hash value)
1178          */
1179         if (feat->dif_flags & DT_IND_RANGE)
1180                 RETURN(-ERANGE);
1181
1182         if (unlikely(feat == &dt_otable_features))
1183                 /* do not support oi scrub yet. */
1184                 RETURN(-ENOTSUPP);
1185
1186         LASSERT(obj->oo_db != NULL);
1187         if (likely(feat == &dt_directory_features)) {
1188                 if (udmu_object_is_zap(obj->oo_db))
1189                         dt->do_index_ops = &osd_dir_ops;
1190                 else
1191                         RETURN(-ENOTDIR);
1192         } else if (unlikely(feat == &dt_acct_features)) {
1193                 LASSERT(fid_is_acct(lu_object_fid(&dt->do_lu)));
1194                 dt->do_index_ops = &osd_acct_index_ops;
1195         } else if (udmu_object_is_zap(obj->oo_db) &&
1196                    dt->do_index_ops == NULL) {
1197                 /* For index file, we don't support variable key & record sizes
1198                  * and the key has to be unique */
1199                 if ((feat->dif_flags & ~DT_IND_UPDATE) != 0)
1200                         RETURN(-EINVAL);
1201
1202                 /* Although the zap_*_uint64() primitives support large keys, we
1203                  * limit ourselves to 64-bit keys for now */
1204                 if (feat->dif_keysize_max != sizeof(__u64) ||
1205                     feat->dif_keysize_min != sizeof(__u64))
1206                         RETURN(-EINVAL);
1207
1208                 /* As for the record size, it should be a multiple of 8 bytes
1209                  * and smaller than the maximum value length supported by ZAP.
1210                  */
1211                 if (feat->dif_recsize_max > ZAP_MAXVALUELEN)
1212                         RETURN(-E2BIG);
1213                 if (feat->dif_recsize_max != feat->dif_recsize_min ||
1214                     (feat->dif_recsize_max & (sizeof(__u64) - 1)))
1215                         RETURN(-EINVAL);
1216
1217                 obj->oo_recsize = feat->dif_recsize_max / sizeof(__u64);
1218                 dt->do_index_ops = &osd_index_ops;
1219         }
1220
1221         RETURN(0);
1222 }
1223