Whamcloud - gitweb
LU-13974 llog: check stale osp object
[fs/lustre-release.git] / lustre / osd-zfs / osd_index.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/osd-zfs/osd_index.c
33  *
34  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
35  * Author: Mike Pershin <tappro@whamcloud.com>
36  */
37
38 #define DEBUG_SUBSYSTEM S_OSD
39
40 #include <libcfs/libcfs.h>
41 #include <obd_support.h>
42 #include <lustre_net.h>
43 #include <obd.h>
44 #include <obd_class.h>
45 #include <lustre_disk.h>
46 #include <lustre_fid.h>
47
48 #include "osd_internal.h"
49
50 #include <sys/dnode.h>
51 #include <sys/spa.h>
52 #include <sys/stat.h>
53 #include <sys/zap.h>
54 #include <sys/spa_impl.h>
55 #include <sys/zfs_znode.h>
56 #include <sys/dmu_tx.h>
57 #include <sys/dmu_objset.h>
58 #include <sys/dsl_prop.h>
59 #include <sys/sa_impl.h>
60 #include <sys/txg.h>
61 #include <lustre_scrub.h>
62
63 /* We don't actually have direct access to the zap_hashbits() function
64  * so just pretend like we do for now.  If this ever breaks we can look at
65  * it at that time. */
66 #define zap_hashbits(zc) 48
67 /*
68  * ZFS hash format:
69  * | cd (16 bits) | hash (48 bits) |
70  * we need it in other form:
71  * |0| hash (48 bit) | cd (15 bit) |
72  * to be a full 64-bit ordered hash so that Lustre readdir can use it to merge
73  * the readdir hashes from multiple directory stripes uniformly on the client.
74  * Another point is sign bit, the hash range should be in [0, 2^63-1] because
75  * loff_t (for llseek) needs to be a positive value.  This means the "cd" field
76  * should only be the low 15 bits.
77  */
78 uint64_t osd_zap_cursor_serialize(zap_cursor_t *zc)
79 {
80         uint64_t zfs_hash = zap_cursor_serialize(zc) & (~0ULL >> 1);
81
82         return (zfs_hash >> zap_hashbits(zc)) |
83                 (zfs_hash << (63 - zap_hashbits(zc)));
84 }
85
86 void osd_zap_cursor_init_serialized(zap_cursor_t *zc, struct objset *os,
87                                     uint64_t id, uint64_t dirhash)
88 {
89         uint64_t zfs_hash = ((dirhash << zap_hashbits(zc)) & (~0ULL >> 1)) |
90                 (dirhash >> (63 - zap_hashbits(zc)));
91
92         zap_cursor_init_serialized(zc, os, id, zfs_hash);
93 }
94
95 int osd_zap_cursor_init(zap_cursor_t **zc, struct objset *os,
96                         uint64_t id, uint64_t dirhash)
97 {
98         zap_cursor_t *t;
99
100         OBD_ALLOC_PTR(t);
101         if (unlikely(t == NULL))
102                 return -ENOMEM;
103
104         osd_zap_cursor_init_serialized(t, os, id, dirhash);
105         *zc = t;
106
107         return 0;
108 }
109
110 void osd_zap_cursor_fini(zap_cursor_t *zc)
111 {
112         zap_cursor_fini(zc);
113         OBD_FREE_PTR(zc);
114 }
115
116 static inline void osd_obj_cursor_init_serialized(zap_cursor_t *zc,
117                                                  struct osd_object *o,
118                                                  uint64_t dirhash)
119 {
120         struct osd_device *d = osd_obj2dev(o);
121         osd_zap_cursor_init_serialized(zc, d->od_os,
122                                        o->oo_dn->dn_object, dirhash);
123 }
124
125 static inline int osd_obj_cursor_init(zap_cursor_t **zc, struct osd_object *o,
126                         uint64_t dirhash)
127 {
128         struct osd_device *d = osd_obj2dev(o);
129         return osd_zap_cursor_init(zc, d->od_os, o->oo_dn->dn_object, dirhash);
130 }
131
132 static struct dt_it *osd_index_it_init(const struct lu_env *env,
133                                        struct dt_object *dt,
134                                        __u32 unused)
135 {
136         struct osd_thread_info  *info = osd_oti_get(env);
137         struct osd_zap_it       *it;
138         struct osd_object       *obj = osd_dt_obj(dt);
139         struct lu_object        *lo  = &dt->do_lu;
140         int                      rc;
141         ENTRY;
142
143         if (obj->oo_destroyed)
144                 RETURN(ERR_PTR(-ENOENT));
145
146         LASSERT(lu_object_exists(lo));
147         LASSERT(obj->oo_dn);
148         LASSERT(info);
149
150         OBD_SLAB_ALLOC_PTR_GFP(it, osd_zapit_cachep, GFP_NOFS);
151         if (it == NULL)
152                 RETURN(ERR_PTR(-ENOMEM));
153
154         rc = osd_obj_cursor_init(&it->ozi_zc, obj, 0);
155         if (rc != 0) {
156                 OBD_SLAB_FREE_PTR(it, osd_zapit_cachep);
157                 RETURN(ERR_PTR(rc));
158         }
159
160         it->ozi_obj   = obj;
161         it->ozi_reset = 1;
162         lu_object_get(lo);
163
164         RETURN((struct dt_it *)it);
165 }
166
167 static void osd_index_it_fini(const struct lu_env *env, struct dt_it *di)
168 {
169         struct osd_zap_it       *it     = (struct osd_zap_it *)di;
170         struct osd_object       *obj;
171         ENTRY;
172
173         LASSERT(it);
174         LASSERT(it->ozi_obj);
175
176         obj = it->ozi_obj;
177
178         osd_zap_cursor_fini(it->ozi_zc);
179         osd_object_put(env, obj);
180         OBD_SLAB_FREE_PTR(it, osd_zapit_cachep);
181
182         EXIT;
183 }
184
185
186 static void osd_index_it_put(const struct lu_env *env, struct dt_it *di)
187 {
188         /* PBS: do nothing : ref are incremented at retrive and decreamented
189          *      next/finish. */
190 }
191
192 static inline void osd_it_append_attrs(struct lu_dirent *ent, __u32 attr,
193                                        int len, __u16 type)
194 {
195         const unsigned    align = sizeof(struct luda_type) - 1;
196         struct luda_type *lt;
197
198         /* check if file type is required */
199         if (attr & LUDA_TYPE) {
200                 len = (len + align) & ~align;
201
202                 lt = (void *)ent->lde_name + len;
203                 lt->lt_type = cpu_to_le16(DTTOIF(type));
204                 ent->lde_attrs |= LUDA_TYPE;
205         }
206
207         ent->lde_attrs = cpu_to_le32(ent->lde_attrs);
208 }
209
210 int __osd_xattr_load_by_oid(struct osd_device *osd, uint64_t oid, nvlist_t **sa)
211 {
212         sa_handle_t *hdl;
213         dmu_buf_t *db;
214         int rc;
215
216         rc = -dmu_bonus_hold(osd->od_os, oid, osd_obj_tag, &db);
217         if (rc < 0) {
218                 CERROR("%s: can't get bonus, rc = %d\n", osd->od_svname, rc);
219                 return rc;
220         }
221
222         rc = -sa_handle_get_from_db(osd->od_os, db, NULL, SA_HDL_PRIVATE, &hdl);
223         if (rc) {
224                 dmu_buf_rele(db, osd_obj_tag);
225                 return rc;
226         }
227
228         rc = __osd_xattr_load(osd, hdl, sa);
229
230         sa_handle_destroy(hdl);
231
232         return rc;
233 }
234 /**
235  * Get the object's FID from its LMA EA.
236  *
237  * \param[in] env       pointer to the thread context
238  * \param[in] osd       pointer to the OSD device
239  * \param[in] oid       the object's local identifier
240  * \param[out] fid      the buffer to hold the object's FID
241  *
242  * \retval              0 for success
243  * \retval              negative error number on failure
244  */
245 int osd_get_fid_by_oid(const struct lu_env *env, struct osd_device *osd,
246                        uint64_t oid, struct lu_fid *fid)
247 {
248         struct objset           *os       = osd->od_os;
249         struct osd_thread_info  *oti      = osd_oti_get(env);
250         struct lustre_mdt_attrs *lma      =
251                         (struct lustre_mdt_attrs *)oti->oti_buf;
252         struct lu_buf            buf;
253         nvlist_t                *sa_xattr = NULL;
254         sa_handle_t             *sa_hdl   = NULL;
255         uchar_t                 *nv_value = NULL;
256         uint64_t                 xattr    = ZFS_NO_OBJECT;
257         int                      size     = 0;
258         int                      rc;
259         ENTRY;
260
261         rc = __osd_xattr_load_by_oid(osd, oid, &sa_xattr);
262         if (rc == -ENOENT)
263                 goto regular;
264
265         if (rc != 0)
266                 GOTO(out, rc);
267
268         rc = -nvlist_lookup_byte_array(sa_xattr, XATTR_NAME_LMA, &nv_value,
269                                        &size);
270         if (rc == -ENOENT)
271                 goto regular;
272
273         if (rc != 0)
274                 GOTO(out, rc);
275
276         if (unlikely(size > sizeof(oti->oti_buf)))
277                 GOTO(out, rc = -ERANGE);
278
279         memcpy(lma, nv_value, size);
280
281         goto found;
282
283 regular:
284         rc = -sa_handle_get(os, oid, NULL, SA_HDL_PRIVATE, &sa_hdl);
285         if (rc != 0)
286                 GOTO(out, rc);
287
288         rc = -sa_lookup(sa_hdl, SA_ZPL_XATTR(osd), &xattr, 8);
289         sa_handle_destroy(sa_hdl);
290         if (rc != 0)
291                 GOTO(out, rc);
292
293         buf.lb_buf = lma;
294         buf.lb_len = sizeof(oti->oti_buf);
295         rc = __osd_xattr_get_large(env, osd, xattr, &buf,
296                                    XATTR_NAME_LMA, &size);
297         if (rc != 0)
298                 GOTO(out, rc);
299
300 found:
301         if (size < sizeof(*lma))
302                 GOTO(out, rc = -EIO);
303
304         lustre_lma_swab(lma);
305         if (unlikely((lma->lma_incompat & ~LMA_INCOMPAT_SUPP) ||
306                      CFS_FAIL_CHECK(OBD_FAIL_OSD_LMA_INCOMPAT))) {
307                 CWARN("%s: unsupported incompat LMA feature(s) %#x for "
308                       "oid = %#llx\n", osd->od_svname,
309                       lma->lma_incompat & ~LMA_INCOMPAT_SUPP, oid);
310                 GOTO(out, rc = -EOPNOTSUPP);
311         } else {
312                 *fid = lma->lma_self_fid;
313                 GOTO(out, rc = 0);
314         }
315
316 out:
317         if (sa_xattr != NULL)
318                 nvlist_free(sa_xattr);
319         return rc;
320 }
321
322 /*
323  * As we don't know FID, we can't use LU object, so this function
324  * partially duplicate osd_xattr_get_internal() which is built around
325  * LU-object and uses it to cache data like regular EA dnode, etc
326  */
327 static int osd_find_parent_by_dnode(const struct lu_env *env,
328                                     struct dt_object *o,
329                                     struct lu_fid *fid, uint64_t *oid)
330 {
331         struct osd_object       *obj = osd_dt_obj(o);
332         struct osd_device       *osd = osd_obj2dev(obj);
333         uint64_t                 dnode = ZFS_NO_OBJECT;
334         int                      rc;
335         ENTRY;
336
337         /* first of all, get parent dnode from own attributes */
338         rc = osd_sa_handle_get(obj);
339         if (rc != 0)
340                 RETURN(rc);
341         rc = -sa_lookup(obj->oo_sa_hdl, SA_ZPL_PARENT(osd), &dnode, 8);
342         if (!rc) {
343                 if (oid)
344                         *oid = dnode;
345                 rc = osd_get_fid_by_oid(env, osd, dnode, fid);
346         }
347
348         RETURN(rc);
349 }
350
351 static int osd_find_parent_fid(const struct lu_env *env, struct dt_object *o,
352                                struct lu_fid *fid, uint64_t *oid)
353 {
354         struct link_ea_header  *leh;
355         struct link_ea_entry   *lee;
356         struct lu_buf           buf;
357         int                     rc;
358         ENTRY;
359
360         buf.lb_buf = osd_oti_get(env)->oti_buf;
361         buf.lb_len = sizeof(osd_oti_get(env)->oti_buf);
362
363         rc = osd_xattr_get(env, o, &buf, XATTR_NAME_LINK);
364         if (rc == -ERANGE) {
365                 rc = osd_xattr_get(env, o, &LU_BUF_NULL, XATTR_NAME_LINK);
366                 if (rc < 0)
367                         RETURN(rc);
368                 LASSERT(rc > 0);
369                 OBD_ALLOC(buf.lb_buf, rc);
370                 if (buf.lb_buf == NULL)
371                         RETURN(-ENOMEM);
372                 buf.lb_len = rc;
373                 rc = osd_xattr_get(env, o, &buf, XATTR_NAME_LINK);
374         }
375         if (rc < 0)
376                 GOTO(out, rc);
377         if (rc < sizeof(*leh) + sizeof(*lee))
378                 GOTO(out, rc = -EINVAL);
379
380         leh = buf.lb_buf;
381         if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
382                 leh->leh_magic = LINK_EA_MAGIC;
383                 leh->leh_reccount = __swab32(leh->leh_reccount);
384                 leh->leh_len = __swab64(leh->leh_len);
385         }
386         if (leh->leh_magic != LINK_EA_MAGIC)
387                 GOTO(out, rc = -EINVAL);
388         if (leh->leh_reccount == 0)
389                 GOTO(out, rc = -ENODATA);
390
391         lee = (struct link_ea_entry *)(leh + 1);
392         fid_be_to_cpu(fid, (const struct lu_fid *)&lee->lee_parent_fid);
393         rc = 0;
394
395 out:
396         if (buf.lb_buf != osd_oti_get(env)->oti_buf)
397                 OBD_FREE(buf.lb_buf, buf.lb_len);
398
399 #if 0
400         /* this block can be enabled for additional verification
401          * it's trying to match FID from LinkEA vs. FID from LMA */
402         if (rc == 0) {
403                 struct lu_fid fid2;
404                 int rc2;
405                 rc2 = osd_find_parent_by_dnode(env, o, &fid2, oid);
406                 if (rc2 == 0)
407                         if (lu_fid_eq(fid, &fid2) == 0)
408                                 CERROR("wrong parent: "DFID" != "DFID"\n",
409                                        PFID(fid), PFID(&fid2));
410         }
411 #endif
412
413         /* no LinkEA is found, let's try to find the fid in parent's LMA */
414         if (unlikely(rc != 0))
415                 rc = osd_find_parent_by_dnode(env, o, fid, oid);
416
417         RETURN(rc);
418 }
419
420 /*
421  * When lookup item under striped directory, we need to locate the master
422  * MDT-object of the striped directory firstly, then the client will send
423  * lookup (getattr_by_name) RPC to the MDT with some slave MDT-object's FID
424  * and the item's name. If the system is restored from MDT file level backup,
425  * then before the OI scrub completely built the OI files, the OI mappings of
426  * the master MDT-object and slave MDT-object may be invalid. Usually, it is
427  * not a problem for the master MDT-object. Because when locate the master
428  * MDT-object, we will do name based lookup (for the striped directory itself)
429  * firstly, during such process we can setup the correct OI mapping for the
430  * master MDT-object. But it will be trouble for the slave MDT-object. Because
431  * the client will not trigger name based lookup on the MDT to locate the slave
432  * MDT-object before locating item under the striped directory, then when
433  * osd_fid_lookup(), it will find that the OI mapping for the slave MDT-object
434  * is invalid and does not know what the right OI mapping is, then the MDT has
435  * to return -EINPROGRESS to the client to notify that the OI scrub is rebuiding
436  * the OI file, related OI mapping is unknown yet, please try again later. And
437  * then client will re-try the RPC again and again until related OI mapping has
438  * been updated. That is quite inefficient.
439  *
440  * To resolve above trouble, we will handle it as the following two cases:
441  *
442  * 1) The slave MDT-object and the master MDT-object are on different MDTs.
443  *    It is relative easy. Be as one of remote MDT-objects, the slave MDT-object
444  *    is linked under /REMOTE_PARENT_DIR with the name of its FID string.
445  *    We can locate the slave MDT-object via lookup the /REMOTE_PARENT_DIR
446  *    directly. Please check osd_fid_lookup().
447  *
448  * 2) The slave MDT-object and the master MDT-object reside on the same MDT.
449  *    Under such case, during lookup the master MDT-object, we will lookup the
450  *    slave MDT-object via readdir against the master MDT-object, because the
451  *    slave MDT-objects information are stored as sub-directories with the name
452  *    "${FID}:${index}". Then when find the local slave MDT-object, its OI
453  *    mapping will be recorded. Then subsequent osd_fid_lookup() will know
454  *    the correct OI mapping for the slave MDT-object.
455  */
456 static int osd_check_lmv(const struct lu_env *env, struct osd_device *osd,
457                          uint64_t oid, const struct lu_fid *fid)
458 {
459         struct osd_thread_info *info = osd_oti_get(env);
460         struct luz_direntry *zde = &info->oti_zde;
461         zap_attribute_t *za = &info->oti_za;
462         zap_cursor_t *zc = &info->oti_zc;
463         struct lu_fid *tfid = &info->oti_fid;
464         nvlist_t *nvbuf = NULL;
465         struct lmv_mds_md_v1 *lmv = NULL;
466         int size;
467         int rc;
468         ENTRY;
469
470         rc = __osd_xattr_load_by_oid(osd, oid, &nvbuf);
471         if (rc == -ENOENT || rc == -EEXIST || rc == -ENODATA)
472                 RETURN(0);
473
474         if (rc)
475                 RETURN(rc);
476
477         rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMV,
478                                        (uchar_t **)&lmv, &size);
479         if (rc == -ENOENT || rc == -EEXIST || rc == -ENODATA)
480                 GOTO(out_nvbuf, rc = 0);
481
482         if (rc)
483                 GOTO(out_nvbuf, rc);
484
485         if (le32_to_cpu(lmv->lmv_magic) != LMV_MAGIC_V1)
486                 GOTO(out_nvbuf, rc = -EINVAL);
487
488         zap_cursor_init_serialized(zc, osd->od_os, oid, 0);
489         rc = -zap_cursor_retrieve(zc, za);
490         if (rc == -ENOENT) {
491                 zap_cursor_advance(zc);
492         } else if (rc) {
493                 CERROR("%s: fail to init for check LMV "DFID"(%llu): rc = %d\n",
494                        osd_name(osd), PFID(fid), oid, rc);
495                 GOTO(out_zc, rc);
496         }
497
498         while (1) {
499                 rc = -zap_cursor_retrieve(zc, za);
500                 if (rc == -ENOENT)
501                         GOTO(out_zc, rc = 0);
502
503                 if (rc) {
504                         CERROR("%s: fail to locate next for check LMV "
505                                DFID"(%llu): rc = %d\n",
506                                osd_name(osd), PFID(fid), oid, rc);
507                         GOTO(out_zc, rc);
508                 }
509
510                 fid_zero(tfid);
511                 sscanf(za->za_name + 1, SFID, RFID(tfid));
512                 if (fid_is_sane(tfid) && !osd_remote_fid(env, osd, tfid)) {
513                         rc = osd_zap_lookup(osd, oid, NULL, za->za_name,
514                                         za->za_integer_length,
515                                         sizeof(*zde) / za->za_integer_length,
516                                         (void *)zde);
517                         if (rc) {
518                                 CERROR("%s: fail to lookup for check LMV "
519                                        DFID"(%llu): rc = %d\n",
520                                        osd_name(osd), PFID(fid), oid, rc);
521                                 GOTO(out_zc, rc);
522                         }
523
524                         rc = osd_oii_insert(env, osd, tfid,
525                                             zde->lzd_reg.zde_dnode, false);
526                         GOTO(out_zc, rc);
527                 }
528
529                 zap_cursor_advance(zc);
530         }
531
532 out_zc:
533         zap_cursor_fini(zc);
534 out_nvbuf:
535         nvlist_free(nvbuf);
536
537         return rc;
538 }
539
540 static int
541 osd_consistency_check(const struct lu_env *env, struct osd_device *osd,
542                       struct osd_object *obj, const struct lu_fid *fid,
543                       uint64_t oid, bool is_dir)
544 {
545         struct lustre_scrub *scrub = &osd->od_scrub;
546         dnode_t *dn = NULL;
547         uint64_t oid2;
548         int once = 0;
549         bool insert;
550         int rc;
551         ENTRY;
552
553         if (!fid_is_norm(fid) && !fid_is_igif(fid))
554                 RETURN(0);
555
556         /* oid == ZFS_NO_OBJECT must be for lookup ".." case */
557         if (oid == ZFS_NO_OBJECT) {
558                 rc = osd_sa_handle_get(obj);
559                 if (rc)
560                         RETURN(rc);
561
562                 rc = -sa_lookup(obj->oo_sa_hdl, SA_ZPL_PARENT(osd), &oid, 8);
563                 if (rc)
564                         RETURN(rc);
565         }
566
567         if (scrub->os_running) {
568                 if (scrub->os_pos_current > oid)
569                         RETURN(0);
570         } else if (osd->od_auto_scrub_interval == AS_NEVER) {
571                 RETURN(0);
572         } else {
573                 if (ktime_get_real_seconds() <
574                     scrub->os_file.sf_time_last_complete +
575                     osd->od_auto_scrub_interval)
576                         RETURN(0);
577         }
578
579 again:
580         rc = osd_fid_lookup(env, osd, fid, &oid2);
581         if (rc == -ENOENT) {
582                 insert = true;
583                 if (dn)
584                         goto trigger;
585
586                 rc = __osd_obj2dnode(osd->od_os, oid, &dn);
587                 /* The object has been removed (by race maybe). */
588                 if (rc)
589                         RETURN(rc = (rc == -EEXIST ? -ENOENT : rc));
590
591                 goto trigger;
592         } else if (rc || oid == oid2) {
593                 GOTO(out, rc);
594         }
595
596         insert = false;
597
598 trigger:
599         if (scrub->os_running) {
600                 if (!dn) {
601                         rc = __osd_obj2dnode(osd->od_os, oid, &dn);
602                         /* The object has been removed (by race maybe). */
603                         if (rc)
604                                 RETURN(rc = (rc == -EEXIST ? -ENOENT : rc));
605                 }
606
607                 rc = osd_oii_insert(env, osd, fid, oid, insert);
608                 /* There is race condition between osd_oi_lookup and OI scrub.
609                  * The OI scrub finished just after osd_oi_lookup() failure.
610                  * Under such case, it is unnecessary to trigger OI scrub again,
611                  * but try to call osd_oi_lookup() again. */
612                 if (unlikely(rc == -EAGAIN))
613                         goto again;
614
615                 if (is_dir)
616                         rc = osd_check_lmv(env, osd, oid, fid);
617                 else
618                         rc = 0;
619
620                 GOTO(out, rc);
621         }
622
623         if (osd->od_auto_scrub_interval != AS_NEVER && ++once == 1) {
624                 rc = osd_scrub_start(env, osd, SS_AUTO_FULL |
625                                      SS_CLEAR_DRYRUN | SS_CLEAR_FAILOUT);
626                 CDEBUG(D_LFSCK | D_CONSOLE | D_WARNING,
627                        "%s: trigger partial OI scrub for RPC inconsistency "
628                        "checking FID "DFID": rc = %d\n",
629                        osd_name(osd), PFID(fid), rc);
630                 if (!rc)
631                         goto again;
632         }
633
634         GOTO(out, rc);
635
636 out:
637         if (dn)
638                 osd_dnode_rele(dn);
639
640         return rc;
641 }
642
643 static int osd_dir_lookup(const struct lu_env *env, struct dt_object *dt,
644                           struct dt_rec *rec, const struct dt_key *key)
645 {
646         struct osd_thread_info *oti = osd_oti_get(env);
647         struct osd_object *obj = osd_dt_obj(dt);
648         struct osd_device *osd = osd_obj2dev(obj);
649         struct lu_fid *fid = (struct lu_fid *)rec;
650         char *name = (char *)key;
651         uint64_t oid = ZFS_NO_OBJECT;
652         int rc;
653         ENTRY;
654
655         if (name[0] == '.') {
656                 if (name[1] == 0) {
657                         const struct lu_fid *f = lu_object_fid(&dt->do_lu);
658                         memcpy(rec, f, sizeof(*f));
659                         RETURN(1);
660                 } else if (name[1] == '.' && name[2] == 0) {
661                         rc = osd_find_parent_fid(env, dt, fid, &oid);
662                         GOTO(out, rc);
663                 }
664         }
665
666         memset(&oti->oti_zde.lzd_fid, 0, sizeof(struct lu_fid));
667         rc = osd_zap_lookup(osd, obj->oo_dn->dn_object, obj->oo_dn,
668                             (char *)key, 8, sizeof(oti->oti_zde) / 8,
669                             (void *)&oti->oti_zde);
670         if (rc != 0)
671                 RETURN(rc);
672
673         oid = oti->oti_zde.lzd_reg.zde_dnode;
674         if (likely(fid_is_sane(&oti->oti_zde.lzd_fid))) {
675                 memcpy(rec, &oti->oti_zde.lzd_fid, sizeof(struct lu_fid));
676                 GOTO(out, rc = 0);
677         }
678
679         rc = osd_get_fid_by_oid(env, osd, oti->oti_zde.lzd_reg.zde_dnode, fid);
680
681         GOTO(out, rc);
682
683 out:
684         if (!rc && !osd_remote_fid(env, osd, fid)) {
685                 /*
686                  * this should ask the scrubber to check OI given
687                  * the mapping we just found in the dir entry.
688                  * but result of that check should not affect
689                  * result of the lookup in the directory.
690                  * otherwise such a direntry becomes hidden
691                  * from the layers above, including LFSCK which
692                  * is supposed to fix dangling entries.
693                  */
694                 osd_consistency_check(env, osd, obj, fid, oid,
695                                 S_ISDIR(DTTOIF(oti->oti_zde.lzd_reg.zde_type)));
696         }
697
698         return rc == 0 ? 1 : (rc == -ENOENT ? -ENODATA : rc);
699 }
700
701 /*
702  * In DNE environment, the object and its name entry may reside on different
703  * MDTs. Under such case, we will create an agent object on the MDT where the
704  * name entry resides. The agent object is empty, and indicates that the real
705  * object for the name entry resides on another MDT. If without agent object,
706  * related name entry will be skipped when perform MDT side file level backup
707  * and restore via ZPL by userspace tool, such as 'tar'.
708  */
709 static int osd_create_agent_object(const struct lu_env *env,
710                                    struct osd_device *osd,
711                                    struct luz_direntry *zde,
712                                    uint64_t parent, dmu_tx_t *tx)
713 {
714         struct osd_thread_info *info = osd_oti_get(env);
715         struct lustre_mdt_attrs *lma = &info->oti_mdt_attrs;
716         struct lu_attr *la = &info->oti_la;
717         nvlist_t *nvbuf = NULL;
718         dnode_t *dn = NULL;
719         sa_handle_t *hdl;
720         int rc = 0;
721         ENTRY;
722
723         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AGENTOBJ))
724                 RETURN(0);
725
726         rc = -nvlist_alloc(&nvbuf, NV_UNIQUE_NAME, KM_SLEEP);
727         if (rc)
728                 RETURN(rc);
729
730         lustre_lma_init(lma, &zde->lzd_fid, 0, LMAI_AGENT);
731         lustre_lma_swab(lma);
732         rc = -nvlist_add_byte_array(nvbuf, XATTR_NAME_LMA, (uchar_t *)lma,
733                                     sizeof(*lma));
734         if (rc)
735                 GOTO(out, rc);
736
737         la->la_valid = LA_TYPE | LA_MODE;
738         la->la_mode = (DTTOIF(zde->lzd_reg.zde_type) & S_IFMT) |
739                         S_IRUGO | S_IWUSR | S_IXUGO;
740
741         if (S_ISDIR(la->la_mode))
742                 rc = __osd_zap_create(env, osd, &dn, tx, la,
743                                 osd_find_dnsize(osd, OSD_BASE_EA_IN_BONUS), 0);
744         else
745                 rc = __osd_object_create(env, osd, NULL, &zde->lzd_fid,
746                                          &dn, tx, la);
747         if (rc)
748                 GOTO(out, rc);
749
750         zde->lzd_reg.zde_dnode = dn->dn_object;
751         rc = -sa_handle_get(osd->od_os, dn->dn_object, NULL,
752                             SA_HDL_PRIVATE, &hdl);
753         if (!rc) {
754                 rc = __osd_attr_init(env, osd, NULL, hdl, tx,
755                                      la, parent, nvbuf);
756                 sa_handle_destroy(hdl);
757         }
758
759         GOTO(out, rc);
760
761 out:
762         if (dn) {
763                 if (rc)
764                         dmu_object_free(osd->od_os, dn->dn_object, tx);
765                 osd_dnode_rele(dn);
766         }
767
768         if (nvbuf)
769                 nvlist_free(nvbuf);
770
771         return rc;
772 }
773
774 int osd_add_to_remote_parent(const struct lu_env *env,
775                              struct osd_device *osd,
776                              struct osd_object *obj,
777                              struct osd_thandle *oh)
778 {
779         struct osd_thread_info *info = osd_oti_get(env);
780         struct luz_direntry *zde = &info->oti_zde;
781         char *name = info->oti_str;
782         const struct lu_fid *fid = lu_object_fid(&obj->oo_dt.do_lu);
783         struct lustre_mdt_attrs *lma = (struct lustre_mdt_attrs *)info->oti_buf;
784         struct lu_buf buf = {
785                 .lb_buf = lma,
786                 .lb_len = sizeof(info->oti_buf),
787         };
788         int size = 0;
789         int rc;
790         ENTRY;
791
792         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AGENTENT))
793                 RETURN(0);
794
795         rc = osd_xattr_get_internal(env, obj, &buf, XATTR_NAME_LMA, &size);
796         if (rc) {
797                 CWARN("%s: fail to load LMA for adding "
798                       DFID" to remote parent: rc = %d\n",
799                       osd_name(osd), PFID(fid), rc);
800                 RETURN(rc);
801         }
802
803         lustre_lma_swab(lma);
804         lma->lma_incompat |= LMAI_REMOTE_PARENT;
805         lustre_lma_swab(lma);
806         buf.lb_len = size;
807         rc = osd_xattr_set_internal(env, obj, &buf, XATTR_NAME_LMA,
808                                     LU_XATTR_REPLACE, oh);
809         if (rc) {
810                 CWARN("%s: fail to update LMA for adding "
811                       DFID" to remote parent: rc = %d\n",
812                       osd_name(osd), PFID(fid), rc);
813                 RETURN(rc);
814         }
815
816         osd_fid2str(name, fid, sizeof(info->oti_str));
817         zde->lzd_reg.zde_dnode = obj->oo_dn->dn_object;
818         zde->lzd_reg.zde_type = IFTODT(S_IFDIR);
819         zde->lzd_fid = *fid;
820
821         rc = osd_zap_add(osd, osd->od_remote_parent_dir, NULL,
822                          name, 8, sizeof(*zde) / 8, zde, oh->ot_tx);
823         if (unlikely(rc == -EEXIST))
824                 rc = 0;
825         if (rc)
826                 CWARN("%s: fail to add name entry for "
827                       DFID" to remote parent: rc = %d\n",
828                       osd_name(osd), PFID(fid), rc);
829         else
830                 lu_object_set_agent_entry(&obj->oo_dt.do_lu);
831
832         RETURN(rc);
833 }
834
835 int osd_delete_from_remote_parent(const struct lu_env *env,
836                                   struct osd_device *osd,
837                                   struct osd_object *obj,
838                                   struct osd_thandle *oh, bool destroy)
839 {
840         struct osd_thread_info *info = osd_oti_get(env);
841         char *name = info->oti_str;
842         const struct lu_fid *fid = lu_object_fid(&obj->oo_dt.do_lu);
843         struct lustre_mdt_attrs *lma = (struct lustre_mdt_attrs *)info->oti_buf;
844         struct lu_buf buf = {
845                 .lb_buf = lma,
846                 .lb_len = sizeof(info->oti_buf),
847         };
848         int size = 0;
849         int rc;
850         ENTRY;
851
852         osd_fid2str(name, fid, sizeof(info->oti_str));
853         rc = osd_zap_remove(osd, osd->od_remote_parent_dir, NULL,
854                             name, oh->ot_tx);
855         if (unlikely(rc == -ENOENT))
856                 rc = 0;
857         if (rc)
858                 CERROR("%s: fail to remove entry under remote "
859                        "parent for "DFID": rc = %d\n",
860                        osd_name(osd), PFID(fid), rc);
861
862         if (destroy || rc)
863                 RETURN(rc);
864
865         rc = osd_xattr_get_internal(env, obj, &buf, XATTR_NAME_LMA, &size);
866         if (rc) {
867                 CERROR("%s: fail to load LMA for removing "
868                        DFID" from remote parent: rc = %d\n",
869                        osd_name(osd), PFID(fid), rc);
870                 RETURN(rc);
871         }
872
873         lustre_lma_swab(lma);
874         lma->lma_incompat &= ~LMAI_REMOTE_PARENT;
875         lustre_lma_swab(lma);
876         buf.lb_len = size;
877         rc = osd_xattr_set_internal(env, obj, &buf, XATTR_NAME_LMA,
878                                     LU_XATTR_REPLACE, oh);
879         if (rc)
880                 CERROR("%s: fail to update LMA for removing "
881                        DFID" from remote parent: rc = %d\n",
882                        osd_name(osd), PFID(fid), rc);
883         else
884                 lu_object_clear_agent_entry(&obj->oo_dt.do_lu);
885
886         RETURN(rc);
887 }
888
889 static int osd_declare_dir_insert(const struct lu_env *env,
890                                   struct dt_object *dt,
891                                   const struct dt_rec *rec,
892                                   const struct dt_key *key,
893                                   struct thandle *th)
894 {
895         struct osd_object       *obj = osd_dt_obj(dt);
896         struct osd_device       *osd = osd_obj2dev(obj);
897         const struct dt_insert_rec *rec1;
898         const struct lu_fid     *fid;
899         struct osd_thandle      *oh;
900         uint64_t                 object;
901         struct osd_idmap_cache *idc;
902         ENTRY;
903
904         rec1 = (struct dt_insert_rec *)rec;
905         fid = rec1->rec_fid;
906         LASSERT(fid != NULL);
907         LASSERT(rec1->rec_type != 0);
908
909         LASSERT(th != NULL);
910         oh = container_of(th, struct osd_thandle, ot_super);
911
912         idc = osd_idc_find_or_init(env, osd, fid);
913         if (IS_ERR(idc))
914                 RETURN(PTR_ERR(idc));
915
916         if (idc->oic_remote) {
917                 const char *name = (const char *)key;
918
919                 if (name[0] != '.' || name[1] != '.' || name[2] != 0) {
920                         /* Prepare agent object for remote entry that will
921                          * be used for operations via ZPL, such as MDT side
922                          * file-level backup and restore. */
923                         dmu_tx_hold_sa_create(oh->ot_tx,
924                                 osd_find_dnsize(osd, OSD_BASE_EA_IN_BONUS));
925                         if (S_ISDIR(rec1->rec_type))
926                                 dmu_tx_hold_zap(oh->ot_tx, DMU_NEW_OBJECT,
927                                                 FALSE, NULL);
928                 }
929         }
930
931         /* This is for inserting dot/dotdot for new created dir. */
932         if (obj->oo_dn == NULL)
933                 object = DMU_NEW_OBJECT;
934         else
935                 object = obj->oo_dn->dn_object;
936
937         /* do not specify the key as then DMU is trying to look it up
938          * which is very expensive. usually the layers above lookup
939          * before insertion */
940         osd_tx_hold_zap(oh->ot_tx, object, obj->oo_dn, TRUE, NULL);
941
942         RETURN(0);
943 }
944
945 static int osd_seq_exists(const struct lu_env *env, struct osd_device *osd,
946                           u64 seq)
947 {
948         struct lu_seq_range     *range = &osd_oti_get(env)->oti_seq_range;
949         struct seq_server_site  *ss = osd_seq_site(osd);
950         int                     rc;
951         ENTRY;
952
953         LASSERT(ss != NULL);
954         LASSERT(ss->ss_server_fld != NULL);
955
956         rc = osd_fld_lookup(env, osd, seq, range);
957         if (rc != 0) {
958                 if (rc != -ENOENT)
959                         CERROR("%s: Can not lookup fld for %#llx\n",
960                                osd_name(osd), seq);
961                 RETURN(0);
962         }
963
964         RETURN(ss->ss_node_id == range->lsr_index);
965 }
966
967 int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
968                    const struct lu_fid *fid)
969 {
970         struct seq_server_site  *ss = osd_seq_site(osd);
971         ENTRY;
972
973         /* FID seqs not in FLDB, must be local seq */
974         if (unlikely(!fid_seq_in_fldb(fid_seq(fid))))
975                 RETURN(0);
976
977         /* If FLD is not being initialized yet, it only happens during the
978          * initialization, likely during mgs initialization, and we assume
979          * this is local FID. */
980         if (ss == NULL || ss->ss_server_fld == NULL)
981                 RETURN(0);
982
983         /* Only check the local FLDB here */
984         if (osd_seq_exists(env, osd, fid_seq(fid)))
985                 RETURN(0);
986
987         RETURN(1);
988 }
989
990 /**
991  *      Inserts (key, value) pair in \a directory object.
992  *
993  *      \param  dt      osd index object
994  *      \param  key     key for index
995  *      \param  rec     record reference
996  *      \param  th      transaction handler
997  *
998  *      \retval  0  success
999  *      \retval -ve failure
1000  */
1001 static int osd_dir_insert(const struct lu_env *env, struct dt_object *dt,
1002                           const struct dt_rec *rec, const struct dt_key *key,
1003                           struct thandle *th)
1004 {
1005         struct osd_thread_info *oti = osd_oti_get(env);
1006         struct osd_object   *parent = osd_dt_obj(dt);
1007         struct osd_device   *osd = osd_obj2dev(parent);
1008         struct dt_insert_rec *rec1 = (struct dt_insert_rec *)rec;
1009         const struct lu_fid *fid = rec1->rec_fid;
1010         struct osd_thandle *oh;
1011         struct osd_idmap_cache *idc;
1012         const char *name = (const char *)key;
1013         struct luz_direntry *zde = &oti->oti_zde;
1014         int num = sizeof(*zde) / 8;
1015         int rc;
1016         ENTRY;
1017
1018         LASSERT(parent->oo_dn);
1019
1020         LASSERT(dt_object_exists(dt));
1021         LASSERT(osd_invariant(parent));
1022
1023         LASSERT(th != NULL);
1024         oh = container_of(th, struct osd_thandle, ot_super);
1025
1026         idc = osd_idc_find(env, osd, fid);
1027         if (unlikely(idc == NULL)) {
1028                 /* this dt_insert() wasn't declared properly, so
1029                  * FID is missing in OI cache. we better do not
1030                  * lookup FID in FLDB/OI and don't risk to deadlock,
1031                  * but in some special cases (lfsck testing, etc)
1032                  * it's much simpler than fixing a caller */
1033                 idc = osd_idc_find_or_init(env, osd, fid);
1034                 if (IS_ERR(idc)) {
1035                         CERROR("%s: "DFID" wasn't declared for insert\n",
1036                                osd_name(osd), PFID(fid));
1037                         RETURN(PTR_ERR(idc));
1038                 }
1039         }
1040
1041         BUILD_BUG_ON(sizeof(zde->lzd_reg) != 8);
1042         BUILD_BUG_ON(sizeof(*zde) % 8 != 0);
1043
1044         memset(&zde->lzd_reg, 0, sizeof(zde->lzd_reg));
1045         zde->lzd_reg.zde_type = IFTODT(rec1->rec_type & S_IFMT);
1046         zde->lzd_fid = *fid;
1047
1048         if (idc->oic_remote) {
1049                 if (name[0] != '.' || name[1] != '.' || name[2] != 0) {
1050                         /* Create agent inode for remote object that will
1051                          * be used for MDT file-level backup and restore. */
1052                         rc = osd_create_agent_object(env, osd, zde,
1053                                         parent->oo_dn->dn_object, oh->ot_tx);
1054                         if (rc) {
1055                                 CWARN("%s: Fail to create agent object for "
1056                                       DFID": rc = %d\n",
1057                                       osd_name(osd), PFID(fid), rc);
1058                                 /* Ignore the failure since the system can go
1059                                  * ahead if we do not care about the MDT side
1060                                  * file-level backup and restore. */
1061                                 rc = 0;
1062                         }
1063                 }
1064         } else {
1065                 if (unlikely(idc->oic_dnode == 0)) {
1066                         /* for a reason OI cache wasn't filled properly */
1067                         CERROR("%s: OIC for "DFID" isn't filled\n",
1068                                osd_name(osd), PFID(fid));
1069                         RETURN(-EINVAL);
1070                 }
1071                 if (name[0] == '.') {
1072                         if (name[1] == 0) {
1073                                 /* do not store ".", instead generate it
1074                                  * during iteration */
1075                                 GOTO(out, rc = 0);
1076                         } else if (name[1] == '.' && name[2] == 0) {
1077                                 uint64_t dnode = idc->oic_dnode;
1078                                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_PARENT))
1079                                         dnode--;
1080
1081                                 /* update parent dnode in the child.
1082                                  * later it will be used to generate ".." */
1083                                 rc = osd_object_sa_update(parent,
1084                                                  SA_ZPL_PARENT(osd),
1085                                                  &dnode, 8, oh);
1086
1087                                 GOTO(out, rc);
1088                         }
1089                 }
1090                 zde->lzd_reg.zde_dnode = idc->oic_dnode;
1091         }
1092
1093         if (OBD_FAIL_CHECK(OBD_FAIL_FID_INDIR))
1094                 zde->lzd_fid.f_ver = ~0;
1095
1096         /* The logic is not related with IGIF, just re-use the fail_loc value
1097          * to be consistent with ldiskfs case, then share the same test logic */
1098         if (OBD_FAIL_CHECK(OBD_FAIL_FID_IGIF))
1099                 num = 1;
1100
1101         /* Insert (key,oid) into ZAP */
1102         rc = osd_zap_add(osd, parent->oo_dn->dn_object, parent->oo_dn,
1103                          name, 8, num, (void *)zde, oh->ot_tx);
1104         if (unlikely(rc == -EEXIST &&
1105                      name[0] == '.' && name[1] == '.' && name[2] == 0))
1106                 /* Update (key,oid) in ZAP */
1107                 rc = -zap_update(osd->od_os, parent->oo_dn->dn_object, name, 8,
1108                                  sizeof(*zde) / 8, (void *)zde, oh->ot_tx);
1109
1110 out:
1111
1112         RETURN(rc);
1113 }
1114
1115 static int osd_declare_dir_delete(const struct lu_env *env,
1116                                   struct dt_object *dt,
1117                                   const struct dt_key *key,
1118                                   struct thandle *th)
1119 {
1120         struct osd_object *obj = osd_dt_obj(dt);
1121         dnode_t *zap_dn = obj->oo_dn;
1122         struct osd_thandle *oh;
1123         const char *name = (const char *)key;
1124         ENTRY;
1125
1126         LASSERT(dt_object_exists(dt));
1127         LASSERT(osd_invariant(obj));
1128         LASSERT(zap_dn != NULL);
1129
1130         LASSERT(th != NULL);
1131         oh = container_of(th, struct osd_thandle, ot_super);
1132
1133         /*
1134          * In Orion . and .. were stored in the directory (not generated upon
1135          * request as now). We preserve them for backward compatibility.
1136          */
1137         if (name[0] == '.') {
1138                 if (name[1] == 0)
1139                         RETURN(0);
1140                 else if (name[1] == '.' && name[2] == 0)
1141                         RETURN(0);
1142         }
1143
1144         /* do not specify the key as then DMU is trying to look it up
1145          * which is very expensive. usually the layers above lookup
1146          * before deletion */
1147         osd_tx_hold_zap(oh->ot_tx, zap_dn->dn_object, zap_dn, FALSE, NULL);
1148
1149         /* For destroying agent object if have. */
1150         dmu_tx_hold_bonus(oh->ot_tx, DMU_NEW_OBJECT);
1151
1152         RETURN(0);
1153 }
1154
1155 static int osd_dir_delete(const struct lu_env *env, struct dt_object *dt,
1156                           const struct dt_key *key, struct thandle *th)
1157 {
1158         struct luz_direntry *zde = &osd_oti_get(env)->oti_zde;
1159         struct osd_object *obj = osd_dt_obj(dt);
1160         struct osd_device *osd = osd_obj2dev(obj);
1161         struct osd_thandle *oh;
1162         dnode_t *zap_dn = obj->oo_dn;
1163         char      *name = (char *)key;
1164         int rc;
1165         ENTRY;
1166
1167         LASSERT(zap_dn);
1168
1169         LASSERT(th != NULL);
1170         oh = container_of(th, struct osd_thandle, ot_super);
1171
1172         /*
1173          * In Orion . and .. were stored in the directory (not generated upon
1174          * request as now). we preserve them for backward compatibility
1175          */
1176         if (name[0] == '.') {
1177                 if (name[1] == 0) {
1178                         RETURN(0);
1179                 } else if (name[1] == '.' && name[2] == 0) {
1180                         RETURN(0);
1181                 }
1182         }
1183
1184         /* XXX: We have to say that lookup during delete_declare will affect
1185          *      performance, but we have to check whether the name entry (to
1186          *      be deleted) has agent object or not to avoid orphans.
1187          *
1188          *      We will improve that in the future, some possible solutions,
1189          *      for example:
1190          *      1) Some hint from the caller via transaction handle to make
1191          *         the lookup conditionally.
1192          *      2) Enhance the ZFS logic to recognize the OSD lookup result
1193          *         and delete the given entry directly without lookup again
1194          *         internally. LU-10190 */
1195         memset(&zde->lzd_fid, 0, sizeof(zde->lzd_fid));
1196         rc = osd_zap_lookup(osd, zap_dn->dn_object, zap_dn, name, 8, 3, zde);
1197         if (unlikely(rc)) {
1198                 if (rc != -ENOENT)
1199                         CERROR("%s: failed to locate entry  %s: rc = %d\n",
1200                                osd->od_svname, name, rc);
1201                 RETURN(rc);
1202         }
1203
1204         if (unlikely(osd_remote_fid(env, osd, &zde->lzd_fid) > 0)) {
1205                 rc = -dmu_object_free(osd->od_os, zde->lzd_reg.zde_dnode,
1206                                       oh->ot_tx);
1207                 if (rc)
1208                         CERROR("%s: failed to destroy agent object (%llu) "
1209                                "for the entry %s: rc = %d\n", osd->od_svname,
1210                                (__u64)zde->lzd_reg.zde_dnode, name, rc);
1211         }
1212
1213         /* Remove key from the ZAP */
1214         rc = osd_zap_remove(osd, zap_dn->dn_object, zap_dn,
1215                             (char *)key, oh->ot_tx);
1216         if (unlikely(rc))
1217                 CERROR("%s: zap_remove %s failed: rc = %d\n",
1218                        osd->od_svname, name, rc);
1219
1220         RETURN(rc);
1221 }
1222
1223 static struct dt_it *osd_dir_it_init(const struct lu_env *env,
1224                                      struct dt_object *dt,
1225                                      __u32 unused)
1226 {
1227         struct osd_zap_it *it;
1228
1229         it = (struct osd_zap_it *)osd_index_it_init(env, dt, unused);
1230         if (!IS_ERR(it))
1231                 it->ozi_pos = OZI_POS_INIT;
1232
1233         RETURN((struct dt_it *)it);
1234 }
1235
1236 /**
1237  *  Move Iterator to record specified by \a key
1238  *
1239  *  \param  di      osd iterator
1240  *  \param  key     key for index
1241  *
1242  *  \retval +ve  di points to record with least key not larger than key
1243  *  \retval  0   di points to exact matched key
1244  *  \retval -ve  failure
1245  */
1246 static int osd_dir_it_get(const struct lu_env *env,
1247                           struct dt_it *di, const struct dt_key *key)
1248 {
1249         struct osd_zap_it *it = (struct osd_zap_it *)di;
1250         struct osd_object *obj = it->ozi_obj;
1251         char              *name = (char *)key;
1252         int                rc;
1253         ENTRY;
1254
1255         LASSERT(it);
1256         LASSERT(it->ozi_zc);
1257
1258         /* reset the cursor */
1259         zap_cursor_fini(it->ozi_zc);
1260         osd_obj_cursor_init_serialized(it->ozi_zc, obj, 0);
1261
1262         /* XXX: implementation of the API is broken at the moment */
1263         LASSERT(((const char *)key)[0] == 0);
1264
1265         if (name[0] == 0) {
1266                 it->ozi_pos = OZI_POS_INIT;
1267                 RETURN(1);
1268         }
1269
1270         if (name[0] == '.') {
1271                 if (name[1] == 0) {
1272                         it->ozi_pos = OZI_POS_DOT;
1273                         GOTO(out, rc = 1);
1274                 } else if (name[1] == '.' && name[2] == 0) {
1275                         it->ozi_pos = OZI_POS_DOTDOT;
1276                         GOTO(out, rc = 1);
1277                 }
1278         }
1279
1280         /* neither . nor .. - some real record */
1281         it->ozi_pos = OZI_POS_REAL;
1282         rc = +1;
1283
1284 out:
1285         RETURN(rc);
1286 }
1287
1288 static void osd_dir_it_put(const struct lu_env *env, struct dt_it *di)
1289 {
1290         /* PBS: do nothing : ref are incremented at retrive and decreamented
1291          *      next/finish. */
1292 }
1293
1294 /*
1295  * in Orion . and .. were stored in the directory, while ZPL
1296  * and current osd-zfs generate them up on request. so, we
1297  * need to ignore previously stored . and ..
1298  */
1299 static int osd_index_retrieve_skip_dots(struct osd_zap_it *it,
1300                                         zap_attribute_t *za)
1301 {
1302         int rc, isdot;
1303
1304         do {
1305                 rc = -zap_cursor_retrieve(it->ozi_zc, za);
1306
1307                 isdot = 0;
1308                 if (unlikely(rc == 0 && za->za_name[0] == '.')) {
1309                         if (za->za_name[1] == 0) {
1310                                 isdot = 1;
1311                         } else if (za->za_name[1] == '.' &&
1312                                    za->za_name[2] == 0) {
1313                                 isdot = 1;
1314                         }
1315                         if (unlikely(isdot))
1316                                 zap_cursor_advance(it->ozi_zc);
1317                 }
1318         } while (unlikely(rc == 0 && isdot));
1319
1320         return rc;
1321 }
1322
1323 /**
1324  * to load a directory entry at a time and stored it in
1325  * iterator's in-memory data structure.
1326  *
1327  * \param di, struct osd_it_ea, iterator's in memory structure
1328  *
1329  * \retval +ve, iterator reached to end
1330  * \retval   0, iterator not reached to end
1331  * \retval -ve, on error
1332  */
1333 static int osd_dir_it_next(const struct lu_env *env, struct dt_it *di)
1334 {
1335         struct osd_zap_it *it = (struct osd_zap_it *)di;
1336         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1337         int                rc;
1338
1339         ENTRY;
1340
1341         /* temp. storage should be enough for any key supported by ZFS */
1342         BUILD_BUG_ON(sizeof(za->za_name) > sizeof(it->ozi_name));
1343
1344         /*
1345          * the first ->next() moves the cursor to .
1346          * the second ->next() moves the cursor to ..
1347          * then we get to the real records and have to verify any exist
1348          */
1349         if (it->ozi_pos <= OZI_POS_DOTDOT) {
1350                 it->ozi_pos++;
1351                 if (it->ozi_pos <= OZI_POS_DOTDOT)
1352                         RETURN(0);
1353
1354         } else {
1355                 zap_cursor_advance(it->ozi_zc);
1356         }
1357
1358         /*
1359          * According to current API we need to return error if its last entry.
1360          * zap_cursor_advance() does not return any value. So we need to call
1361          * retrieve to check if there is any record.  We should make
1362          * changes to Iterator API to not return status for this API
1363          */
1364         rc = osd_index_retrieve_skip_dots(it, za);
1365
1366         if (rc == -ENOENT) /* end of dir */
1367                 RETURN(+1);
1368
1369         RETURN(rc);
1370 }
1371
1372 static struct dt_key *osd_dir_it_key(const struct lu_env *env,
1373                                      const struct dt_it *di)
1374 {
1375         struct osd_zap_it *it = (struct osd_zap_it *)di;
1376         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1377         int                rc = 0;
1378         ENTRY;
1379
1380         if (it->ozi_pos <= OZI_POS_DOT) {
1381                 it->ozi_pos = OZI_POS_DOT;
1382                 RETURN((struct dt_key *)".");
1383         } else if (it->ozi_pos == OZI_POS_DOTDOT) {
1384                 RETURN((struct dt_key *)"..");
1385         }
1386
1387         if ((rc = -zap_cursor_retrieve(it->ozi_zc, za)))
1388                 RETURN(ERR_PTR(rc));
1389
1390         strcpy(it->ozi_name, za->za_name);
1391
1392         RETURN((struct dt_key *)it->ozi_name);
1393 }
1394
1395 static int osd_dir_it_key_size(const struct lu_env *env, const struct dt_it *di)
1396 {
1397         struct osd_zap_it *it = (struct osd_zap_it *)di;
1398         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1399         int                rc;
1400         ENTRY;
1401
1402         if (it->ozi_pos <= OZI_POS_DOT) {
1403                 it->ozi_pos = OZI_POS_DOT;
1404                 RETURN(2);
1405         } else if (it->ozi_pos == OZI_POS_DOTDOT) {
1406                 RETURN(3);
1407         }
1408
1409         if ((rc = -zap_cursor_retrieve(it->ozi_zc, za)) == 0)
1410                 rc = strlen(za->za_name);
1411
1412         RETURN(rc);
1413 }
1414
1415 static int
1416 osd_dirent_update(const struct lu_env *env, struct osd_device *dev,
1417                   uint64_t zap, const char *key, struct luz_direntry *zde)
1418 {
1419         dmu_tx_t *tx;
1420         int rc;
1421         ENTRY;
1422
1423         tx = dmu_tx_create(dev->od_os);
1424         if (!tx)
1425                 RETURN(-ENOMEM);
1426
1427         dmu_tx_hold_zap(tx, zap, TRUE, NULL);
1428         rc = -dmu_tx_assign(tx, TXG_WAIT);
1429         if (!rc)
1430                 rc = -zap_update(dev->od_os, zap, key, 8, sizeof(*zde) / 8,
1431                                  (const void *)zde, tx);
1432         if (rc)
1433                 dmu_tx_abort(tx);
1434         else
1435                 dmu_tx_commit(tx);
1436
1437         RETURN(rc);
1438 }
1439
1440 static int osd_update_entry_for_agent(const struct lu_env *env,
1441                                       struct osd_device *osd,
1442                                       uint64_t zap, const char *name,
1443                                       struct luz_direntry *zde, __u32 attr)
1444 {
1445         dmu_tx_t *tx = NULL;
1446         int rc = 0;
1447         ENTRY;
1448
1449         if (attr & LUDA_VERIFY_DRYRUN)
1450                 GOTO(out, rc = 0);
1451
1452         tx = dmu_tx_create(osd->od_os);
1453         if (!tx)
1454                 GOTO(out, rc = -ENOMEM);
1455
1456         dmu_tx_hold_sa_create(tx, osd_find_dnsize(osd, OSD_BASE_EA_IN_BONUS));
1457         dmu_tx_hold_zap(tx, zap, FALSE, NULL);
1458         rc = -dmu_tx_assign(tx, TXG_WAIT);
1459         if (rc) {
1460                 dmu_tx_abort(tx);
1461                 GOTO(out, rc);
1462         }
1463
1464         rc = osd_create_agent_object(env, osd, zde, zap, tx);
1465         if (!rc)
1466                 rc = -zap_update(osd->od_os, zap, name, 8, sizeof(*zde) / 8,
1467                                  (const void *)zde, tx);
1468         dmu_tx_commit(tx);
1469
1470         GOTO(out, rc);
1471
1472 out:
1473         CDEBUG(D_LFSCK, "%s: Updated (%s) remote entry for "DFID": rc = %d\n",
1474                osd_name(osd), (attr & LUDA_VERIFY_DRYRUN) ? "(ro)" : "(rw)",
1475                PFID(&zde->lzd_fid), rc);
1476         return rc;
1477 }
1478
1479 static int osd_dir_it_rec(const struct lu_env *env, const struct dt_it *di,
1480                           struct dt_rec *dtrec, __u32 attr)
1481 {
1482         struct osd_zap_it *it = (struct osd_zap_it *)di;
1483         struct lu_dirent *lde = (struct lu_dirent *)dtrec;
1484         struct osd_thread_info *info = osd_oti_get(env);
1485         struct luz_direntry *zde = &info->oti_zde;
1486         zap_attribute_t *za = &info->oti_za;
1487         struct lu_fid *fid = &info->oti_fid;
1488         struct osd_device *osd = osd_obj2dev(it->ozi_obj);
1489         int rc, namelen;
1490         ENTRY;
1491
1492         lde->lde_attrs = 0;
1493         if (it->ozi_pos <= OZI_POS_DOT) {
1494                 /* notice hash=0 here, this is needed to avoid
1495                  * case when some real entry (after ./..) may
1496                  * have hash=0. in this case the client would
1497                  * be confused having records out of hash order. */
1498                 lde->lde_hash = cpu_to_le64(0);
1499                 strcpy(lde->lde_name, ".");
1500                 lde->lde_namelen = cpu_to_le16(1);
1501                 fid_cpu_to_le(&lde->lde_fid,
1502                               lu_object_fid(&it->ozi_obj->oo_dt.do_lu));
1503                 lde->lde_attrs = LUDA_FID;
1504                 /* append lustre attributes */
1505                 osd_it_append_attrs(lde, attr, 1, IFTODT(S_IFDIR));
1506                 lde->lde_reclen = cpu_to_le16(lu_dirent_calc_size(1, attr));
1507                 it->ozi_pos = OZI_POS_DOT;
1508                 RETURN(0);
1509         } else if (it->ozi_pos == OZI_POS_DOTDOT) {
1510                 /* same as for . above */
1511                 lde->lde_hash = cpu_to_le64(0);
1512                 strcpy(lde->lde_name, "..");
1513                 lde->lde_namelen = cpu_to_le16(2);
1514                 rc = osd_find_parent_fid(env, &it->ozi_obj->oo_dt, fid, NULL);
1515                 if (!rc) {
1516                         fid_cpu_to_le(&lde->lde_fid, fid);
1517                         lde->lde_attrs = LUDA_FID;
1518                 } else if (rc != -ENOENT) {
1519                         /* ENOENT happens at the root of filesystem, ignore */
1520                         RETURN(rc);
1521                 }
1522
1523                 /* append lustre attributes */
1524                 osd_it_append_attrs(lde, attr, 2, IFTODT(S_IFDIR));
1525                 lde->lde_reclen = cpu_to_le16(lu_dirent_calc_size(2, attr));
1526                 RETURN(0);
1527         }
1528
1529         LASSERT(lde);
1530
1531         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1532         if (unlikely(rc))
1533                 RETURN(rc);
1534
1535         lde->lde_hash = cpu_to_le64(osd_zap_cursor_serialize(it->ozi_zc));
1536         namelen = strlen(za->za_name);
1537         if (namelen > NAME_MAX)
1538                 RETURN(-EOVERFLOW);
1539         strcpy(lde->lde_name, za->za_name);
1540         lde->lde_namelen = cpu_to_le16(namelen);
1541
1542         if (za->za_integer_length != 8) {
1543                 CERROR("%s: unsupported direntry format: %d %d\n",
1544                        osd->od_svname,
1545                        za->za_integer_length, (int)za->za_num_integers);
1546                 RETURN(-EIO);
1547         }
1548
1549         rc = osd_zap_lookup(osd, it->ozi_zc->zc_zapobj, it->ozi_obj->oo_dn,
1550                             za->za_name, za->za_integer_length, 3, zde);
1551         if (rc)
1552                 RETURN(rc);
1553
1554         if (za->za_num_integers >= 3 && fid_is_sane(&zde->lzd_fid)) {
1555                 lde->lde_attrs = LUDA_FID;
1556                 fid_cpu_to_le(&lde->lde_fid, &zde->lzd_fid);
1557                 if (unlikely(zde->lzd_reg.zde_dnode == ZFS_NO_OBJECT &&
1558                              osd_remote_fid(env, osd, &zde->lzd_fid) > 0 &&
1559                              attr & LUDA_VERIFY)) {
1560                         /* It is mainly used for handling the MDT
1561                          * upgraded from old ZFS based backend. */
1562                         rc = osd_update_entry_for_agent(env, osd,
1563                                         it->ozi_obj->oo_dn->dn_object,
1564                                         za->za_name, zde, attr);
1565                         if (!rc)
1566                                 lde->lde_attrs |= LUDA_REPAIR;
1567                         else
1568                                 lde->lde_attrs |= LUDA_UNKNOWN;
1569                 }
1570
1571                 GOTO(pack_attr, rc = 0);
1572         }
1573
1574         if (OBD_FAIL_CHECK(OBD_FAIL_FID_LOOKUP))
1575                 RETURN(-ENOENT);
1576
1577         rc = osd_get_fid_by_oid(env, osd, zde->lzd_reg.zde_dnode, fid);
1578         if (rc) {
1579                 lde->lde_attrs = LUDA_UNKNOWN;
1580                 GOTO(pack_attr, rc = 0);
1581         }
1582
1583         if (!(attr & LUDA_VERIFY)) {
1584                 fid_cpu_to_le(&lde->lde_fid, fid);
1585                 lde->lde_attrs = LUDA_FID;
1586                 GOTO(pack_attr, rc = 0);
1587         }
1588
1589         if (attr & LUDA_VERIFY_DRYRUN) {
1590                 fid_cpu_to_le(&lde->lde_fid, fid);
1591                 lde->lde_attrs = LUDA_FID | LUDA_REPAIR;
1592                 GOTO(pack_attr, rc = 0);
1593         }
1594
1595         fid_cpu_to_le(&lde->lde_fid, fid);
1596         lde->lde_attrs = LUDA_FID;
1597         zde->lzd_fid = *fid;
1598         rc = osd_dirent_update(env, osd, it->ozi_zc->zc_zapobj,
1599                                za->za_name, zde);
1600         if (rc) {
1601                 lde->lde_attrs |= LUDA_UNKNOWN;
1602                 GOTO(pack_attr, rc = 0);
1603         }
1604
1605         lde->lde_attrs |= LUDA_REPAIR;
1606
1607         GOTO(pack_attr, rc = 0);
1608
1609 pack_attr:
1610         osd_it_append_attrs(lde, attr, namelen, zde->lzd_reg.zde_type);
1611         lde->lde_reclen = cpu_to_le16(lu_dirent_calc_size(namelen, attr));
1612         return rc;
1613 }
1614
1615 static int osd_dir_it_rec_size(const struct lu_env *env, const struct dt_it *di,
1616                                __u32 attr)
1617 {
1618         struct osd_zap_it   *it = (struct osd_zap_it *)di;
1619         zap_attribute_t     *za = &osd_oti_get(env)->oti_za;
1620         size_t               namelen = 0;
1621         int                  rc;
1622         ENTRY;
1623
1624         if (it->ozi_pos <= OZI_POS_DOT)
1625                 namelen = 1;
1626         else if (it->ozi_pos == OZI_POS_DOTDOT)
1627                 namelen = 2;
1628
1629         if (namelen > 0) {
1630                 rc = lu_dirent_calc_size(namelen, attr);
1631                 RETURN(rc);
1632         }
1633
1634         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1635         if (unlikely(rc != 0))
1636                 RETURN(rc);
1637
1638         if (za->za_integer_length != 8 || za->za_num_integers < 3) {
1639                 CERROR("%s: unsupported direntry format: %d %d\n",
1640                        osd_obj2dev(it->ozi_obj)->od_svname,
1641                        za->za_integer_length, (int)za->za_num_integers);
1642                 RETURN(-EIO);
1643         }
1644
1645         namelen = strlen(za->za_name);
1646         if (namelen > NAME_MAX)
1647                 RETURN(-EOVERFLOW);
1648
1649         rc = lu_dirent_calc_size(namelen, attr);
1650
1651         RETURN(rc);
1652 }
1653
1654 static __u64 osd_dir_it_store(const struct lu_env *env, const struct dt_it *di)
1655 {
1656         struct osd_zap_it *it = (struct osd_zap_it *)di;
1657         __u64              pos;
1658         ENTRY;
1659
1660         if (it->ozi_pos <= OZI_POS_DOTDOT)
1661                 pos = 0;
1662         else
1663                 pos = osd_zap_cursor_serialize(it->ozi_zc);
1664
1665         RETURN(pos);
1666 }
1667
1668 /*
1669  * return status :
1670  *  rc == 0 -> end of directory.
1671  *  rc >  0 -> ok, proceed.
1672  *  rc <  0 -> error.  ( EOVERFLOW  can be masked.)
1673  */
1674 static int osd_dir_it_load(const struct lu_env *env,
1675                         const struct dt_it *di, __u64 hash)
1676 {
1677         struct osd_zap_it *it = (struct osd_zap_it *)di;
1678         struct osd_object *obj = it->ozi_obj;
1679         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1680         int                rc;
1681         ENTRY;
1682
1683         /* reset the cursor */
1684         zap_cursor_fini(it->ozi_zc);
1685         osd_obj_cursor_init_serialized(it->ozi_zc, obj, hash);
1686
1687         if (hash == 0) {
1688                 it->ozi_pos = OZI_POS_INIT;
1689                 rc = +1; /* there will be ./.. at least */
1690         } else {
1691                 it->ozi_pos = OZI_POS_REAL;
1692                 /* to return whether the end has been reached */
1693                 rc = osd_index_retrieve_skip_dots(it, za);
1694                 if (rc == 0)
1695                         rc = +1;
1696                 else if (rc == -ENOENT)
1697                         rc = 0;
1698         }
1699
1700         RETURN(rc);
1701 }
1702
1703 struct dt_index_operations osd_dir_ops = {
1704         .dio_lookup         = osd_dir_lookup,
1705         .dio_declare_insert = osd_declare_dir_insert,
1706         .dio_insert         = osd_dir_insert,
1707         .dio_declare_delete = osd_declare_dir_delete,
1708         .dio_delete         = osd_dir_delete,
1709         .dio_it     = {
1710                 .init     = osd_dir_it_init,
1711                 .fini     = osd_index_it_fini,
1712                 .get      = osd_dir_it_get,
1713                 .put      = osd_dir_it_put,
1714                 .next     = osd_dir_it_next,
1715                 .key      = osd_dir_it_key,
1716                 .key_size = osd_dir_it_key_size,
1717                 .rec      = osd_dir_it_rec,
1718                 .rec_size = osd_dir_it_rec_size,
1719                 .store    = osd_dir_it_store,
1720                 .load     = osd_dir_it_load
1721         }
1722 };
1723
1724 /*
1725  * Primitives for index files using binary keys.
1726  */
1727
1728 /* key integer_size is 8 */
1729 static int osd_prepare_key_uint64(struct osd_object *o, __u64 *dst,
1730                                   const struct dt_key *src)
1731 {
1732         int size;
1733
1734         LASSERT(dst);
1735         LASSERT(src);
1736
1737         /* align keysize to 64bit */
1738         size = (o->oo_keysize + sizeof(__u64) - 1) / sizeof(__u64);
1739         size *= sizeof(__u64);
1740
1741         LASSERT(size <= MAXNAMELEN);
1742
1743         if (unlikely(size > o->oo_keysize))
1744                 memset(dst + o->oo_keysize, 0, size - o->oo_keysize);
1745         memcpy(dst, (const char *)src, o->oo_keysize);
1746
1747         return (size/sizeof(__u64));
1748 }
1749
1750 static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt,
1751                         struct dt_rec *rec, const struct dt_key *key)
1752 {
1753         struct osd_object *obj = osd_dt_obj(dt);
1754         struct osd_device *osd = osd_obj2dev(obj);
1755         __u64             *k = osd_oti_get(env)->oti_key64;
1756         int                rc;
1757         ENTRY;
1758
1759         rc = osd_prepare_key_uint64(obj, k, key);
1760
1761         rc = -zap_lookup_uint64(osd->od_os, obj->oo_dn->dn_object,
1762                                 k, rc, obj->oo_recusize, obj->oo_recsize,
1763                                 (void *)rec);
1764         RETURN(rc == 0 ? 1 : rc);
1765 }
1766
1767 static int osd_declare_index_insert(const struct lu_env *env,
1768                                     struct dt_object *dt,
1769                                     const struct dt_rec *rec,
1770                                     const struct dt_key *key,
1771                                     struct thandle *th)
1772 {
1773         struct osd_object  *obj = osd_dt_obj(dt);
1774         struct osd_thandle *oh;
1775         ENTRY;
1776
1777         LASSERT(th != NULL);
1778         oh = container_of(th, struct osd_thandle, ot_super);
1779
1780         LASSERT(obj->oo_dn);
1781
1782         /* do not specify the key as then DMU is trying to look it up
1783          * which is very expensive. usually the layers above lookup
1784          * before insertion */
1785         osd_tx_hold_zap(oh->ot_tx, obj->oo_dn->dn_object, obj->oo_dn,
1786                         TRUE, NULL);
1787
1788         RETURN(0);
1789 }
1790
1791 static int osd_index_insert(const struct lu_env *env, struct dt_object *dt,
1792                             const struct dt_rec *rec, const struct dt_key *key,
1793                             struct thandle *th)
1794 {
1795         struct osd_object  *obj = osd_dt_obj(dt);
1796         struct osd_device  *osd = osd_obj2dev(obj);
1797         struct osd_thandle *oh;
1798         __u64              *k = osd_oti_get(env)->oti_key64;
1799         int                 rc;
1800         ENTRY;
1801
1802         LASSERT(obj->oo_dn);
1803         LASSERT(dt_object_exists(dt));
1804         LASSERT(osd_invariant(obj));
1805         LASSERT(th != NULL);
1806
1807         oh = container_of(th, struct osd_thandle, ot_super);
1808
1809         rc = osd_prepare_key_uint64(obj, k, key);
1810
1811         /* Insert (key,oid) into ZAP */
1812         rc = -zap_add_uint64(osd->od_os, obj->oo_dn->dn_object,
1813                              k, rc, obj->oo_recusize, obj->oo_recsize,
1814                              (void *)rec, oh->ot_tx);
1815         RETURN(rc);
1816 }
1817
1818 static int osd_declare_index_delete(const struct lu_env *env,
1819                                     struct dt_object *dt,
1820                                     const struct dt_key *key,
1821                                     struct thandle *th)
1822 {
1823         struct osd_object  *obj = osd_dt_obj(dt);
1824         struct osd_thandle *oh;
1825         ENTRY;
1826
1827         LASSERT(dt_object_exists(dt));
1828         LASSERT(osd_invariant(obj));
1829         LASSERT(th != NULL);
1830         LASSERT(obj->oo_dn);
1831
1832         oh = container_of(th, struct osd_thandle, ot_super);
1833
1834         /* do not specify the key as then DMU is trying to look it up
1835          * which is very expensive. usually the layers above lookup
1836          * before deletion */
1837         osd_tx_hold_zap(oh->ot_tx, obj->oo_dn->dn_object, obj->oo_dn,
1838                         FALSE, NULL);
1839
1840         RETURN(0);
1841 }
1842
1843 static int osd_index_delete(const struct lu_env *env, struct dt_object *dt,
1844                             const struct dt_key *key, struct thandle *th)
1845 {
1846         struct osd_object  *obj = osd_dt_obj(dt);
1847         struct osd_device  *osd = osd_obj2dev(obj);
1848         struct osd_thandle *oh;
1849         __u64              *k = osd_oti_get(env)->oti_key64;
1850         int                 rc;
1851         ENTRY;
1852
1853         LASSERT(obj->oo_dn);
1854         LASSERT(th != NULL);
1855         oh = container_of(th, struct osd_thandle, ot_super);
1856
1857         rc = osd_prepare_key_uint64(obj, k, key);
1858
1859         /* Remove binary key from the ZAP */
1860         rc = -zap_remove_uint64(osd->od_os, obj->oo_dn->dn_object,
1861                                 k, rc, oh->ot_tx);
1862         RETURN(rc);
1863 }
1864
1865 static int osd_index_it_get(const struct lu_env *env, struct dt_it *di,
1866                             const struct dt_key *key)
1867 {
1868         struct osd_zap_it *it = (struct osd_zap_it *)di;
1869         struct osd_object *obj = it->ozi_obj;
1870         struct osd_device *osd = osd_obj2dev(obj);
1871         ENTRY;
1872
1873         LASSERT(it);
1874         LASSERT(it->ozi_zc);
1875
1876         /*
1877          * XXX: we need a binary version of zap_cursor_move_to_key()
1878          *      to implement this API */
1879         if (*((const __u64 *)key) != 0)
1880                 CERROR("NOT IMPLEMETED YET (move to %#llx)\n",
1881                        *((__u64 *)key));
1882
1883         zap_cursor_fini(it->ozi_zc);
1884         zap_cursor_init(it->ozi_zc, osd->od_os, obj->oo_dn->dn_object);
1885         it->ozi_reset = 1;
1886
1887         RETURN(+1);
1888 }
1889
1890 static int osd_index_it_next(const struct lu_env *env, struct dt_it *di)
1891 {
1892         struct osd_zap_it *it = (struct osd_zap_it *)di;
1893         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1894         int                rc;
1895         ENTRY;
1896
1897         if (it->ozi_reset == 0)
1898                 zap_cursor_advance(it->ozi_zc);
1899         it->ozi_reset = 0;
1900
1901         /*
1902          * According to current API we need to return error if it's last entry.
1903          * zap_cursor_advance() does not return any value. So we need to call
1904          * retrieve to check if there is any record.  We should make
1905          * changes to Iterator API to not return status for this API
1906          */
1907         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1908         if (rc == -ENOENT)
1909                 RETURN(+1);
1910
1911         RETURN((rc));
1912 }
1913
1914 static struct dt_key *osd_index_it_key(const struct lu_env *env,
1915                                        const struct dt_it *di)
1916 {
1917         struct osd_zap_it *it = (struct osd_zap_it *)di;
1918         struct osd_object *obj = it->ozi_obj;
1919         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1920         int                rc = 0;
1921         ENTRY;
1922
1923         it->ozi_reset = 0;
1924         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1925         if (rc)
1926                 RETURN(ERR_PTR(rc));
1927
1928         /* the binary key is stored in the name */
1929         memcpy(&it->ozi_key, za->za_name, obj->oo_keysize);
1930
1931         RETURN((struct dt_key *)&it->ozi_key);
1932 }
1933
1934 static int osd_index_it_key_size(const struct lu_env *env,
1935                                 const struct dt_it *di)
1936 {
1937         struct osd_zap_it *it = (struct osd_zap_it *)di;
1938         struct osd_object *obj = it->ozi_obj;
1939         RETURN(obj->oo_keysize);
1940 }
1941
1942 static int osd_index_it_rec(const struct lu_env *env, const struct dt_it *di,
1943                             struct dt_rec *rec, __u32 attr)
1944 {
1945         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1946         struct osd_zap_it *it = (struct osd_zap_it *)di;
1947         struct osd_object *obj = it->ozi_obj;
1948         struct osd_device *osd = osd_obj2dev(obj);
1949         __u64             *k = osd_oti_get(env)->oti_key64;
1950         int                rc;
1951         ENTRY;
1952
1953         it->ozi_reset = 0;
1954         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1955         if (rc)
1956                 RETURN(rc);
1957
1958         rc = osd_prepare_key_uint64(obj, k, (const struct dt_key *)za->za_name);
1959
1960         rc = -zap_lookup_uint64(osd->od_os, obj->oo_dn->dn_object,
1961                                 k, rc, obj->oo_recusize, obj->oo_recsize,
1962                                 (void *)rec);
1963         RETURN(rc);
1964 }
1965
1966 static __u64 osd_index_it_store(const struct lu_env *env,
1967                                 const struct dt_it *di)
1968 {
1969         struct osd_zap_it *it = (struct osd_zap_it *)di;
1970
1971         it->ozi_reset = 0;
1972         RETURN((__u64)zap_cursor_serialize(it->ozi_zc));
1973 }
1974
1975 static int osd_index_it_load(const struct lu_env *env, const struct dt_it *di,
1976                              __u64 hash)
1977 {
1978         struct osd_zap_it *it = (struct osd_zap_it *)di;
1979         struct osd_object *obj = it->ozi_obj;
1980         struct osd_device *osd = osd_obj2dev(obj);
1981         zap_attribute_t   *za = &osd_oti_get(env)->oti_za;
1982         int                rc;
1983         ENTRY;
1984
1985         /* reset the cursor */
1986         zap_cursor_fini(it->ozi_zc);
1987         zap_cursor_init_serialized(it->ozi_zc, osd->od_os,
1988                                    obj->oo_dn->dn_object, hash);
1989         it->ozi_reset = 0;
1990
1991         rc = -zap_cursor_retrieve(it->ozi_zc, za);
1992         if (rc == 0)
1993                 RETURN(+1);
1994         else if (rc == -ENOENT)
1995                 RETURN(0);
1996
1997         RETURN(rc);
1998 }
1999
2000 static struct dt_index_operations osd_index_ops = {
2001         .dio_lookup             = osd_index_lookup,
2002         .dio_declare_insert     = osd_declare_index_insert,
2003         .dio_insert             = osd_index_insert,
2004         .dio_declare_delete     = osd_declare_index_delete,
2005         .dio_delete             = osd_index_delete,
2006         .dio_it = {
2007                 .init           = osd_index_it_init,
2008                 .fini           = osd_index_it_fini,
2009                 .get            = osd_index_it_get,
2010                 .put            = osd_index_it_put,
2011                 .next           = osd_index_it_next,
2012                 .key            = osd_index_it_key,
2013                 .key_size       = osd_index_it_key_size,
2014                 .rec            = osd_index_it_rec,
2015                 .store          = osd_index_it_store,
2016                 .load           = osd_index_it_load
2017         }
2018 };
2019
2020 int osd_index_try(const struct lu_env *env, struct dt_object *dt,
2021                 const struct dt_index_features *feat)
2022 {
2023         struct osd_object *obj = osd_dt_obj(dt);
2024         struct osd_device *osd = osd_obj2dev(obj);
2025         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
2026         int rc = 0;
2027         ENTRY;
2028
2029         down_read(&obj->oo_guard);
2030
2031         /*
2032          * XXX: implement support for fixed-size keys sorted with natural
2033          *      numerical way (not using internal hash value)
2034          */
2035         if (feat->dif_flags & DT_IND_RANGE)
2036                 GOTO(out, rc = -ERANGE);
2037
2038         if (unlikely(feat == &dt_otable_features)) {
2039                 dt->do_index_ops = &osd_otable_ops;
2040                 GOTO(out, rc = 0);
2041         }
2042
2043         LASSERT(!dt_object_exists(dt) || obj->oo_dn != NULL);
2044         if (likely(feat == &dt_directory_features)) {
2045                 if (!dt_object_exists(dt) || osd_object_is_zap(obj->oo_dn))
2046                         dt->do_index_ops = &osd_dir_ops;
2047                 else
2048                         GOTO(out, rc = -ENOTDIR);
2049         } else if (unlikely(feat == &dt_acct_features)) {
2050                 LASSERT(fid_is_acct(fid));
2051                 dt->do_index_ops = &osd_acct_index_ops;
2052         } else if (dt->do_index_ops == NULL) {
2053                 /* For index file, we don't support variable key & record sizes
2054                  * and the key has to be unique */
2055                 if ((feat->dif_flags & ~DT_IND_UPDATE) != 0)
2056                         GOTO(out, rc = -EINVAL);
2057
2058                 if (feat->dif_keysize_max > ZAP_MAXNAMELEN)
2059                         GOTO(out, rc = -E2BIG);
2060                 if (feat->dif_keysize_max != feat->dif_keysize_min)
2061                         GOTO(out, rc = -EINVAL);
2062
2063                 /* As for the record size, it should be a multiple of 8 bytes
2064                  * and smaller than the maximum value length supported by ZAP.
2065                  */
2066                 if (feat->dif_recsize_max > ZAP_MAXVALUELEN)
2067                         GOTO(out, rc = -E2BIG);
2068                 if (feat->dif_recsize_max != feat->dif_recsize_min)
2069                         GOTO(out, rc = -EINVAL);
2070
2071                 obj->oo_keysize = feat->dif_keysize_max;
2072                 obj->oo_recsize = feat->dif_recsize_max;
2073                 obj->oo_recusize = 1;
2074
2075                 /* ZFS prefers to work with array of 64bits */
2076                 if ((obj->oo_recsize & 7) == 0) {
2077                         obj->oo_recsize >>= 3;
2078                         obj->oo_recusize = 8;
2079                 }
2080                 dt->do_index_ops = &osd_index_ops;
2081
2082                 if (feat == &dt_lfsck_layout_orphan_features ||
2083                     feat == &dt_lfsck_layout_dangling_features ||
2084                     feat == &dt_lfsck_namespace_features)
2085                         GOTO(out, rc = 0);
2086
2087                 rc = osd_index_register(osd, fid, obj->oo_keysize,
2088                                         obj->oo_recusize * obj->oo_recsize);
2089                 if (rc < 0)
2090                         CWARN("%s: failed to register index "DFID": rc = %d\n",
2091                               osd_name(osd), PFID(fid), rc);
2092                 else if (rc > 0)
2093                         rc = 0;
2094                 else
2095                         CDEBUG(D_LFSCK, "%s: index object "DFID
2096                                " (%u/%u/%u) registered\n",
2097                                osd_name(osd), PFID(fid), obj->oo_keysize,
2098                                obj->oo_recusize, obj->oo_recsize);
2099         }
2100
2101 out:
2102         up_read(&obj->oo_guard);
2103
2104         RETURN(rc);
2105 }