Whamcloud - gitweb
LU-19098 hsm: don't print progname twice with lhsmtool
[fs/lustre-release.git] / lustre / osd-zfs / osd_index.c
1 // SPDX-License-Identifier: GPL-2.0
2
3 /*
4  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
5  * Use is subject to license terms.
6  *
7  * Copyright (c) 2012, 2017, Intel Corporation.
8  */
9
10 /*
11  * This file is part of Lustre, http://www.lustre.org/
12  *
13  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
14  * Author: Mike Pershin <tappro@whamcloud.com>
15  */
16
17 #define DEBUG_SUBSYSTEM S_OSD
18
19 #include <libcfs/libcfs.h>
20 #include <obd_support.h>
21 #include <lustre_net.h>
22 #include <obd.h>
23 #include <obd_class.h>
24 #include <lustre_disk.h>
25 #include <lustre_fid.h>
26
27 #include "osd_internal.h"
28
29 #include <sys/dnode.h>
30 #include <sys/spa.h>
31 #include <sys/stat.h>
32 #include <sys/zap.h>
33 #include <sys/spa_impl.h>
34 #include <sys/zfs_znode.h>
35 #include <sys/dmu_tx.h>
36 #include <sys/dmu_objset.h>
37 #include <sys/dsl_prop.h>
38 #include <sys/sa_impl.h>
39 #include <sys/txg.h>
40 #include <lustre_scrub.h>
41
42 /* We don't actually have direct access to the zap_hashbits() function
43  * so just pretend like we do for now.  If this ever breaks we can look at
44  * it at that time.
45  */
46 #define zap_hashbits(zc) 48
47 /*
48  * ZFS hash format:
49  * | cd (16 bits) | hash (48 bits) |
50  * we need it in other form:
51  * |0| hash (48 bit) | cd (15 bit) |
52  * to be a full 64-bit ordered hash so that Lustre readdir can use it to merge
53  * the readdir hashes from multiple directory stripes uniformly on the client.
54  * Another point is sign bit, the hash range should be in [0, 2^63-1] because
55  * loff_t (for llseek) needs to be a positive value.  This means the "cd" field
56  * should only be the low 15 bits.
57  */
58 uint64_t osd_zap_cursor_serialize(zap_cursor_t *zc)
59 {
60         uint64_t zfs_hash = zap_cursor_serialize(zc) & (~0ULL >> 1);
61
62         return (zfs_hash >> zap_hashbits(zc)) |
63                 (zfs_hash << (63 - zap_hashbits(zc)));
64 }
65
66 void osd_zap_cursor_init_serialized(zap_cursor_t *zc, struct objset *os,
67                                     uint64_t id, uint64_t dirhash)
68 {
69         uint64_t zfs_hash = ((dirhash << zap_hashbits(zc)) & (~0ULL >> 1)) |
70                 (dirhash >> (63 - zap_hashbits(zc)));
71
72         zap_cursor_init_serialized(zc, os, id, zfs_hash);
73 }
74
75 int osd_zap_cursor_init(zap_cursor_t **zc, struct objset *os,
76                         uint64_t id, uint64_t dirhash)
77 {
78         zap_cursor_t *t;
79
80         OBD_ALLOC_PTR(t);
81         if (unlikely(t == NULL))
82                 return -ENOMEM;
83
84         osd_zap_cursor_init_serialized(t, os, id, dirhash);
85         *zc = t;
86
87         return 0;
88 }
89
90 void osd_zap_cursor_fini(zap_cursor_t *zc)
91 {
92         zap_cursor_fini(zc);
93         OBD_FREE_PTR(zc);
94 }
95
96 static inline void osd_obj_cursor_init_serialized(zap_cursor_t *zc,
97                                                  struct osd_object *o,
98                                                  uint64_t dirhash)
99 {
100         struct osd_device *d = osd_obj2dev(o);
101
102         osd_zap_cursor_init_serialized(zc, d->od_os,
103                                        o->oo_dn->dn_object, dirhash);
104 }
105
106 static inline int osd_obj_cursor_init(zap_cursor_t **zc, struct osd_object *o,
107                         uint64_t dirhash)
108 {
109         struct osd_device *d = osd_obj2dev(o);
110
111         return osd_zap_cursor_init(zc, d->od_os, o->oo_dn->dn_object, dirhash);
112 }
113
114 static struct dt_it *osd_index_it_init(const struct lu_env *env,
115                                        struct dt_object *dt,
116                                        __u32 unused)
117 {
118         struct osd_thread_info  *info = osd_oti_get(env);
119         struct osd_zap_it       *it;
120         struct osd_object       *obj = osd_dt_obj(dt);
121         struct lu_object        *lo  = &dt->do_lu;
122         int                      rc;
123
124         ENTRY;
125         if (obj->oo_destroyed)
126                 RETURN(ERR_PTR(-ENOENT));
127
128         LASSERT(lu_object_exists(lo));
129         LASSERT(obj->oo_dn);
130         LASSERT(info);
131
132         OBD_SLAB_ALLOC_PTR_GFP(it, osd_zapit_cachep, GFP_NOFS);
133         if (it == NULL)
134                 RETURN(ERR_PTR(-ENOMEM));
135
136         rc = osd_obj_cursor_init(&it->ozi_zc, obj, 0);
137         if (rc != 0) {
138                 OBD_SLAB_FREE_PTR(it, osd_zapit_cachep);
139                 RETURN(ERR_PTR(rc));
140         }
141
142         it->ozi_obj   = obj;
143         it->ozi_reset = 1;
144 #ifdef ZAP_MAXNAMELEN_NEW
145         it->ozi_za.za_name_len = MAXNAMELEN;
146 #endif
147         lu_object_get(lo);
148
149         RETURN((struct dt_it *)it);
150 }
151
152 static void osd_index_it_fini(const struct lu_env *env, struct dt_it *di)
153 {
154         struct osd_zap_it       *it     = (struct osd_zap_it *)di;
155         struct osd_object       *obj;
156
157         ENTRY;
158         LASSERT(it);
159         LASSERT(it->ozi_obj);
160
161         obj = it->ozi_obj;
162
163         osd_zap_cursor_fini(it->ozi_zc);
164         osd_object_put(env, obj);
165         OBD_SLAB_FREE_PTR(it, osd_zapit_cachep);
166
167         EXIT;
168 }
169
170
171 static void osd_index_it_put(const struct lu_env *env, struct dt_it *di)
172 {
173         /* PBS: do nothing : ref are incremented at retrive and decreamented
174          *      next/finish.
175          */
176 }
177
178 static inline void osd_it_append_attrs(struct lu_dirent *ent, __u32 attr,
179                                        int len, __u16 type)
180 {
181         const unsigned int align = sizeof(struct luda_type) - 1;
182         struct luda_type *lt;
183
184         /* check if file type is required */
185         if (attr & LUDA_TYPE) {
186                 len = (len + align) & ~align;
187
188                 lt = (void *)ent->lde_name + len;
189                 lt->lt_type = cpu_to_le16(DTTOIF(type));
190                 ent->lde_attrs |= LUDA_TYPE;
191         }
192
193         ent->lde_attrs = cpu_to_le32(ent->lde_attrs);
194 }
195
196 int __osd_xattr_load_by_oid(struct osd_device *osd, uint64_t oid, nvlist_t **sa)
197 {
198         sa_handle_t *hdl;
199         dmu_buf_t *db;
200         int rc;
201
202         rc = -dmu_bonus_hold(osd->od_os, oid, osd_obj_tag, &db);
203         if (rc < 0) {
204                 CERROR("%s: can't get bonus, rc = %d\n", osd->od_svname, rc);
205                 return rc;
206         }
207
208         rc = -sa_handle_get_from_db(osd->od_os, db, NULL, SA_HDL_PRIVATE, &hdl);
209         if (rc) {
210                 dmu_buf_rele(db, osd_obj_tag);
211                 return rc;
212         }
213
214         rc = __osd_xattr_load(osd, hdl, sa);
215
216         sa_handle_destroy(hdl);
217
218         return rc;
219 }
220 /**
221  * Get the object's FID from its LMA EA.
222  *
223  * \param[in] env       pointer to the thread context
224  * \param[in] osd       pointer to the OSD device
225  * \param[in] oid       the object's local identifier
226  * \param[out] fid      the buffer to hold the object's FID
227  *
228  * \retval              0 for success
229  * \retval              negative error number on failure
230  */
231 int osd_get_fid_by_oid(const struct lu_env *env, struct osd_device *osd,
232                        uint64_t oid, struct lu_fid *fid)
233 {
234         struct objset           *os       = osd->od_os;
235         struct osd_thread_info  *oti      = osd_oti_get(env);
236         struct lustre_mdt_attrs *lma      =
237                         (struct lustre_mdt_attrs *)oti->oti_buf;
238         struct lu_buf            buf;
239         nvlist_t                *sa_xattr = NULL;
240         sa_handle_t             *sa_hdl   = NULL;
241         uchar_t                 *nv_value = NULL;
242         uint64_t                 xattr    = ZFS_NO_OBJECT;
243         int                      size     = 0;
244         int                      rc;
245
246         ENTRY;
247         rc = __osd_xattr_load_by_oid(osd, oid, &sa_xattr);
248         if (rc == -ENOENT)
249                 goto regular;
250
251         if (rc != 0)
252                 GOTO(out, rc);
253
254         rc = -nvlist_lookup_byte_array(sa_xattr, XATTR_NAME_LMA, &nv_value,
255                                        &size);
256         if (rc == -ENOENT)
257                 goto regular;
258
259         if (rc != 0)
260                 GOTO(out, rc);
261
262         if (unlikely(size > sizeof(oti->oti_buf)))
263                 GOTO(out, rc = -ERANGE);
264
265         memcpy(lma, nv_value, size);
266
267         goto found;
268
269 regular:
270         rc = -sa_handle_get(os, oid, NULL, SA_HDL_PRIVATE, &sa_hdl);
271         if (rc != 0)
272                 GOTO(out, rc);
273
274         rc = -sa_lookup(sa_hdl, SA_ZPL_XATTR(osd), &xattr, 8);
275         sa_handle_destroy(sa_hdl);
276         if (rc != 0)
277                 GOTO(out, rc);
278
279         buf.lb_buf = lma;
280         buf.lb_len = sizeof(oti->oti_buf);
281         rc = __osd_xattr_get_large(env, osd, xattr, &buf,
282                                    XATTR_NAME_LMA, &size);
283         if (rc != 0)
284                 GOTO(out, rc);
285
286 found:
287         if (size < sizeof(*lma))
288                 GOTO(out, rc = -EIO);
289
290         lustre_lma_swab(lma);
291         if (unlikely((lma->lma_incompat & ~LMA_INCOMPAT_SUPP) ||
292                      CFS_FAIL_CHECK(OBD_FAIL_OSD_LMA_INCOMPAT))) {
293                 rc = -EOPNOTSUPP;
294                 CWARN("%s: unsupported incompat LMA feature(s) %#x for oid = %#llx: rc = %d\n",
295                       osd->od_svname, lma->lma_incompat & ~LMA_INCOMPAT_SUPP,
296                       oid, rc);
297                 GOTO(out, rc);
298         } else {
299                 *fid = lma->lma_self_fid;
300                 GOTO(out, rc = 0);
301         }
302
303 out:
304         if (sa_xattr != NULL)
305                 nvlist_free(sa_xattr);
306         return rc;
307 }
308
309 /*
310  * As we don't know FID, we can't use LU object, so this function
311  * partially duplicate osd_xattr_get_internal() which is built around
312  * LU-object and uses it to cache data like regular EA dnode, etc
313  */
314 static int osd_find_parent_by_dnode(const struct lu_env *env,
315                                     struct dt_object *o,
316                                     struct lu_fid *fid, uint64_t *oid)
317 {
318         struct osd_object       *obj = osd_dt_obj(o);
319         struct osd_device       *osd = osd_obj2dev(obj);
320         uint64_t                 dnode = ZFS_NO_OBJECT;
321         int                      rc;
322
323         ENTRY;
324         /* first of all, get parent dnode from own attributes */
325         rc = osd_sa_handle_get(obj);
326         if (rc != 0)
327                 RETURN(rc);
328         rc = -sa_lookup(obj->oo_sa_hdl, SA_ZPL_PARENT(osd), &dnode, 8);
329         if (!rc) {
330                 if (oid)
331                         *oid = dnode;
332                 rc = osd_get_fid_by_oid(env, osd, dnode, fid);
333         }
334
335         RETURN(rc);
336 }
337
338 static int osd_find_parent_fid(const struct lu_env *env, struct dt_object *o,
339                                struct lu_fid *fid, uint64_t *oid)
340 {
341         struct link_ea_header  *leh;
342         struct link_ea_entry   *lee;
343         struct lu_buf           buf;
344         int                     rc;
345
346         ENTRY;
347         buf.lb_buf = osd_oti_get(env)->oti_buf;
348         buf.lb_len = sizeof(osd_oti_get(env)->oti_buf);
349
350         rc = osd_xattr_get(env, o, &buf, XATTR_NAME_LINK);
351         if (rc == -ERANGE) {
352                 rc = osd_xattr_get(env, o, &LU_BUF_NULL, XATTR_NAME_LINK);
353                 if (rc < 0)
354                         RETURN(rc);
355                 LASSERT(rc > 0);
356                 OBD_ALLOC(buf.lb_buf, rc);
357                 if (buf.lb_buf == NULL)
358                         RETURN(-ENOMEM);
359                 buf.lb_len = rc;
360                 rc = osd_xattr_get(env, o, &buf, XATTR_NAME_LINK);
361         }
362         if (rc < 0)
363                 GOTO(out, rc);
364         if (rc < sizeof(*leh) + sizeof(*lee))
365                 GOTO(out, rc = -EINVAL);
366
367         leh = buf.lb_buf;
368         if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
369                 leh->leh_magic = LINK_EA_MAGIC;
370                 leh->leh_reccount = __swab32(leh->leh_reccount);
371                 leh->leh_len = __swab64(leh->leh_len);
372         }
373         if (leh->leh_magic != LINK_EA_MAGIC)
374                 GOTO(out, rc = -EINVAL);
375         if (leh->leh_reccount == 0)
376                 GOTO(out, rc = -ENODATA);
377
378         lee = (struct link_ea_entry *)(leh + 1);
379         fid_be_to_cpu(fid, (const struct lu_fid *)&lee->lee_parent_fid);
380         rc = 0;
381
382 out:
383         if (buf.lb_buf != osd_oti_get(env)->oti_buf)
384                 OBD_FREE(buf.lb_buf, buf.lb_len);
385
386 #if 0
387         /* this block can be enabled for additional verification
388          * it's trying to match FID from LinkEA vs. FID from LMA
389          */
390         if (rc == 0) {
391                 struct lu_fid fid2;
392
393                 int rc2;
394
395                 rc2 = osd_find_parent_by_dnode(env, o, &fid2, oid);
396                 if (rc2 == 0)
397                         if (lu_fid_eq(fid, &fid2) == 0)
398                                 CERROR("wrong parent: "DFID" != "DFID"\n",
399                                        PFID(fid), PFID(&fid2));
400         }
401 #endif
402
403         /* no LinkEA is found, let's try to find the fid in parent's LMA */
404         if (unlikely(rc != 0))
405                 rc = osd_find_parent_by_dnode(env, o, fid, oid);
406
407         RETURN(rc);
408 }
409
410 /*
411  * When lookup item under striped directory, we need to locate the master
412  * MDT-object of the striped directory firstly, then the client will send
413  * lookup (getattr_by_name) RPC to the MDT with some slave MDT-object's FID
414  * and the item's name. If the system is restored from MDT file level backup,
415  * then before the OI scrub completely built the OI files, the OI mappings of
416  * the master MDT-object and slave MDT-object may be invalid. Usually, it is
417  * not a problem for the master MDT-object. Because when locate the master
418  * MDT-object, we will do name based lookup (for the striped directory itself)
419  * firstly, during such process we can setup the correct OI mapping for the
420  * master MDT-object. But it will be trouble for the slave MDT-object. Because
421  * the client will not trigger name based lookup on the MDT to locate the slave
422  * MDT-object before locating item under the striped directory, then when
423  * osd_fid_lookup(), it will find that the OI mapping for the slave MDT-object
424  * is invalid and does not know what the right OI mapping is, then the MDT has
425  * to return -EINPROGRESS to the client to notify that the OI scrub is rebuiding
426  * the OI file, related OI mapping is unknown yet, please try again later. And
427  * then client will re-try the RPC again and again until related OI mapping has
428  * been updated. That is quite inefficient.
429  *
430  * To resolve above trouble, we will handle it as the following two cases:
431  *
432  * 1) The slave MDT-object and the master MDT-object are on different MDTs.
433  *    It is relative easy. Be as one of remote MDT-objects, the slave MDT-object
434  *    is linked under /REMOTE_PARENT_DIR with the name of its FID string.
435  *    We can locate the slave MDT-object via lookup the /REMOTE_PARENT_DIR
436  *    directly. Please check osd_fid_lookup().
437  *
438  * 2) The slave MDT-object and the master MDT-object reside on the same MDT.
439  *    Under such case, during lookup the master MDT-object, we will lookup the
440  *    slave MDT-object via readdir against the master MDT-object, because the
441  *    slave MDT-objects information are stored as sub-directories with the name
442  *    "${FID}:${index}". Then when find the local slave MDT-object, its OI
443  *    mapping will be recorded. Then subsequent osd_fid_lookup() will know
444  *    the correct OI mapping for the slave MDT-object.
445  */
446 static int osd_check_lmv(const struct lu_env *env, struct osd_device *osd,
447                          uint64_t oid, const struct lu_fid *fid)
448 {
449         struct osd_thread_info *info = osd_oti_get(env);
450         struct luz_direntry *zde = &info->oti_zde;
451         zap_attribute_t *za = &info->oti_za;
452         zap_cursor_t *zc = &info->oti_zc;
453         struct lu_fid *tfid = &info->oti_fid;
454         nvlist_t *nvbuf = NULL;
455         struct lmv_mds_md_v1 *lmv = NULL;
456         int size;
457         int rc;
458
459         ENTRY;
460         rc = __osd_xattr_load_by_oid(osd, oid, &nvbuf);
461         if (rc == -ENOENT || rc == -EEXIST || rc == -ENODATA)
462                 RETURN(0);
463
464         if (rc)
465                 RETURN(rc);
466
467         rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMV,
468                                        (uchar_t **)&lmv, &size);
469         if (rc == -ENOENT || rc == -EEXIST || rc == -ENODATA)
470                 GOTO(out_nvbuf, rc = 0);
471
472         if (rc)
473                 GOTO(out_nvbuf, rc);
474
475         if (le32_to_cpu(lmv->lmv_magic) != LMV_MAGIC_V1)
476                 GOTO(out_nvbuf, rc = -EINVAL);
477
478         zap_cursor_init_serialized(zc, osd->od_os, oid, 0);
479         rc = -zap_cursor_retrieve(zc, za);
480         if (rc == -ENOENT) {
481                 zap_cursor_advance(zc);
482         } else if (rc) {
483                 CERROR("%s: fail to init for check LMV "DFID"(%llu): rc = %d\n",
484                        osd_name(osd), PFID(fid), oid, rc);
485                 GOTO(out_zc, rc);
486         }
487
488         while (1) {
489                 rc = -zap_cursor_retrieve(zc, za);
490                 if (rc == -ENOENT)
491                         GOTO(out_zc, rc = 0);
492
493                 if (rc) {
494                         CERROR("%s: fail to locate next for check LMV "DFID"(%llu): rc = %d\n",
495                                osd_name(osd), PFID(fid), oid, rc);
496                         GOTO(out_zc, rc);
497                 }
498
499                 fid_zero(tfid);
500                 sscanf(za->za_name + 1, SFID, RFID(tfid));
501                 if (fid_is_sane(tfid) && !osd_remote_fid(env, osd, tfid)) {
502                         rc = osd_zap_lookup(osd, oid, NULL, za->za_name,
503                                         za->za_integer_length,
504                                         sizeof(*zde) / za->za_integer_length,
505                                         (void *)zde);
506                         if (rc) {
507                                 CERROR("%s: fail to lookup for check LMV "DFID"(%llu): rc = %d\n",
508                                        osd_name(osd), PFID(fid), oid, rc);
509                                 GOTO(out_zc, rc);
510                         }
511
512                         rc = osd_oii_insert(env, osd, tfid,
513                                             zde->lzd_reg.zde_dnode, false);
514                         GOTO(out_zc, rc);
515                 }
516
517                 zap_cursor_advance(zc);
518         }
519
520 out_zc:
521         zap_cursor_fini(zc);
522 out_nvbuf:
523         nvlist_free(nvbuf);
524
525         return rc;
526 }
527
528 static void
529 osd_zfs_consistency_check(const struct lu_env *env, struct osd_device *osd,
530                           struct osd_object *obj, const struct lu_fid *fid,
531                           u64 oid, bool is_dir)
532 {
533         struct lustre_scrub *scrub = &osd->od_scrub;
534         dnode_t *dn = NULL;
535         uint64_t oid2;
536         int once = 0;
537         bool insert;
538         int rc;
539
540         ENTRY;
541         /* oid == ZFS_NO_OBJECT must be for lookup ".." case */
542         if (oid == ZFS_NO_OBJECT) {
543                 rc = osd_sa_handle_get(obj);
544                 if (rc)
545                         return;
546
547                 rc = -sa_lookup(obj->oo_sa_hdl, SA_ZPL_PARENT(osd), &oid, 8);
548                 if (rc)
549                         return;
550         }
551
552         if (!scrub_needs_check(&osd->od_scrub, fid, oid))
553                 return;
554 again:
555         rc = osd_fid_lookup(env, osd, fid, &oid2);
556         if (rc == -ENOENT) {
557                 insert = true;
558                 if (dn)
559                         goto trigger;
560
561                 rc = __osd_obj2dnode(osd->od_os, oid, &dn);
562                 /* The object has been removed (by race maybe). */
563                 if (rc)
564                         return;
565
566                 goto trigger;
567         } else if (rc || oid == oid2) {
568                 GOTO(out, rc);
569         }
570
571         insert = false;
572
573 trigger:
574         if (scrub->os_running) {
575                 if (!dn) {
576                         rc = __osd_obj2dnode(osd->od_os, oid, &dn);
577                         /* The object has been removed (by race maybe). */
578                         if (rc)
579                                 return;
580                 }
581
582                 rc = osd_oii_insert(env, osd, fid, oid, insert);
583                 /* There is race condition between osd_oi_lookup and OI scrub.
584                  * The OI scrub finished just after osd_oi_lookup() failure.
585                  * Under such case, it is unnecessary to trigger OI scrub again,
586                  * but try to call osd_oi_lookup() again.
587                  */
588                 if (unlikely(rc == -EAGAIN))
589                         goto again;
590
591                 if (is_dir)
592                         rc = osd_check_lmv(env, osd, oid, fid);
593                 else
594                         rc = 0;
595
596                 GOTO(out, rc);
597         }
598
599         if (osd->od_scrub.os_auto_scrub_interval != AS_NEVER && ++once == 1) {
600                 rc = osd_scrub_start(env, osd, SS_AUTO_FULL |
601                                      SS_CLEAR_DRYRUN | SS_CLEAR_FAILOUT);
602                 CDEBUG_LIMIT(D_LFSCK | D_CONSOLE | D_WARNING,
603                              "%s: trigger partial OI scrub for RPC inconsistency, checking FID "DFID"/%#llx): rc = %d\n",
604                              osd_name(osd), PFID(fid), oid, rc);
605                 if (!rc)
606                         goto again;
607         }
608
609         GOTO(out, rc);
610
611 out:
612         if (dn)
613                 osd_dnode_rele(dn);
614 }
615
616 static int osd_dir_lookup(const struct lu_env *env, struct dt_object *dt,
617                           struct dt_rec *rec, const struct dt_key *key)
618 {
619         struct osd_thread_info *oti = osd_oti_get(env);
620         struct osd_object *obj = osd_dt_obj(dt);
621         struct osd_device *osd = osd_obj2dev(obj);
622         struct lu_fid *fid = (struct lu_fid *)rec;
623         char *name = (char *)key;
624         uint64_t oid = ZFS_NO_OBJECT;
625         int rc;
626
627         ENTRY;
628         if (name[0] == '.') {
629                 if (name[1] == 0) {
630                         const struct lu_fid *f = lu_object_fid(&dt->do_lu);
631
632                         memcpy(rec, f, sizeof(*f));
633                         RETURN(1);
634                 } else if (name[1] == '.' && name[2] == 0) {
635                         rc = osd_find_parent_fid(env, dt, fid, &oid);
636                         GOTO(out, rc);
637                 }
638         }
639
640         memset(&oti->oti_zde.lzd_fid, 0, sizeof(struct lu_fid));
641
642         down_read(&obj->oo_guard);
643         if (obj->oo_destroyed)
644                 GOTO(out_unlock, rc = -ENOENT);
645
646         rc = osd_zap_lookup(osd, obj->oo_dn->dn_object, obj->oo_dn,
647                             (char *)key, 8, sizeof(oti->oti_zde) / 8,
648                             (void *)&oti->oti_zde);
649         if (rc != 0) {
650                 up_read(&obj->oo_guard);
651                 RETURN(rc);
652         }
653
654         oid = oti->oti_zde.lzd_reg.zde_dnode;
655         if (likely(fid_is_sane(&oti->oti_zde.lzd_fid))) {
656                 memcpy(rec, &oti->oti_zde.lzd_fid, sizeof(struct lu_fid));
657                 GOTO(out_unlock, rc = 0);
658         }
659
660         rc = osd_get_fid_by_oid(env, osd, oti->oti_zde.lzd_reg.zde_dnode, fid);
661
662 out_unlock:
663         up_read(&obj->oo_guard);
664 out:
665         if (!rc && !osd_remote_fid(env, osd, fid)) {
666                 bool is_dir = S_ISDIR(DTTOIF(oti->oti_zde.lzd_reg.zde_type));
667
668                 /*
669                  * this should ask the scrubber to check OI given
670                  * the mapping we just found in the dir entry.
671                  * but result of that check should not affect
672                  * result of the lookup in the directory.
673                  * otherwise such a direntry becomes hidden
674                  * from the layers above, including LFSCK which
675                  * is supposed to fix dangling entries.
676                  */
677                 osd_zfs_consistency_check(env, osd, obj, fid, oid, is_dir);
678         }
679
680         return rc == 0 ? 1 : (rc == -ENOENT ? -ENODATA : rc);
681 }
682
683 /*
684  * In DNE environment, the object and its name entry may reside on different
685  * MDTs. Under such case, we will create an agent object on the MDT where the
686  * name entry resides. The agent object is empty, and indicates that the real
687  * object for the name entry resides on another MDT. If without agent object,
688  * related name entry will be skipped when perform MDT side file level backup
689  * and restore via ZPL by userspace tool, such as 'tar'.
690  */
691 static int osd_create_agent_object(const struct lu_env *env,
692                                    struct osd_device *osd,
693                                    struct luz_direntry *zde,
694                                    uint64_t parent, dmu_tx_t *tx)
695 {
696         struct osd_thread_info *info = osd_oti_get(env);
697         struct lustre_mdt_attrs *lma = &info->oti_mdt_attrs;
698         struct lu_attr *la = &info->oti_la;
699         nvlist_t *nvbuf = NULL;
700         dnode_t *dn = NULL;
701         sa_handle_t *hdl;
702         int rc = 0;
703
704         ENTRY;
705         if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AGENTOBJ))
706                 RETURN(0);
707
708         rc = -nvlist_alloc(&nvbuf, NV_UNIQUE_NAME, KM_SLEEP);
709         if (rc)
710                 RETURN(rc);
711
712         lustre_lma_init(lma, &zde->lzd_fid, 0, LMAI_AGENT);
713         lustre_lma_swab(lma);
714         rc = -nvlist_add_byte_array(nvbuf, XATTR_NAME_LMA, (uchar_t *)lma,
715                                     sizeof(*lma));
716         if (rc)
717                 GOTO(out, rc);
718
719         la->la_valid = LA_TYPE | LA_MODE;
720         la->la_mode = (DTTOIF(zde->lzd_reg.zde_type) & S_IFMT) | 0755;
721
722         if (S_ISDIR(la->la_mode))
723                 rc = __osd_zap_create(env, osd, &dn, tx, la,
724                                 osd_find_dnsize(osd, OSD_BASE_EA_IN_BONUS), 0);
725         else
726                 rc = __osd_object_create(env, osd, NULL, &zde->lzd_fid,
727                                          &dn, tx, la);
728         if (rc)
729                 GOTO(out, rc);
730
731         zde->lzd_reg.zde_dnode = dn->dn_object;
732         rc = -sa_handle_get(osd->od_os, dn->dn_object, NULL,
733                             SA_HDL_PRIVATE, &hdl);
734         if (!rc) {
735                 rc = __osd_attr_init(env, osd, NULL, hdl, tx,
736                                      la, parent, nvbuf);
737                 sa_handle_destroy(hdl);
738         }
739
740         GOTO(out, rc);
741
742 out:
743         if (dn) {
744                 if (rc)
745                         dmu_object_free(osd->od_os, dn->dn_object, tx);
746                 osd_dnode_rele(dn);
747         }
748
749         if (nvbuf)
750                 nvlist_free(nvbuf);
751
752         return rc;
753 }
754
755 int osd_add_to_remote_parent(const struct lu_env *env,
756                              struct osd_device *osd,
757                              struct osd_object *obj,
758                              struct osd_thandle *oh)
759 {
760         struct osd_thread_info *info = osd_oti_get(env);
761         struct luz_direntry *zde = &info->oti_zde;
762         char *name = info->oti_str;
763         const struct lu_fid *fid = lu_object_fid(&obj->oo_dt.do_lu);
764         struct lustre_mdt_attrs *lma = (struct lustre_mdt_attrs *)info->oti_buf;
765         struct lu_buf buf = {
766                 .lb_buf = lma,
767                 .lb_len = sizeof(info->oti_buf),
768         };
769         int size = 0;
770         int rc;
771
772         ENTRY;
773         if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AGENTENT))
774                 RETURN(0);
775
776         rc = osd_xattr_get_internal(env, obj, &buf, XATTR_NAME_LMA, &size);
777         if (rc) {
778                 CWARN("%s: fail to load LMA for adding "DFID" to remote parent: rc = %d\n",
779                       osd_name(osd), PFID(fid), rc);
780                 RETURN(rc);
781         }
782
783         lustre_lma_swab(lma);
784         lma->lma_incompat |= LMAI_REMOTE_PARENT;
785         lustre_lma_swab(lma);
786         buf.lb_len = size;
787         rc = osd_xattr_set_internal(env, obj, &buf, XATTR_NAME_LMA,
788                                     LU_XATTR_REPLACE, oh);
789         if (rc) {
790                 CWARN("%s: fail to update LMA for adding "DFID" to remote parent: rc = %d\n",
791                       osd_name(osd), PFID(fid), rc);
792                 RETURN(rc);
793         }
794
795         osd_fid2str(name, fid, sizeof(info->oti_str));
796         zde->lzd_reg.zde_dnode = obj->oo_dn->dn_object;
797         zde->lzd_reg.zde_type = S_DT(S_IFDIR);
798         zde->lzd_fid = *fid;
799
800         rc = osd_zap_add(osd, osd->od_remote_parent_dir, NULL,
801                          name, 8, sizeof(*zde) / 8, zde, oh->ot_tx);
802         if (unlikely(rc == -EEXIST))
803                 rc = 0;
804         if (rc)
805                 CWARN("%s: fail to add name entry for "
806                       DFID" to remote parent: rc = %d\n",
807                       osd_name(osd), PFID(fid), rc);
808         else
809                 lu_object_set_agent_entry(&obj->oo_dt.do_lu);
810
811         RETURN(rc);
812 }
813
814 int osd_delete_from_remote_parent(const struct lu_env *env,
815                                   struct osd_device *osd,
816                                   struct osd_object *obj,
817                                   struct osd_thandle *oh, bool destroy)
818 {
819         struct osd_thread_info *info = osd_oti_get(env);
820         char *name = info->oti_str;
821         const struct lu_fid *fid = lu_object_fid(&obj->oo_dt.do_lu);
822         struct lustre_mdt_attrs *lma = (struct lustre_mdt_attrs *)info->oti_buf;
823         struct lu_buf buf = {
824                 .lb_buf = lma,
825                 .lb_len = sizeof(info->oti_buf),
826         };
827         int size = 0;
828         int rc;
829
830         ENTRY;
831         osd_fid2str(name, fid, sizeof(info->oti_str));
832         rc = osd_zap_remove(osd, osd->od_remote_parent_dir, NULL,
833                             name, oh->ot_tx);
834         if (unlikely(rc == -ENOENT))
835                 rc = 0;
836         if (rc)
837                 CERROR("%s: fail to remove entry under remote parent for "DFID": rc = %d\n",
838                        osd_name(osd), PFID(fid), rc);
839
840         if (destroy || rc)
841                 RETURN(rc);
842
843         rc = osd_xattr_get_internal(env, obj, &buf, XATTR_NAME_LMA, &size);
844         if (rc) {
845                 CERROR("%s: fail to load LMA for removing "DFID" from remote parent: rc = %d\n",
846                        osd_name(osd), PFID(fid), rc);
847                 RETURN(rc);
848         }
849
850         lustre_lma_swab(lma);
851         lma->lma_incompat &= ~LMAI_REMOTE_PARENT;
852         lustre_lma_swab(lma);
853         buf.lb_len = size;
854         rc = osd_xattr_set_internal(env, obj, &buf, XATTR_NAME_LMA,
855                                     LU_XATTR_REPLACE, oh);
856         if (rc)
857                 CERROR("%s: fail to update LMA for removing "DFID" from remote parent: rc = %d\n",
858                        osd_name(osd), PFID(fid), rc);
859         else
860                 lu_object_clear_agent_entry(&obj->oo_dt.do_lu);
861
862         RETURN(rc);
863 }
864
865 static int osd_declare_dir_insert(const struct lu_env *env,
866                                   struct dt_object *dt,
867                                   const struct dt_rec *rec,
868                                   const struct dt_key *key,
869                                   struct thandle *th)
870 {
871         struct osd_object       *obj = osd_dt_obj(dt);
872         struct osd_device       *osd = osd_obj2dev(obj);
873         const struct dt_insert_rec *rec1;
874         const struct lu_fid     *fid;
875         struct osd_thandle      *oh;
876         uint64_t                 object;
877         struct osd_idmap_cache *idc;
878
879         ENTRY;
880         rec1 = (struct dt_insert_rec *)rec;
881         fid = rec1->rec_fid;
882         LASSERT(fid != NULL);
883         LASSERT(rec1->rec_type != 0);
884
885         LASSERT(th != NULL);
886         oh = container_of(th, struct osd_thandle, ot_super);
887
888         idc = osd_idc_find_or_init(env, osd, fid);
889         if (IS_ERR(idc))
890                 RETURN(PTR_ERR(idc));
891
892         if (idc->oic_remote) {
893                 const char *name = (const char *)key;
894
895                 if (name[0] != '.' || name[1] != '.' || name[2] != 0) {
896                         /* Prepare agent object for remote entry that will
897                          * be used for operations via ZPL, such as MDT side
898                          * file-level backup and restore.
899                          */
900                         dmu_tx_hold_sa_create(oh->ot_tx,
901                                 osd_find_dnsize(osd, OSD_BASE_EA_IN_BONUS));
902                         if (S_ISDIR(rec1->rec_type))
903                                 dmu_tx_hold_zap(oh->ot_tx, DMU_NEW_OBJECT,
904                                                 FALSE, NULL);
905                 }
906         }
907
908         /* This is for inserting dot/dotdot for new created dir. */
909         if (obj->oo_dn == NULL)
910                 object = DMU_NEW_OBJECT;
911         else
912                 object = obj->oo_dn->dn_object;
913
914         /* do not specify the key as then DMU is trying to look it up
915          * which is very expensive. usually the layers above lookup
916          * before insertion
917          */
918         osd_tx_hold_zap(oh->ot_tx, object, obj->oo_dn, TRUE, NULL);
919
920         RETURN(0);
921 }
922
923 static int osd_seq_exists(const struct lu_env *env, struct osd_device *osd,
924                           u64 seq)
925 {
926         struct lu_seq_range     *range = &osd_oti_get(env)->oti_seq_range;
927         struct seq_server_site  *ss = osd_seq_site(osd);
928         int                     rc;
929
930         ENTRY;
931         LASSERT(ss != NULL);
932         LASSERT(ss->ss_server_fld != NULL);
933
934         rc = osd_fld_lookup(env, osd, seq, range);
935         if (rc != 0) {
936                 if (rc != -ENOENT)
937                         CERROR("%s: Can not lookup fld for %#llx: rc = %d\n",
938                                osd_name(osd), seq, rc);
939                 RETURN(0);
940         }
941
942         RETURN(ss->ss_node_id == range->lsr_index);
943 }
944
945 int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
946                    const struct lu_fid *fid)
947 {
948         struct seq_server_site  *ss = osd_seq_site(osd);
949
950         ENTRY;
951         /* FID seqs not in FLDB, must be local seq */
952         if (unlikely(!fid_seq_in_fldb(fid_seq(fid))))
953                 RETURN(0);
954
955         /* If FLD is not being initialized yet, it only happens during the
956          * initialization, likely during mgs initialization, and we assume
957          * this is local FID.
958          */
959         if (ss == NULL || ss->ss_server_fld == NULL)
960                 RETURN(0);
961
962         /* Only check the local FLDB here */
963         if (osd_seq_exists(env, osd, fid_seq(fid)))
964                 RETURN(0);
965
966         RETURN(1);
967 }
968
969 /**
970  *      Inserts (key, value) pair in \a directory object.
971  *
972  *      \param  dt      osd index object
973  *      \param  key     key for index
974  *      \param  rec     record reference
975  *      \param  th      transaction handler
976  *
977  *      \retval  0  success
978  *      \retval -ve failure
979  */
980 static int osd_dir_insert(const struct lu_env *env, struct dt_object *dt,
981                           const struct dt_rec *rec, const struct dt_key *key,
982                           struct thandle *th)
983 {
984         struct osd_thread_info *oti = osd_oti_get(env);
985         struct osd_object   *parent = osd_dt_obj(dt);
986         struct osd_device   *osd = osd_obj2dev(parent);
987         struct dt_insert_rec *rec1 = (struct dt_insert_rec *)rec;
988         const struct lu_fid *fid = rec1->rec_fid;
989         struct osd_thandle *oh;
990         struct osd_idmap_cache *idc;
991         const char *name = (const char *)key;
992         struct luz_direntry *zde = &oti->oti_zde;
993         int num = sizeof(*zde) / 8;
994         int rc;
995
996         ENTRY;
997         LASSERT(parent->oo_dn);
998
999         LASSERT(dt_object_exists(dt));
1000         LASSERT(osd_invariant(parent));
1001
1002         LASSERT(th != NULL);
1003         oh = container_of(th, struct osd_thandle, ot_super);
1004
1005         idc = osd_idc_find(env, osd, fid);
1006         if (unlikely(idc == NULL)) {
1007                 /* this dt_insert() wasn't declared properly, so
1008                  * FID is missing in OI cache. we better do not
1009                  * lookup FID in FLDB/OI and don't risk to deadlock,
1010                  * but in some special cases (lfsck testing, etc)
1011                  * it's much simpler than fixing a caller
1012                  */
1013                 idc = osd_idc_find_or_init(env, osd, fid);
1014                 if (IS_ERR(idc)) {
1015                         CERROR("%s: "DFID" wasn't declared for insert\n",
1016                                osd_name(osd), PFID(fid));
1017                         RETURN(PTR_ERR(idc));
1018                 }
1019         }
1020
1021         BUILD_BUG_ON(sizeof(zde->lzd_reg) != 8);
1022         BUILD_BUG_ON(sizeof(*zde) % 8 != 0);
1023
1024         memset(&zde->lzd_reg, 0, sizeof(zde->lzd_reg));
1025         zde->lzd_reg.zde_type = S_DT(rec1->rec_type & S_IFMT);
1026         zde->lzd_fid = *fid;
1027
1028         if (idc->oic_remote) {
1029                 if (name[0] != '.' || name[1] != '.' || name[2] != 0) {
1030                         /* Create agent inode for remote object that will
1031                          * be used for MDT file-level backup and restore.
1032                          */
1033                         rc = osd_create_agent_object(env, osd, zde,
1034                                         parent->oo_dn->dn_object, oh->ot_tx);
1035                         if (rc) {
1036                                 CWARN("%s: Fail to create agent object for "DFID": rc = %d\n",
1037                                       osd_name(osd), PFID(fid), rc);
1038                                 /* Ignore the failure since the system can go
1039                                  * ahead if we do not care about the MDT side
1040                                  * file-level backup and restore.
1041                                  */
1042                                 rc = 0;
1043                         }
1044                 }
1045         } else {
1046                 if (unlikely(idc->oic_dnode == 0)) {
1047                         /* for a reason OI cache wasn't filled properly */
1048                         CERROR("%s: OIC for "DFID" isn't filled\n",
1049                                osd_name(osd), PFID(fid));
1050                         RETURN(-EINVAL);
1051                 }
1052                 if (name[0] == '.') {
1053                         if (name[1] == 0) {
1054                                 /* do not store ".", instead generate it
1055                                  * during iteration
1056                                  */
1057                                 GOTO(out, rc = 0);
1058                         } else if (name[1] == '.' && name[2] == 0) {
1059                                 uint64_t dnode = idc->oic_dnode;
1060
1061                                 if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_PARENT))
1062                                         dnode--;
1063
1064                                 /* update parent dnode in the child.
1065                                  * later it will be used to generate ".."
1066                                  */
1067                                 rc = osd_object_sa_update(parent,
1068                                                  SA_ZPL_PARENT(osd),
1069                                                  &dnode, 8, oh);
1070
1071                                 GOTO(out, rc);
1072                         }
1073                 }
1074                 zde->lzd_reg.zde_dnode = idc->oic_dnode;
1075         }
1076
1077         if (CFS_FAIL_CHECK(OBD_FAIL_FID_INDIR))
1078                 zde->lzd_fid.f_ver = ~0;
1079
1080         /* The logic is not related with IGIF, just re-use the fail_loc value
1081          * to be consistent with ldiskfs case, then share the same test logic
1082          */
1083         if (CFS_FAIL_CHECK(OBD_FAIL_FID_IGIF))
1084                 num = 1;
1085
1086         /* Insert (key,oid) into ZAP */
1087         rc = osd_zap_add(osd, parent->oo_dn->dn_object, parent->oo_dn,
1088                          name, 8, num, (void *)zde, oh->ot_tx);
1089         if (unlikely(rc == -EEXIST &&
1090                      name[0] == '.' && name[1] == '.' && name[2] == 0))
1091                 /* Update (key,oid) in ZAP */
1092                 rc = -zap_update(osd->od_os, parent->oo_dn->dn_object, name, 8,
1093                                  sizeof(*zde) / 8, (void *)zde, oh->ot_tx);
1094
1095 out:
1096
1097         RETURN(rc);
1098 }
1099
1100 static int osd_declare_dir_delete(const struct lu_env *env,
1101                                   struct dt_object *dt,
1102                                   const struct dt_key *key,
1103                                   struct thandle *th)
1104 {
1105         struct osd_object *obj = osd_dt_obj(dt);
1106         dnode_t *zap_dn = obj->oo_dn;
1107         struct osd_thandle *oh;
1108         const char *name = (const char *)key;
1109
1110         ENTRY;
1111         LASSERT(dt_object_exists(dt));
1112         LASSERT(osd_invariant(obj));
1113         LASSERT(zap_dn != NULL);
1114
1115         LASSERT(th != NULL);
1116         oh = container_of(th, struct osd_thandle, ot_super);
1117
1118         /*
1119          * In Orion . and .. were stored in the directory (not generated upon
1120          * request as now). We preserve them for backward compatibility.
1121          */
1122         if (name[0] == '.') {
1123                 if (name[1] == 0)
1124                         RETURN(0);
1125                 else if (name[1] == '.' && name[2] == 0)
1126                         RETURN(0);
1127         }
1128
1129         /* do not specify the key as then DMU is trying to look it up
1130          * which is very expensive. usually the layers above lookup
1131          * before deletion
1132          */
1133         osd_tx_hold_zap(oh->ot_tx, zap_dn->dn_object, zap_dn, FALSE, NULL);
1134
1135         /* For destroying agent object if have. */
1136         dmu_tx_hold_bonus(oh->ot_tx, DMU_NEW_OBJECT);
1137
1138         RETURN(0);
1139 }
1140
1141 static int osd_dir_delete(const struct lu_env *env, struct dt_object *dt,
1142                           const struct dt_key *key, struct thandle *th)
1143 {
1144         struct luz_direntry *zde = &osd_oti_get(env)->oti_zde;
1145         struct osd_object *obj = osd_dt_obj(dt);
1146         struct osd_device *osd = osd_obj2dev(obj);
1147         struct osd_thandle *oh;
1148         dnode_t *zap_dn = obj->oo_dn;
1149         char      *name = (char *)key;
1150         int rc;
1151
1152         ENTRY;
1153         LASSERT(zap_dn);
1154
1155         LASSERT(th != NULL);
1156         oh = container_of(th, struct osd_thandle, ot_super);
1157
1158         /*
1159          * In Orion . and .. were stored in the directory (not generated upon
1160          * request as now). we preserve them for backward compatibility
1161          */
1162         if (name[0] == '.') {
1163                 if (name[1] == 0)
1164                         RETURN(0);
1165                 else if (name[1] == '.' && name[2] == 0)
1166                         RETURN(0);
1167         }
1168
1169         /* XXX: We have to say that lookup during delete_declare will affect
1170          *      performance, but we have to check whether the name entry (to
1171          *      be deleted) has agent object or not to avoid orphans.
1172          *
1173          *      We will improve that in the future, some possible solutions,
1174          *      for example:
1175          *      1) Some hint from the caller via transaction handle to make
1176          *         the lookup conditionally.
1177          *      2) Enhance the ZFS logic to recognize the OSD lookup result
1178          *         and delete the given entry directly without lookup again
1179          *         internally. LU-10190
1180          */
1181         memset(&zde->lzd_fid, 0, sizeof(zde->lzd_fid));
1182         rc = osd_zap_lookup(osd, zap_dn->dn_object, zap_dn, name, 8, 3, zde);
1183         if (unlikely(rc)) {
1184                 if (rc != -ENOENT)
1185                         CERROR("%s: failed to locate entry  %s: rc = %d\n",
1186                                osd->od_svname, name, rc);
1187                 RETURN(rc);
1188         }
1189
1190         if (unlikely(osd_remote_fid(env, osd, &zde->lzd_fid) > 0)) {
1191                 rc = -dmu_object_free(osd->od_os, zde->lzd_reg.zde_dnode,
1192                                       oh->ot_tx);
1193                 if (rc)
1194                         CERROR("%s: failed to destroy agent object (%llu) for the entry %s: rc = %d\n",
1195                                osd->od_svname, (__u64)zde->lzd_reg.zde_dnode,
1196                                name, rc);
1197         }
1198
1199         /* Remove key from the ZAP */
1200         rc = osd_zap_remove(osd, zap_dn->dn_object, zap_dn,
1201                             (char *)key, oh->ot_tx);
1202         if (unlikely(rc))
1203                 CERROR("%s: zap_remove %s failed: rc = %d\n",
1204                        osd->od_svname, name, rc);
1205
1206         RETURN(rc);
1207 }
1208
1209 static struct dt_it *osd_dir_it_init(const struct lu_env *env,
1210                                      struct dt_object *dt,
1211                                      __u32 unused)
1212 {
1213         struct osd_zap_it *it;
1214
1215         it = (struct osd_zap_it *)osd_index_it_init(env, dt, unused);
1216         if (!IS_ERR(it))
1217                 it->ozi_pos = OZI_POS_INIT;
1218
1219         RETURN((struct dt_it *)it);
1220 }
1221
1222 /**
1223  *  Move Iterator to record specified by \a key
1224  *
1225  *  \param  di      osd iterator
1226  *  \param  key     key for index
1227  *
1228  *  \retval +ve  di points to record with least key not larger than key
1229  *  \retval  0   di points to exact matched key
1230  *  \retval -ve  failure
1231  */
1232 static int osd_dir_it_get(const struct lu_env *env,
1233                           struct dt_it *di, const struct dt_key *key)
1234 {
1235         struct osd_zap_it *it = (struct osd_zap_it *)di;
1236         struct osd_object *obj = it->ozi_obj;
1237         char              *name = (char *)key;
1238         int                rc;
1239
1240         ENTRY;
1241         LASSERT(it);
1242         LASSERT(it->ozi_zc);
1243
1244         /* reset the cursor */
1245         zap_cursor_fini(it->ozi_zc);
1246         osd_obj_cursor_init_serialized(it->ozi_zc, obj, 0);
1247
1248         /* XXX: implementation of the API is broken at the moment */
1249         LASSERT(((const char *)key)[0] == 0);
1250
1251         if (name[0] == 0) {
1252                 it->ozi_pos = OZI_POS_INIT;
1253                 RETURN(1);
1254         }
1255
1256         if (name[0] == '.') {
1257                 if (name[1] == 0) {
1258                         it->ozi_pos = OZI_POS_DOT;
1259                         GOTO(out, rc = 1);
1260                 } else if (name[1] == '.' && name[2] == 0) {
1261                         it->ozi_pos = OZI_POS_DOTDOT;
1262                         GOTO(out, rc = 1);
1263                 }
1264         }
1265
1266         /* neither . nor .. - some real record */
1267         it->ozi_pos = OZI_POS_REAL;
1268         rc = 1;
1269
1270 out:
1271         RETURN(rc);
1272 }
1273
1274 static void osd_dir_it_put(const struct lu_env *env, struct dt_it *di)
1275 {
1276         /* PBS: do nothing : ref are incremented at retrive and decreamented
1277          *      next/finish.
1278          */
1279 }
1280
1281 /*
1282  * in Orion . and .. were stored in the directory, while ZPL
1283  * and current osd-zfs generate them up on request. so, we
1284  * need to ignore previously stored . and ..
1285  */
1286 static int osd_index_retrieve_skip_dots(struct osd_zap_it *it,
1287                                         zap_attribute_t *za)
1288 {
1289         int rc, isdot;
1290
1291         do {
1292                 rc = -zap_cursor_retrieve(it->ozi_zc, za);
1293
1294                 isdot = 0;
1295                 if (unlikely(rc == 0 && za->za_name[0] == '.')) {
1296                         if (za->za_name[1] == 0) {
1297                                 isdot = 1;
1298                         } else if (za->za_name[1] == '.' &&
1299                                    za->za_name[2] == 0) {
1300                                 isdot = 1;
1301                         }
1302                         if (unlikely(isdot))
1303                                 zap_cursor_advance(it->ozi_zc);
1304                 }
1305         } while (unlikely(rc == 0 && isdot));
1306
1307         return rc;
1308 }
1309
1310 /**
1311  * to load a directory entry at a time and stored it in
1312  * iterator's in-memory data structure.
1313  *
1314  * \param di, struct osd_it_ea, iterator's in memory structure
1315  *
1316  * \retval +ve, iterator reached to end
1317  * \retval   0, iterator not reached to end
1318  * \retval -ve, on error
1319  */
1320 static int osd_dir_it_next(const struct lu_env *env, struct dt_it *di)
1321 {
1322         struct osd_zap_it *it = (struct osd_zap_it *)di;
1323         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1324         int                rc;
1325
1326
1327         ENTRY;
1328         /* temp. storage should be enough for any key supported by ZFS */
1329 #ifdef ZAP_MAXNAMELEN_NEW
1330         LASSERT(za->za_name_len <= sizeof(it->ozi_name));
1331 #else
1332         BUILD_BUG_ON(sizeof(za->za_name) > sizeof(it->ozi_name));
1333 #endif
1334
1335         /*
1336          * the first ->next() moves the cursor to .
1337          * the second ->next() moves the cursor to ..
1338          * then we get to the real records and have to verify any exist
1339          */
1340         if (it->ozi_pos <= OZI_POS_DOTDOT) {
1341                 it->ozi_pos++;
1342                 if (it->ozi_pos <= OZI_POS_DOTDOT)
1343                         RETURN(0);
1344         } else {
1345                 zap_cursor_advance(it->ozi_zc);
1346         }
1347
1348         /*
1349          * According to current API we need to return error if its last entry.
1350          * zap_cursor_advance() does not return any value. So we need to call
1351          * retrieve to check if there is any record.  We should make
1352          * changes to Iterator API to not return status for this API
1353          */
1354         rc = osd_index_retrieve_skip_dots(it, za);
1355
1356         if (rc == -ENOENT) /* end of dir */
1357                 RETURN(+1);
1358
1359         RETURN(rc);
1360 }
1361
1362 static struct dt_key *osd_dir_it_key(const struct lu_env *env,
1363                                      const struct dt_it *di)
1364 {
1365         struct osd_zap_it *it = (struct osd_zap_it *)di;
1366         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1367         int                rc = 0;
1368
1369         ENTRY;
1370         if (it->ozi_pos <= OZI_POS_DOT) {
1371                 it->ozi_pos = OZI_POS_DOT;
1372                 RETURN((struct dt_key *)".");
1373         } else if (it->ozi_pos == OZI_POS_DOTDOT) {
1374                 RETURN((struct dt_key *)"..");
1375         }
1376
1377         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1378         if (rc)
1379                 RETURN(ERR_PTR(rc));
1380
1381         strcpy(it->ozi_name, za->za_name);
1382
1383         RETURN((struct dt_key *)it->ozi_name);
1384 }
1385
1386 static int osd_dir_it_key_size(const struct lu_env *env, const struct dt_it *di)
1387 {
1388         struct osd_zap_it *it = (struct osd_zap_it *)di;
1389         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1390         int                rc;
1391
1392         ENTRY;
1393         if (it->ozi_pos <= OZI_POS_DOT) {
1394                 it->ozi_pos = OZI_POS_DOT;
1395                 RETURN(2);
1396         } else if (it->ozi_pos == OZI_POS_DOTDOT) {
1397                 RETURN(3);
1398         }
1399
1400         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1401         if (rc == 0)
1402                 rc = strlen(za->za_name);
1403
1404         RETURN(rc);
1405 }
1406
1407 static int
1408 osd_dirent_update(const struct lu_env *env, struct osd_device *dev,
1409                   uint64_t zap, const char *key, struct luz_direntry *zde)
1410 {
1411         dmu_tx_t *tx;
1412         int rc;
1413
1414         ENTRY;
1415         tx = dmu_tx_create(dev->od_os);
1416         if (!tx)
1417                 RETURN(-ENOMEM);
1418
1419         dmu_tx_hold_zap(tx, zap, TRUE, NULL);
1420         rc = -dmu_tx_assign(tx, DMU_TX_WAIT);
1421         if (!rc)
1422                 rc = -zap_update(dev->od_os, zap, key, 8, sizeof(*zde) / 8,
1423                                  (const void *)zde, tx);
1424         if (rc)
1425                 dmu_tx_abort(tx);
1426         else
1427                 dmu_tx_commit(tx);
1428
1429         RETURN(rc);
1430 }
1431
1432 static int osd_update_entry_for_agent(const struct lu_env *env,
1433                                       struct osd_device *osd,
1434                                       uint64_t zap, const char *name,
1435                                       struct luz_direntry *zde, __u32 attr)
1436 {
1437         dmu_tx_t *tx = NULL;
1438         int rc = 0;
1439
1440         ENTRY;
1441         if (attr & LUDA_VERIFY_DRYRUN)
1442                 GOTO(out, rc = 0);
1443
1444         tx = dmu_tx_create(osd->od_os);
1445         if (!tx)
1446                 GOTO(out, rc = -ENOMEM);
1447
1448         dmu_tx_hold_sa_create(tx, osd_find_dnsize(osd, OSD_BASE_EA_IN_BONUS));
1449         dmu_tx_hold_zap(tx, zap, FALSE, NULL);
1450         rc = -dmu_tx_assign(tx, DMU_TX_WAIT);
1451         if (rc) {
1452                 dmu_tx_abort(tx);
1453                 GOTO(out, rc);
1454         }
1455
1456         rc = osd_create_agent_object(env, osd, zde, zap, tx);
1457         if (!rc)
1458                 rc = -zap_update(osd->od_os, zap, name, 8, sizeof(*zde) / 8,
1459                                  (const void *)zde, tx);
1460         dmu_tx_commit(tx);
1461
1462         GOTO(out, rc);
1463
1464 out:
1465         CDEBUG(D_LFSCK, "%s: Updated (%s) remote entry for "DFID": rc = %d\n",
1466                osd_name(osd), (attr & LUDA_VERIFY_DRYRUN) ? "(ro)" : "(rw)",
1467                PFID(&zde->lzd_fid), rc);
1468         return rc;
1469 }
1470
1471 static int osd_dir_it_rec(const struct lu_env *env, const struct dt_it *di,
1472                           struct dt_rec *dtrec, __u32 attr)
1473 {
1474         struct osd_zap_it *it = (struct osd_zap_it *)di;
1475         struct lu_dirent *lde = (struct lu_dirent *)dtrec;
1476         struct osd_thread_info *info = osd_oti_get(env);
1477         struct luz_direntry *zde = &info->oti_zde;
1478         zap_attribute_t *za = &info->oti_za;
1479         struct lu_fid *fid = &info->oti_fid;
1480         struct osd_device *osd = osd_obj2dev(it->ozi_obj);
1481         int rc, namelen;
1482
1483         ENTRY;
1484         lde->lde_attrs = 0;
1485         if (it->ozi_pos <= OZI_POS_DOT) {
1486                 /* notice hash=0 here, this is needed to avoid
1487                  * case when some real entry (after ./..) may
1488                  * have hash=0. in this case the client would
1489                  * be confused having records out of hash order.
1490                  */
1491                 lde->lde_hash = cpu_to_le64(0);
1492                 strcpy(lde->lde_name, ".");
1493                 lde->lde_namelen = cpu_to_le16(1);
1494                 fid_cpu_to_le(&lde->lde_fid,
1495                               lu_object_fid(&it->ozi_obj->oo_dt.do_lu));
1496                 lde->lde_attrs = LUDA_FID;
1497                 /* append lustre attributes */
1498                 osd_it_append_attrs(lde, attr, 1, S_DT(S_IFDIR));
1499                 lde->lde_reclen = cpu_to_le16(lu_dirent_calc_size(1, attr));
1500                 it->ozi_pos = OZI_POS_DOT;
1501                 RETURN(0);
1502         } else if (it->ozi_pos == OZI_POS_DOTDOT) {
1503                 /* same as for . above */
1504                 lde->lde_hash = cpu_to_le64(0);
1505                 strcpy(lde->lde_name, "..");
1506                 lde->lde_namelen = cpu_to_le16(2);
1507                 rc = osd_find_parent_fid(env, &it->ozi_obj->oo_dt, fid, NULL);
1508                 if (!rc) {
1509                         fid_cpu_to_le(&lde->lde_fid, fid);
1510                         lde->lde_attrs = LUDA_FID;
1511                 } else if (rc != -ENOENT) {
1512                         /* ENOENT happens at the root of filesystem, ignore */
1513                         RETURN(rc);
1514                 }
1515
1516                 /* append lustre attributes */
1517                 osd_it_append_attrs(lde, attr, 2, S_DT(S_IFDIR));
1518                 lde->lde_reclen = cpu_to_le16(lu_dirent_calc_size(2, attr));
1519                 RETURN(0);
1520         }
1521
1522         LASSERT(lde);
1523
1524         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1525         if (unlikely(rc))
1526                 RETURN(rc);
1527
1528         lde->lde_hash = cpu_to_le64(osd_zap_cursor_serialize(it->ozi_zc));
1529         namelen = strlen(za->za_name);
1530         if (namelen > NAME_MAX)
1531                 RETURN(-EOVERFLOW);
1532         strcpy(lde->lde_name, za->za_name);
1533         lde->lde_namelen = cpu_to_le16(namelen);
1534
1535         if (za->za_integer_length != 8) {
1536                 CERROR("%s: unsupported direntry format: %d %d\n",
1537                        osd->od_svname,
1538                        za->za_integer_length, (int)za->za_num_integers);
1539                 RETURN(-EIO);
1540         }
1541
1542         rc = osd_zap_lookup(osd, it->ozi_zc->zc_zapobj, it->ozi_obj->oo_dn,
1543                             za->za_name, za->za_integer_length, 3, zde);
1544         if (rc)
1545                 RETURN(rc);
1546
1547         if (za->za_num_integers >= 3 && fid_is_sane(&zde->lzd_fid)) {
1548                 lde->lde_attrs = LUDA_FID;
1549                 fid_cpu_to_le(&lde->lde_fid, &zde->lzd_fid);
1550                 if (unlikely(zde->lzd_reg.zde_dnode == ZFS_NO_OBJECT &&
1551                              osd_remote_fid(env, osd, &zde->lzd_fid) > 0 &&
1552                              attr & LUDA_VERIFY)) {
1553                         /* It is mainly used for handling the MDT
1554                          * upgraded from old ZFS based backend.
1555                          */
1556                         rc = osd_update_entry_for_agent(env, osd,
1557                                         it->ozi_obj->oo_dn->dn_object,
1558                                         za->za_name, zde, attr);
1559                         if (!rc)
1560                                 lde->lde_attrs |= LUDA_REPAIR;
1561                         else
1562                                 lde->lde_attrs |= LUDA_UNKNOWN;
1563                 }
1564
1565                 if (!(attr & (LUDA_VERIFY | LUDA_VERIFY_DRYRUN)))
1566                         GOTO(pack_attr, rc = 0);
1567         }
1568
1569         if (CFS_FAIL_CHECK(OBD_FAIL_FID_LOOKUP))
1570                 RETURN(-ENOENT);
1571
1572         rc = osd_get_fid_by_oid(env, osd, zde->lzd_reg.zde_dnode, fid);
1573         if (rc) {
1574                 lde->lde_attrs = LUDA_UNKNOWN;
1575                 GOTO(pack_attr, rc = 0);
1576         }
1577
1578         if (za->za_num_integers >= 3 && fid_is_sane(&zde->lzd_fid) &&
1579             lu_fid_eq(&zde->lzd_fid, fid))
1580                 GOTO(pack_attr, rc = 0);
1581
1582         if (!(attr & LUDA_VERIFY)) {
1583                 fid_cpu_to_le(&lde->lde_fid, fid);
1584                 lde->lde_attrs = LUDA_FID;
1585                 GOTO(pack_attr, rc = 0);
1586         }
1587
1588         if (attr & LUDA_VERIFY_DRYRUN) {
1589                 fid_cpu_to_le(&lde->lde_fid, fid);
1590                 lde->lde_attrs = LUDA_FID | LUDA_REPAIR;
1591                 GOTO(pack_attr, rc = 0);
1592         }
1593
1594         fid_cpu_to_le(&lde->lde_fid, fid);
1595         lde->lde_attrs = LUDA_FID;
1596         zde->lzd_fid = *fid;
1597         rc = osd_dirent_update(env, osd, it->ozi_zc->zc_zapobj,
1598                                za->za_name, zde);
1599         if (rc) {
1600                 lde->lde_attrs |= LUDA_UNKNOWN;
1601                 GOTO(pack_attr, rc = 0);
1602         }
1603
1604         lde->lde_attrs |= LUDA_REPAIR;
1605
1606         GOTO(pack_attr, rc = 0);
1607
1608 pack_attr:
1609         osd_it_append_attrs(lde, attr, namelen, zde->lzd_reg.zde_type);
1610         lde->lde_reclen = cpu_to_le16(lu_dirent_calc_size(namelen, attr));
1611         return rc;
1612 }
1613
1614 static int osd_dir_it_rec_size(const struct lu_env *env, const struct dt_it *di,
1615                                __u32 attr)
1616 {
1617         struct osd_zap_it   *it = (struct osd_zap_it *)di;
1618         zap_attribute_t     *za = &osd_oti_get(env)->oti_za;
1619         size_t               namelen = 0;
1620         int                  rc;
1621
1622         ENTRY;
1623         if (it->ozi_pos <= OZI_POS_DOT)
1624                 namelen = 1;
1625         else if (it->ozi_pos == OZI_POS_DOTDOT)
1626                 namelen = 2;
1627
1628         if (namelen > 0) {
1629                 rc = lu_dirent_calc_size(namelen, attr);
1630                 RETURN(rc);
1631         }
1632
1633         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1634         if (unlikely(rc != 0))
1635                 RETURN(rc);
1636
1637         if (za->za_integer_length != 8 || za->za_num_integers < 3) {
1638                 CERROR("%s: unsupported direntry format: %d %d\n",
1639                        osd_obj2dev(it->ozi_obj)->od_svname,
1640                        za->za_integer_length, (int)za->za_num_integers);
1641                 RETURN(-EIO);
1642         }
1643
1644         namelen = strlen(za->za_name);
1645         if (namelen > NAME_MAX)
1646                 RETURN(-EOVERFLOW);
1647
1648         rc = lu_dirent_calc_size(namelen, attr);
1649
1650         RETURN(rc);
1651 }
1652
1653 static __u64 osd_dir_it_store(const struct lu_env *env, const struct dt_it *di)
1654 {
1655         struct osd_zap_it *it = (struct osd_zap_it *)di;
1656         __u64              pos;
1657
1658         ENTRY;
1659         if (it->ozi_pos <= OZI_POS_DOTDOT)
1660                 pos = 0;
1661         else
1662                 pos = osd_zap_cursor_serialize(it->ozi_zc);
1663
1664         RETURN(pos);
1665 }
1666
1667 /*
1668  * return status :
1669  *  rc == 0 -> end of directory.
1670  *  rc >  0 -> ok, proceed.
1671  *  rc <  0 -> error.  ( EOVERFLOW  can be masked.)
1672  */
1673 static int osd_dir_it_load(const struct lu_env *env,
1674                         const struct dt_it *di, __u64 hash)
1675 {
1676         struct osd_zap_it *it = (struct osd_zap_it *)di;
1677         struct osd_object *obj = it->ozi_obj;
1678         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1679         int                rc;
1680
1681         ENTRY;
1682         /* reset the cursor */
1683         zap_cursor_fini(it->ozi_zc);
1684         osd_obj_cursor_init_serialized(it->ozi_zc, obj, hash);
1685
1686         if (hash == 0) {
1687                 it->ozi_pos = OZI_POS_INIT;
1688                 rc = 1; /* there will be ./.. at least */
1689         } else {
1690                 it->ozi_pos = OZI_POS_REAL;
1691                 /* to return whether the end has been reached */
1692                 rc = osd_index_retrieve_skip_dots(it, za);
1693                 if (rc == 0)
1694                         rc = 1;
1695                 else if (rc == -ENOENT)
1696                         rc = 0;
1697         }
1698
1699         RETURN(rc);
1700 }
1701
1702 const struct dt_index_operations osd_dir_ops = {
1703         .dio_lookup         = osd_dir_lookup,
1704         .dio_declare_insert = osd_declare_dir_insert,
1705         .dio_insert         = osd_dir_insert,
1706         .dio_declare_delete = osd_declare_dir_delete,
1707         .dio_delete         = osd_dir_delete,
1708         .dio_it     = {
1709                 .init     = osd_dir_it_init,
1710                 .fini     = osd_index_it_fini,
1711                 .get      = osd_dir_it_get,
1712                 .put      = osd_dir_it_put,
1713                 .next     = osd_dir_it_next,
1714                 .key      = osd_dir_it_key,
1715                 .key_size = osd_dir_it_key_size,
1716                 .rec      = osd_dir_it_rec,
1717                 .rec_size = osd_dir_it_rec_size,
1718                 .store    = osd_dir_it_store,
1719                 .load     = osd_dir_it_load
1720         }
1721 };
1722
1723 /*
1724  * Primitives for index files using binary keys.
1725  */
1726
1727 /* key integer_size is 8 */
1728 static int osd_prepare_key_uint64(struct osd_object *o, __u64 *dst,
1729                                   const struct dt_key *src)
1730 {
1731         int size;
1732
1733         LASSERT(dst);
1734         LASSERT(src);
1735
1736         /* align keysize to 64bit */
1737         size = (o->oo_keysize + sizeof(__u64) - 1) / sizeof(__u64);
1738         size *= sizeof(__u64);
1739
1740         LASSERT(size <= MAXNAMELEN);
1741
1742         if (unlikely(size > o->oo_keysize))
1743                 memset(dst + o->oo_keysize, 0, size - o->oo_keysize);
1744         memcpy(dst, (const char *)src, o->oo_keysize);
1745
1746         return (size/sizeof(__u64));
1747 }
1748
1749 static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt,
1750                         struct dt_rec *rec, const struct dt_key *key)
1751 {
1752         struct osd_object *obj = osd_dt_obj(dt);
1753         struct osd_device *osd = osd_obj2dev(obj);
1754         __u64             *k = osd_oti_get(env)->oti_key64;
1755         int                rc;
1756
1757         ENTRY;
1758         rc = osd_prepare_key_uint64(obj, k, key);
1759
1760         rc = -zap_lookup_uint64(osd->od_os, obj->oo_dn->dn_object,
1761                                 k, rc, obj->oo_recusize, obj->oo_recsize,
1762                                 (void *)rec);
1763         RETURN(rc == 0 ? 1 : rc);
1764 }
1765
1766 static int osd_declare_index_insert(const struct lu_env *env,
1767                                     struct dt_object *dt,
1768                                     const struct dt_rec *rec,
1769                                     const struct dt_key *key,
1770                                     struct thandle *th)
1771 {
1772         struct osd_object  *obj = osd_dt_obj(dt);
1773         struct osd_thandle *oh;
1774
1775         ENTRY;
1776         LASSERT(th != NULL);
1777         oh = container_of(th, struct osd_thandle, ot_super);
1778
1779         LASSERT(obj->oo_dn);
1780
1781         /* do not specify the key as then DMU is trying to look it up
1782          * which is very expensive. usually the layers above lookup
1783          * before insertion
1784          */
1785         osd_tx_hold_zap(oh->ot_tx, obj->oo_dn->dn_object, obj->oo_dn,
1786                         TRUE, NULL);
1787
1788         RETURN(0);
1789 }
1790
1791 static int osd_index_insert(const struct lu_env *env, struct dt_object *dt,
1792                             const struct dt_rec *rec, const struct dt_key *key,
1793                             struct thandle *th)
1794 {
1795         struct osd_object  *obj = osd_dt_obj(dt);
1796         struct osd_device  *osd = osd_obj2dev(obj);
1797         struct osd_thandle *oh;
1798         __u64              *k = osd_oti_get(env)->oti_key64;
1799         int                 rc;
1800
1801         ENTRY;
1802         LASSERT(obj->oo_dn);
1803         LASSERT(dt_object_exists(dt));
1804         LASSERT(osd_invariant(obj));
1805         LASSERT(th != NULL);
1806
1807         oh = container_of(th, struct osd_thandle, ot_super);
1808
1809         rc = osd_prepare_key_uint64(obj, k, key);
1810
1811         /* Insert (key,oid) into ZAP */
1812         rc = -zap_add_uint64(osd->od_os, obj->oo_dn->dn_object,
1813                              k, rc, obj->oo_recusize, obj->oo_recsize,
1814                              (void *)rec, oh->ot_tx);
1815         RETURN(rc);
1816 }
1817
1818 static int osd_declare_index_delete(const struct lu_env *env,
1819                                     struct dt_object *dt,
1820                                     const struct dt_key *key,
1821                                     struct thandle *th)
1822 {
1823         struct osd_object  *obj = osd_dt_obj(dt);
1824         struct osd_thandle *oh;
1825
1826         ENTRY;
1827         LASSERT(dt_object_exists(dt));
1828         LASSERT(osd_invariant(obj));
1829         LASSERT(th != NULL);
1830         LASSERT(obj->oo_dn);
1831
1832         oh = container_of(th, struct osd_thandle, ot_super);
1833
1834         /* do not specify the key as then DMU is trying to look it up
1835          * which is very expensive. usually the layers above lookup
1836          * before deletion
1837          */
1838         osd_tx_hold_zap(oh->ot_tx, obj->oo_dn->dn_object, obj->oo_dn,
1839                         FALSE, NULL);
1840
1841         RETURN(0);
1842 }
1843
1844 static int osd_index_delete(const struct lu_env *env, struct dt_object *dt,
1845                             const struct dt_key *key, struct thandle *th)
1846 {
1847         struct osd_object  *obj = osd_dt_obj(dt);
1848         struct osd_device  *osd = osd_obj2dev(obj);
1849         struct osd_thandle *oh;
1850         __u64              *k = osd_oti_get(env)->oti_key64;
1851         int                 rc;
1852
1853         ENTRY;
1854         LASSERT(obj->oo_dn);
1855         LASSERT(th != NULL);
1856         oh = container_of(th, struct osd_thandle, ot_super);
1857
1858         rc = osd_prepare_key_uint64(obj, k, key);
1859
1860         /* Remove binary key from the ZAP */
1861         rc = -zap_remove_uint64(osd->od_os, obj->oo_dn->dn_object,
1862                                 k, rc, oh->ot_tx);
1863         RETURN(rc);
1864 }
1865
1866 static int osd_index_it_get(const struct lu_env *env, struct dt_it *di,
1867                             const struct dt_key *key)
1868 {
1869         struct osd_zap_it *it = (struct osd_zap_it *)di;
1870         struct osd_object *obj = it->ozi_obj;
1871         struct osd_device *osd = osd_obj2dev(obj);
1872
1873         ENTRY;
1874         LASSERT(it);
1875         LASSERT(it->ozi_zc);
1876
1877         /* XXX: we need a binary version of zap_cursor_move_to_key()
1878          *      to implement this API
1879          */
1880         if (*((const __u64 *)key) != 0)
1881                 CERROR("NOT IMPLEMETED YET (move to %#llx)\n", *((__u64 *)key));
1882
1883         zap_cursor_fini(it->ozi_zc);
1884         zap_cursor_init(it->ozi_zc, osd->od_os, obj->oo_dn->dn_object);
1885         it->ozi_reset = 1;
1886
1887         RETURN(+1);
1888 }
1889
1890 static int osd_index_it_next(const struct lu_env *env, struct dt_it *di)
1891 {
1892         struct osd_zap_it *it = (struct osd_zap_it *)di;
1893         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1894         int                rc;
1895
1896         ENTRY;
1897         if (it->ozi_reset == 0)
1898                 zap_cursor_advance(it->ozi_zc);
1899         it->ozi_reset = 0;
1900
1901         /*
1902          * According to current API we need to return error if it's last entry.
1903          * zap_cursor_advance() does not return any value. So we need to call
1904          * retrieve to check if there is any record.  We should make
1905          * changes to Iterator API to not return status for this API
1906          */
1907         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1908         if (rc == -ENOENT)
1909                 RETURN(+1);
1910
1911         RETURN((rc));
1912 }
1913
1914 static struct dt_key *osd_index_it_key(const struct lu_env *env,
1915                                        const struct dt_it *di)
1916 {
1917         struct osd_zap_it *it = (struct osd_zap_it *)di;
1918         struct osd_object *obj = it->ozi_obj;
1919         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1920         int                rc = 0;
1921
1922         ENTRY;
1923         it->ozi_reset = 0;
1924         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1925         if (rc)
1926                 RETURN(ERR_PTR(rc));
1927
1928         /* the binary key is stored in the name */
1929         memcpy(&it->ozi_key, za->za_name, obj->oo_keysize);
1930
1931         RETURN((struct dt_key *)&it->ozi_key);
1932 }
1933
1934 static int osd_index_it_key_size(const struct lu_env *env,
1935                                 const struct dt_it *di)
1936 {
1937         struct osd_zap_it *it = (struct osd_zap_it *)di;
1938         struct osd_object *obj = it->ozi_obj;
1939
1940         RETURN(obj->oo_keysize);
1941 }
1942
1943 static int osd_index_it_rec(const struct lu_env *env, const struct dt_it *di,
1944                             struct dt_rec *rec, __u32 attr)
1945 {
1946         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1947         struct osd_zap_it *it = (struct osd_zap_it *)di;
1948         struct osd_object *obj = it->ozi_obj;
1949         struct osd_device *osd = osd_obj2dev(obj);
1950         __u64             *k = osd_oti_get(env)->oti_key64;
1951         int                rc;
1952
1953         ENTRY;
1954         it->ozi_reset = 0;
1955         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1956         if (rc)
1957                 RETURN(rc);
1958
1959         rc = osd_prepare_key_uint64(obj, k, (const struct dt_key *)za->za_name);
1960
1961         rc = -zap_lookup_uint64(osd->od_os, obj->oo_dn->dn_object,
1962                                 k, rc, obj->oo_recusize, obj->oo_recsize,
1963                                 (void *)rec);
1964         RETURN(rc);
1965 }
1966
1967 static __u64 osd_index_it_store(const struct lu_env *env,
1968                                 const struct dt_it *di)
1969 {
1970         struct osd_zap_it *it = (struct osd_zap_it *)di;
1971
1972         it->ozi_reset = 0;
1973         RETURN((__u64)zap_cursor_serialize(it->ozi_zc));
1974 }
1975
1976 static int osd_index_it_load(const struct lu_env *env, const struct dt_it *di,
1977                              __u64 hash)
1978 {
1979         struct osd_zap_it *it = (struct osd_zap_it *)di;
1980         struct osd_object *obj = it->ozi_obj;
1981         struct osd_device *osd = osd_obj2dev(obj);
1982         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1983         int                rc;
1984
1985         ENTRY;
1986         /* reset the cursor */
1987         zap_cursor_fini(it->ozi_zc);
1988         zap_cursor_init_serialized(it->ozi_zc, osd->od_os,
1989                                    obj->oo_dn->dn_object, hash);
1990         it->ozi_reset = 0;
1991
1992         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1993         if (rc == 0)
1994                 RETURN(+1);
1995         else if (rc == -ENOENT)
1996                 RETURN(0);
1997
1998         RETURN(rc);
1999 }
2000
2001 static const struct dt_index_operations osd_index_ops = {
2002         .dio_lookup             = osd_index_lookup,
2003         .dio_declare_insert     = osd_declare_index_insert,
2004         .dio_insert             = osd_index_insert,
2005         .dio_declare_delete     = osd_declare_index_delete,
2006         .dio_delete             = osd_index_delete,
2007         .dio_it = {
2008                 .init           = osd_index_it_init,
2009                 .fini           = osd_index_it_fini,
2010                 .get            = osd_index_it_get,
2011                 .put            = osd_index_it_put,
2012                 .next           = osd_index_it_next,
2013                 .key            = osd_index_it_key,
2014                 .key_size       = osd_index_it_key_size,
2015                 .rec            = osd_index_it_rec,
2016                 .store          = osd_index_it_store,
2017                 .load           = osd_index_it_load
2018         }
2019 };
2020
2021 int osd_index_try(const struct lu_env *env, struct dt_object *dt,
2022                 const struct dt_index_features *feat)
2023 {
2024         struct osd_object *obj = osd_dt_obj(dt);
2025         struct osd_device *osd = osd_obj2dev(obj);
2026         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
2027         int rc = 0;
2028
2029         ENTRY;
2030         down_read(&obj->oo_guard);
2031
2032         /*
2033          * XXX: implement support for fixed-size keys sorted with natural
2034          *      numerical way (not using internal hash value)
2035          */
2036         if (feat->dif_flags & DT_IND_RANGE)
2037                 GOTO(out, rc = -ERANGE);
2038
2039         if (unlikely(feat == &dt_otable_features)) {
2040                 dt->do_index_ops = &osd_otable_ops;
2041                 GOTO(out, rc = 0);
2042         }
2043
2044         LASSERT(!dt_object_exists(dt) || obj->oo_dn != NULL);
2045         if (likely(feat == &dt_directory_features)) {
2046                 if (!dt_object_exists(dt) || osd_object_is_zap(obj->oo_dn))
2047                         dt->do_index_ops = &osd_dir_ops;
2048                 else
2049                         GOTO(out, rc = -ENOTDIR);
2050         } else if (unlikely(feat == &dt_acct_features)) {
2051                 LASSERT(fid_is_acct(fid));
2052                 dt->do_index_ops = &osd_acct_index_ops;
2053         } else if (dt->do_index_ops == NULL) {
2054                 /* For index file, we don't support variable key & record sizes
2055                  * and the key has to be unique
2056                  */
2057                 if ((feat->dif_flags & ~DT_IND_UPDATE) != 0)
2058                         GOTO(out, rc = -EINVAL);
2059
2060                 if (feat->dif_keysize_max > ZAP_MAXNAMELEN)
2061                         GOTO(out, rc = -E2BIG);
2062                 if (feat->dif_keysize_max != feat->dif_keysize_min)
2063                         GOTO(out, rc = -EINVAL);
2064
2065                 /* As for the record size, it should be a multiple of 8 bytes
2066                  * and smaller than the maximum value length supported by ZAP.
2067                  */
2068                 if (feat->dif_recsize_max > ZAP_MAXVALUELEN)
2069                         GOTO(out, rc = -E2BIG);
2070                 if (feat->dif_recsize_max != feat->dif_recsize_min)
2071                         GOTO(out, rc = -EINVAL);
2072
2073                 obj->oo_keysize = feat->dif_keysize_max;
2074                 obj->oo_recsize = feat->dif_recsize_max;
2075                 obj->oo_recusize = 1;
2076
2077                 /* ZFS prefers to work with array of 64bits */
2078                 if ((obj->oo_recsize & 7) == 0) {
2079                         obj->oo_recsize >>= 3;
2080                         obj->oo_recusize = 8;
2081                 }
2082                 dt->do_index_ops = &osd_index_ops;
2083
2084                 if (feat == &dt_lfsck_layout_orphan_features ||
2085                     feat == &dt_lfsck_layout_dangling_features ||
2086                     feat == &dt_lfsck_namespace_features)
2087                         GOTO(out, rc = 0);
2088
2089                 rc = osd_index_register(osd, fid, obj->oo_keysize,
2090                                         obj->oo_recusize * obj->oo_recsize);
2091                 if (rc < 0)
2092                         CWARN("%s: failed to register index "DFID": rc = %d\n",
2093                               osd_name(osd), PFID(fid), rc);
2094                 else if (rc > 0)
2095                         rc = 0;
2096                 else
2097                         CDEBUG(D_LFSCK, "%s: index object "DFID
2098                                " (%u/%u/%u) registered\n",
2099                                osd_name(osd), PFID(fid), obj->oo_keysize,
2100                                obj->oo_recusize, obj->oo_recsize);
2101         }
2102
2103 out:
2104         up_read(&obj->oo_guard);
2105
2106         RETURN(rc);
2107 }