Whamcloud - gitweb
LU-16974 utils: make bandwidth options consistent
[fs/lustre-release.git] / lustre / osd-zfs / osd_xattr.c
1 // SPDX-License-Identifier: GPL-2.0
2
3 /*
4  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
5  * Use is subject to license terms.
6  *
7  * Copyright (c) 2012, 2017, Intel Corporation.
8  */
9
10 /*
11  * This file is part of Lustre, http://www.lustre.org/
12  *
13  * Functions to manipulate extended attributes and system attributes
14  *
15  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
16  * Author: Mike Pershin <tappro@whamcloud.com>
17  */
18
19 #define DEBUG_SUBSYSTEM S_OSD
20
21 #include <libcfs/libcfs.h>
22 #include <obd_support.h>
23 #include <lustre_net.h>
24 #include <obd.h>
25 #include <obd_class.h>
26 #include <lustre_disk.h>
27 #include <lustre_fid.h>
28 #include <lustre_linkea.h>
29
30 #include "osd_internal.h"
31
32 #include <sys/dnode.h>
33 #include <sys/dbuf.h>
34 #include <sys/spa.h>
35 #include <sys/stat.h>
36 #include <sys/zap.h>
37 #include <sys/spa_impl.h>
38 #include <sys/zfs_znode.h>
39 #include <sys/dmu_tx.h>
40 #include <sys/dmu_objset.h>
41 #include <sys/dsl_prop.h>
42 #include <sys/sa_impl.h>
43 #include <sys/txg.h>
44
45 #include <linux/posix_acl_xattr.h>
46 #include <lustre_scrub.h>
47
48 int __osd_xattr_load(struct osd_device *osd, sa_handle_t *hdl, nvlist_t **sa)
49 {
50         char *buf;
51         int rc, size;
52
53         rc = -sa_size(hdl, SA_ZPL_DXATTR(osd), &size);
54         if (rc) {
55                 if (rc == -ENOENT)
56                         rc = -nvlist_alloc(sa, NV_UNIQUE_NAME, KM_SLEEP);
57                 goto out_sa;
58         }
59
60         buf = zio_buf_alloc(size);
61         if (buf == NULL) {
62                 rc = -ENOMEM;
63                 goto out_sa;
64         }
65         rc = -sa_lookup(hdl, SA_ZPL_DXATTR(osd), buf, size);
66         if (rc == 0)
67                 rc = -nvlist_unpack(buf, size, sa, KM_SLEEP);
68         zio_buf_free(buf, size);
69 out_sa:
70
71         return rc;
72 }
73
74 static inline int __osd_xattr_cache(struct osd_object *obj)
75 {
76         LASSERT(obj->oo_sa_hdl);
77         if (obj->oo_sa_xattr != NULL)
78                 return 0;
79         return __osd_xattr_load(osd_obj2dev(obj),
80                                 obj->oo_sa_hdl, &obj->oo_sa_xattr);
81 }
82
83 static int
84 __osd_sa_xattr_get(const struct lu_env *env, struct osd_object *obj,
85                    const struct lu_buf *buf, const char *name, int *sizep)
86 {
87         uchar_t *nv_value;
88         int rc = 0;
89
90         rc = __osd_xattr_cache(obj);
91         if (rc)
92                 return rc;
93
94         LASSERT(obj->oo_sa_xattr);
95         rc = -nvlist_lookup_byte_array(obj->oo_sa_xattr, name,
96                                        &nv_value, sizep);
97         if (rc)
98                 return rc;
99
100         if (buf == NULL || buf->lb_buf == NULL) {
101                 /* return the required size by *sizep */
102                 return 0;
103         }
104
105         if (*sizep > buf->lb_len)
106                 return -ERANGE; /* match ldiskfs error */
107
108         memcpy(buf->lb_buf, nv_value, *sizep);
109         return 0;
110 }
111
112 int __osd_xattr_get_large(const struct lu_env *env, struct osd_device *osd,
113                           uint64_t xattr, struct lu_buf *buf,
114                           const char *name, int *sizep)
115 {
116         dnode_t *xa_data_dn;
117         sa_handle_t *sa_hdl = NULL;
118         uint64_t xa_data_obj, size;
119         int rc;
120
121         /* are there any extended attributes? */
122         if (xattr == ZFS_NO_OBJECT)
123                 return -ENOENT;
124
125         /* Lookup the object number containing the xattr data */
126         rc = -zap_lookup(osd->od_os, xattr, name, sizeof(uint64_t), 1,
127                         &xa_data_obj);
128         if (rc)
129                 return rc;
130
131         rc = __osd_obj2dnode(osd->od_os, xa_data_obj, &xa_data_dn);
132         if (rc)
133                 return rc;
134
135         rc = -sa_handle_get(osd->od_os, xa_data_obj, NULL, SA_HDL_PRIVATE,
136                         &sa_hdl);
137         if (rc)
138                 goto out_rele;
139
140         /* Get the xattr value length / object size */
141         rc = -sa_lookup(sa_hdl, SA_ZPL_SIZE(osd), &size, 8);
142         if (rc)
143                 goto out;
144
145         if (size > INT_MAX) {
146                 rc = -EOVERFLOW;
147                 goto out;
148         }
149
150         *sizep = (int)size;
151
152         if (buf == NULL || buf->lb_buf == NULL) {
153                 /* We only need to return the required size */
154                 goto out;
155         }
156         if (*sizep > buf->lb_len) {
157                 rc = -ERANGE; /* match ldiskfs error */
158                 goto out;
159         }
160
161         rc = -dmu_read(osd->od_os, xa_data_dn->dn_object, 0,
162                         size, buf->lb_buf, DMU_READ_PREFETCH);
163
164 out:
165         sa_handle_destroy(sa_hdl);
166 out_rele:
167         osd_dnode_rele(xa_data_dn);
168
169         return rc;
170 }
171
172 /**
173  * Copy an extended attribute into the buffer provided, or compute
174  * the required buffer size if \a buf is NULL.
175  *
176  * On success, the number of bytes used or required is stored in \a sizep.
177  *
178  * Note that no locking is done here.
179  *
180  * \param[in] env      execution environment
181  * \param[in] obj      object for which to retrieve xattr
182  * \param[out] buf     buffer to store xattr value in
183  * \param[in] name     name of xattr to copy
184  * \param[out] sizep   bytes used or required to store xattr
185  *
186  * \retval 0           on success
187  * \retval negative    negated errno on failure
188  */
189 int osd_xattr_get_internal(const struct lu_env *env, struct osd_object *obj,
190                            struct lu_buf *buf, const char *name, int *sizep)
191 {
192         int rc;
193
194         if (unlikely(!dt_object_exists(&obj->oo_dt) || obj->oo_destroyed))
195                 return -ENOENT;
196
197         /* check SA_ZPL_DXATTR first then fallback to directory xattr */
198         rc = __osd_sa_xattr_get(env, obj, buf, name, sizep);
199         if (rc != -ENOENT)
200                 return rc;
201
202         return __osd_xattr_get_large(env, osd_obj2dev(obj), obj->oo_xattr,
203                                      buf, name, sizep);
204 }
205
206 /**
207  * Copy LMA extended attribute into provided buffer
208  *
209  * Note that no locking is done here.
210  *
211  * \param[in] env      execution environment
212  * \param[in] obj      object for which to retrieve xattr
213  * \param[out] buf     buffer to store xattr value in
214  *
215  * \retval 0           on success
216  * \retval negative    negated errno on failure
217  */
218 int osd_xattr_get_lma(const struct lu_env *env, struct osd_object *obj,
219                       struct lu_buf *buf)
220 {
221         int size = 0;
222         int rc = -ENOENT;
223
224         if (!buf)
225                 return 0;
226
227         if (unlikely(obj->oo_destroyed))
228                 goto out_lma;
229
230         /* check SA_ZPL_DXATTR first then fallback to directory xattr */
231         rc = __osd_sa_xattr_get(env, obj, buf, XATTR_NAME_LMA, &size);
232         if (!rc && unlikely(size < sizeof(struct lustre_mdt_attrs)))
233                 rc = -EINVAL;
234         if (rc != -ENOENT)
235                 goto out_lma;
236
237         rc = __osd_xattr_get_large(env, osd_obj2dev(obj), obj->oo_xattr,
238                                      buf, XATTR_NAME_LMA, &size);
239         if (!rc && unlikely(size < sizeof(struct lustre_mdt_attrs)))
240                 rc = -EINVAL;
241
242 out_lma:
243         return rc;
244 }
245
246 static int osd_get_pfid_from_lma(const struct lu_env *env,
247                                  struct osd_object *obj,
248                                  struct lu_buf *buf, int *sizep)
249 {
250         struct osd_thread_info *info = osd_oti_get(env);
251         struct lustre_ost_attrs *loa =
252                 (struct lustre_ost_attrs *)&info->oti_buf;
253         struct lustre_mdt_attrs *lma = &loa->loa_lma;
254         struct filter_fid *ff;
255         struct ost_layout *ol;
256         struct lu_buf tbuf = {
257                 .lb_buf = loa,
258                 .lb_len = sizeof(info->oti_buf),
259         };
260         int rc;
261
262         ENTRY;
263         BUILD_BUG_ON(sizeof(info->oti_buf) < sizeof(*loa));
264         rc = osd_xattr_get_internal(env, obj, &tbuf,
265                                     XATTR_NAME_LMA, sizep);
266         if (rc)
267                 RETURN(rc);
268
269         lustre_loa_swab(loa, true);
270         LASSERT(lma->lma_compat & LMAC_STRIPE_INFO);
271
272         *sizep = sizeof(*ff);
273         if (buf->lb_len == 0 || !buf->lb_buf)
274                 RETURN(0);
275
276         if (buf->lb_len < *sizep)
277                 RETURN(-ERANGE);
278
279         ff = buf->lb_buf;
280         ol = &ff->ff_layout;
281         ol->ol_stripe_count = cpu_to_le32(loa->loa_parent_fid.f_ver >>
282                                           PFID_STRIPE_IDX_BITS);
283         ol->ol_stripe_size = cpu_to_le32(loa->loa_stripe_size);
284         loa->loa_parent_fid.f_ver &= PFID_STRIPE_COUNT_MASK;
285         fid_cpu_to_le(&ff->ff_parent, &loa->loa_parent_fid);
286         if (lma->lma_compat & LMAC_COMP_INFO) {
287                 ol->ol_comp_start = cpu_to_le64(loa->loa_comp_start);
288                 ol->ol_comp_end = cpu_to_le64(loa->loa_comp_end);
289                 ol->ol_comp_id = cpu_to_le32(loa->loa_comp_id);
290         } else {
291                 ol->ol_comp_start = 0;
292                 ol->ol_comp_end = 0;
293                 ol->ol_comp_id = 0;
294         }
295
296         RETURN(0);
297 }
298
299 int osd_xattr_get(const struct lu_env *env, struct dt_object *dt,
300                   struct lu_buf *buf, const char *name)
301 {
302         struct osd_object *obj = osd_dt_obj(dt);
303         int rc, size = 0;
304
305         ENTRY;
306         if (!osd_obj2dev(obj)->od_posix_acl &&
307             (strcmp(name, XATTR_NAME_POSIX_ACL_ACCESS) == 0 ||
308              strcmp(name, XATTR_NAME_POSIX_ACL_DEFAULT) == 0))
309                 RETURN(-EOPNOTSUPP);
310
311         down_read(&obj->oo_guard);
312         if (unlikely(!dt_object_exists(dt) || obj->oo_destroyed))
313                 GOTO(out, rc = -ENOENT);
314         LASSERT(obj->oo_dn != NULL);
315
316         /* For the OST migrated from ldiskfs, the PFID EA may
317          * be stored in LMA because of ldiskfs inode size.
318          */
319         if (strcmp(name, XATTR_NAME_FID) == 0 && obj->oo_pfid_in_lma)
320                 rc = osd_get_pfid_from_lma(env, obj, buf, &size);
321         else
322                 rc = osd_xattr_get_internal(env, obj, buf, name, &size);
323
324         if (rc == -ENOENT)
325                 rc = -ENODATA;
326         else if (rc == 0)
327                 rc = size;
328 out:
329         up_read(&obj->oo_guard);
330         RETURN(rc);
331 }
332
333 /* the function is used to declare EAs when SA is not supported */
334 static void __osd_xattr_declare_legacy(const struct lu_env *env,
335                                        struct osd_object *obj,
336                                        int vallen, const char *name,
337                                        struct osd_thandle *oh)
338 {
339         struct osd_device *osd = osd_obj2dev(obj);
340         dmu_tx_t *tx = oh->ot_tx;
341         uint64_t xa_data_obj;
342         int rc;
343
344         if (obj->oo_xattr == ZFS_NO_OBJECT) {
345                 /* xattr zap + entry */
346                 dmu_tx_hold_zap(tx, DMU_NEW_OBJECT, TRUE, (char *) name);
347                 /* xattr value obj */
348                 dmu_tx_hold_sa_create(tx, ZFS_SA_BASE_ATTR_SIZE);
349                 dmu_tx_hold_write(tx, DMU_NEW_OBJECT, 0, vallen);
350                 return;
351         }
352
353         rc = -zap_lookup(osd->od_os, obj->oo_xattr, name, sizeof(uint64_t), 1,
354                         &xa_data_obj);
355         if (rc == 0) {
356                 /*
357                  * Entry already exists.
358                  * We'll truncate the existing object.
359                  */
360                 dmu_tx_hold_bonus(tx, xa_data_obj);
361                 dmu_tx_hold_free(tx, xa_data_obj, vallen, DMU_OBJECT_END);
362                 dmu_tx_hold_write(tx, xa_data_obj, 0, vallen);
363         } else if (rc == -ENOENT) {
364                 /*
365                  * Entry doesn't exist, we need to create a new one and a new
366                  * object to store the value.
367                  */
368                 dmu_tx_hold_bonus(tx, obj->oo_xattr);
369                 dmu_tx_hold_zap(tx, obj->oo_xattr, TRUE, (char *) name);
370                 dmu_tx_hold_sa_create(tx, ZFS_SA_BASE_ATTR_SIZE);
371                 dmu_tx_hold_write(tx, DMU_NEW_OBJECT, 0, vallen);
372         }
373 }
374
375 void __osd_xattr_declare_set(const struct lu_env *env, struct osd_object *obj,
376                              int vallen, const char *name,
377                              struct osd_thandle *oh)
378 {
379         struct osd_device *osd = osd_obj2dev(obj);
380         dmu_tx_t *tx = oh->ot_tx;
381         int bonuslen;
382
383         if (unlikely(obj->oo_destroyed))
384                 return;
385
386         if (strcmp(name, XATTR_NAME_LINK) == 0 &&
387             osd->od_remote_parent_dir != ZFS_NO_OBJECT) {
388                 /* If some name entry resides on remote MDT, then will create
389                  * agent entry under remote parent. On the other hand, if the
390                  * remote entry will be removed, then related agent entry may
391                  * need to be removed from the remote parent. So there may be
392                  * kinds of cases, let's declare enough credits. The credits
393                  * for create agent entry is enough for remove case.
394                  */
395                 osd_tx_hold_zap(tx, osd->od_remote_parent_dir,
396                                 NULL, TRUE, NULL);
397         }
398
399         if (unlikely(!osd_obj2dev(obj)->od_xattr_in_sa)) {
400                 __osd_xattr_declare_legacy(env, obj, vallen, name, oh);
401                 return;
402         }
403
404         /* declare EA in SA */
405         if (dt_object_exists(&obj->oo_dt)) {
406                 LASSERT(obj->oo_sa_hdl);
407                 /* XXX: it should be possible to skip spill declaration if
408                  * specific EA is part of bonus and doesn't grow
409                  */
410                 dmu_tx_hold_spill(tx, obj->oo_dn->dn_object);
411                 return;
412         }
413
414         bonuslen = osd_obj_bonuslen(obj);
415
416         /* the object doesn't exist, but we've declared bonus
417          * in osd_declare_object_create() yet
418          */
419         if (obj->oo_ea_in_bonus > bonuslen) {
420                 /* spill has been declared already */
421         } else if (obj->oo_ea_in_bonus + vallen > bonuslen) {
422                 /* we're about to exceed bonus, let's declare spill */
423                 dmu_tx_hold_spill(tx, DMU_NEW_OBJECT);
424         }
425         obj->oo_ea_in_bonus += vallen;
426 }
427
428 int osd_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
429                           const struct lu_buf *buf, const char *name,
430                           int fl, struct thandle *handle)
431 {
432         struct osd_object *obj = osd_dt_obj(dt);
433         struct osd_thandle *oh;
434
435         ENTRY;
436         LASSERT(handle != NULL);
437         oh = container_of(handle, struct osd_thandle, ot_super);
438
439         down_read(&obj->oo_guard);
440         __osd_xattr_declare_set(env, obj, buf->lb_len, name, oh);
441         up_read(&obj->oo_guard);
442
443         RETURN(0);
444 }
445
446 int __osd_sa_attr_init(const struct lu_env *env, struct osd_object *obj,
447                        struct osd_thandle *oh)
448 {
449         sa_bulk_attr_t *bulk = osd_oti_get(env)->oti_attr_bulk;
450         struct osa_attr *osa = &osd_oti_get(env)->oti_osa;
451         struct lu_buf *lb = &osd_oti_get(env)->oti_xattr_lbuf;
452         struct osd_device *osd = osd_obj2dev(obj);
453         uint64_t gen;
454         inode_timespec_t now;
455         size_t size;
456         int rc, cnt;
457
458         obj->oo_late_xattr = 0;
459         obj->oo_late_attr_set = 0;
460
461         gen = dmu_tx_get_txg(oh->ot_tx);
462         gethrestime(&now);
463         ZFS_TIME_ENCODE(&now, osa->btime);
464
465         obj->oo_attr.la_valid |= LA_BTIME;
466         obj->oo_attr.la_btime = osa->btime[0];
467         osa->atime[0] = obj->oo_attr.la_atime;
468         osa->ctime[0] = obj->oo_attr.la_ctime;
469         osa->mtime[0] = obj->oo_attr.la_mtime;
470         osa->mode = obj->oo_attr.la_mode;
471         osa->uid = obj->oo_attr.la_uid;
472         osa->gid = obj->oo_attr.la_gid;
473         osa->rdev = obj->oo_attr.la_rdev;
474         osa->nlink = obj->oo_attr.la_nlink;
475         osa->flags = attrs_fs2zfs(obj->oo_attr.la_flags);
476         osa->size  = obj->oo_attr.la_size;
477 #ifdef ZFS_PROJINHERIT
478         if (osd->od_projectused_dn) {
479                 if (obj->oo_attr.la_valid & LA_PROJID)
480                         osa->projid = obj->oo_attr.la_projid;
481                 else
482                         osa->projid = ZFS_DEFAULT_PROJID;
483                 osa->flags |= ZFS_PROJID;
484                 obj->oo_with_projid = 1;
485         }
486 #endif
487
488         cnt = 0;
489         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_MODE(osd), NULL, &osa->mode, 8);
490         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_SIZE(osd), NULL, &osa->size, 8);
491         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_GEN(osd), NULL, &gen, 8);
492         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_UID(osd), NULL, &osa->uid, 8);
493         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_GID(osd), NULL, &osa->gid, 8);
494         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_PARENT(osd), NULL,
495                          &obj->oo_parent, 8);
496         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_FLAGS(osd), NULL, &osa->flags, 8);
497         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_ATIME(osd), NULL, osa->atime, 16);
498         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_MTIME(osd), NULL, osa->mtime, 16);
499         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_CTIME(osd), NULL, osa->ctime, 16);
500         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_CRTIME(osd), NULL, osa->btime, 16);
501         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_LINKS(osd), NULL, &osa->nlink, 8);
502 #ifdef ZFS_PROJINHERIT
503         if (osd->od_projectused_dn)
504                 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_PROJID(osd), NULL,
505                                  &osa->projid, 8);
506 #endif
507         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_RDEV(osd), NULL, &osa->rdev, 8);
508         LASSERT(cnt <= ARRAY_SIZE(osd_oti_get(env)->oti_attr_bulk));
509
510         /* Update the SA for additions, modifications, and removals. */
511         rc = -nvlist_size(obj->oo_sa_xattr, &size, NV_ENCODE_XDR);
512         if (rc)
513                 return rc;
514
515         lu_buf_check_and_alloc(lb, size);
516         if (lb->lb_buf == NULL) {
517                 rc = -ENOMEM;
518                 CERROR("%s: can't allocate buffer for xattr update: rc = %d\n",
519                        osd->od_svname, rc);
520                 return rc;
521         }
522
523         rc = -nvlist_pack(obj->oo_sa_xattr, (char **)&lb->lb_buf, &size,
524                           NV_ENCODE_XDR, KM_SLEEP);
525         if (rc)
526                 return rc;
527
528         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_DXATTR(osd), NULL, lb->lb_buf, size);
529
530         rc = -sa_replace_all_by_template(obj->oo_sa_hdl, bulk, cnt, oh->ot_tx);
531
532         return rc;
533 }
534
535 int __osd_sa_xattr_update(const struct lu_env *env, struct osd_object *obj,
536                            struct osd_thandle *oh)
537 {
538         struct lu_buf *lb = &osd_oti_get(env)->oti_xattr_lbuf;
539         struct osd_device *osd = osd_obj2dev(obj);
540         char *dxattr;
541         size_t size;
542         int rc;
543
544         obj->oo_late_xattr = 0;
545
546         /* Update the SA for additions, modifications, and removals. */
547         rc = -nvlist_size(obj->oo_sa_xattr, &size, NV_ENCODE_XDR);
548         if (rc)
549                 return rc;
550
551         lu_buf_check_and_alloc(lb, size);
552         if (lb->lb_buf == NULL) {
553                 rc = -ENOMEM;
554                 CERROR("%s: can't allocate buffer for xattr update: rc = %d\n",
555                        osd->od_svname, rc);
556                 return rc;
557         }
558
559         dxattr = lb->lb_buf;
560         rc = -nvlist_pack(obj->oo_sa_xattr, &dxattr, &size,
561                         NV_ENCODE_XDR, KM_SLEEP);
562         if (rc)
563                 return rc;
564         LASSERT(dxattr == lb->lb_buf);
565
566         sa_update(obj->oo_sa_hdl, SA_ZPL_DXATTR(osd), dxattr, size, oh->ot_tx);
567
568         return 0;
569 }
570
571 /*
572  * Set an extended attribute.
573  * This transaction must have called udmu_xattr_declare_set() first.
574  *
575  * Returns 0 on success or a negative error number on failure.
576  *
577  * No locking is done here.
578  */
579 int __osd_sa_xattr_schedule_update(const struct lu_env *env,
580                                    struct osd_object *obj,
581                                    struct osd_thandle *oh)
582 {
583         ENTRY;
584         LASSERT(obj->oo_sa_hdl);
585         LASSERT(obj->oo_sa_xattr);
586
587         /* schedule batched SA update in osd_object_sa_dirty_rele() */
588         obj->oo_late_xattr = 1;
589         osd_object_sa_dirty_add(obj, oh);
590
591         RETURN(0);
592
593 }
594
595 int __osd_sa_xattr_set(const struct lu_env *env, struct osd_object *obj,
596                        const struct lu_buf *buf, const char *name, int fl,
597                        struct osd_thandle *oh)
598 {
599         uchar_t *nv_value;
600         size_t size;
601         int nv_size;
602         int rc;
603         int too_big = 0;
604
605         rc = __osd_xattr_cache(obj);
606         if (rc)
607                 return rc;
608
609         LASSERT(obj->oo_sa_xattr);
610         if (buf->lb_len > OBD_MAX_EA_SIZE) {
611                 too_big = 1;
612         } else {
613                 /* Prevent the DXATTR SA from consuming the entire SA region */
614                 rc = -nvlist_size(obj->oo_sa_xattr, &size, NV_ENCODE_XDR);
615                 if (rc)
616                         return rc;
617
618                 if (size + buf->lb_len > DXATTR_MAX_SA_SIZE)
619                         too_big = 1;
620         }
621
622         /* even in case of -EFBIG we must lookup xattr and check can we
623          * rewrite it then delete from SA
624          */
625         rc = -nvlist_lookup_byte_array(obj->oo_sa_xattr, name, &nv_value,
626                                         &nv_size);
627         if (rc == 0) {
628                 if (fl & LU_XATTR_CREATE) {
629                         return -EEXIST;
630                 } else if (too_big) {
631                         rc = -nvlist_remove(obj->oo_sa_xattr, name,
632                                                 DATA_TYPE_BYTE_ARRAY);
633                         if (rc < 0)
634                                 return rc;
635                         rc = __osd_sa_xattr_schedule_update(env, obj, oh);
636                         return rc == 0 ? -EFBIG : rc;
637                 }
638         } else if (rc == -ENOENT) {
639                 if (fl & LU_XATTR_REPLACE)
640                         return -ENODATA;
641                 else if (too_big)
642                         return -EFBIG;
643         } else {
644                 return rc;
645         }
646
647         /* Ensure xattr doesn't exist in ZAP */
648         if (obj->oo_xattr != ZFS_NO_OBJECT) {
649                 struct osd_device *osd = osd_obj2dev(obj);
650                 uint64_t           objid;
651
652                 rc = -zap_lookup(osd->od_os, obj->oo_xattr,
653                                  name, 8, 1, &objid);
654                 if (rc == 0) {
655                         rc = -dmu_object_free(osd->od_os, objid, oh->ot_tx);
656                         if (rc == 0)
657                                 zap_remove(osd->od_os, obj->oo_xattr,
658                                            name, oh->ot_tx);
659                 }
660         }
661
662         rc = -nvlist_add_byte_array(obj->oo_sa_xattr, name,
663                                     (uchar_t *)buf->lb_buf, buf->lb_len);
664         if (rc)
665                 return rc;
666
667         /* batch updates only for just created dnodes where we
668          * used to set number of EAs in a single transaction
669          */
670         if (obj->oo_dn->dn_allocated_txg == oh->ot_tx->tx_txg)
671                 rc = __osd_sa_xattr_schedule_update(env, obj, oh);
672         else
673                 rc = __osd_sa_xattr_update(env, obj, oh);
674
675         return rc;
676 }
677
678 int
679 __osd_xattr_set(const struct lu_env *env, struct osd_object *obj,
680                 const struct lu_buf *buf, const char *name, int fl,
681                 struct osd_thandle *oh)
682 {
683         struct osd_device *osd = osd_obj2dev(obj);
684         dnode_t *xa_zap_dn = NULL;
685         dnode_t *xa_data_dn = NULL;
686         uint64_t xa_data_obj;
687         sa_handle_t *sa_hdl = NULL;
688         dmu_tx_t *tx = oh->ot_tx;
689         uint64_t size;
690         int rc;
691
692         LASSERT(obj->oo_sa_hdl);
693
694         if (obj->oo_xattr == ZFS_NO_OBJECT) {
695                 struct lu_attr *la = &osd_oti_get(env)->oti_la;
696
697                 la->la_valid = LA_MODE;
698                 la->la_mode = S_IFDIR | 0755;
699                 rc = __osd_zap_create(env, osd, &xa_zap_dn, tx, la, 0, 0);
700                 if (rc)
701                         return rc;
702
703                 obj->oo_xattr = xa_zap_dn->dn_object;
704                 rc = osd_object_sa_update(obj, SA_ZPL_XATTR(osd),
705                                 &obj->oo_xattr, 8, oh);
706                 if (rc)
707                         goto out;
708         }
709
710         rc = -zap_lookup(osd->od_os, obj->oo_xattr, name, sizeof(uint64_t), 1,
711                          &xa_data_obj);
712         if (rc == 0) {
713                 if (fl & LU_XATTR_CREATE) {
714                         rc = -EEXIST;
715                         goto out;
716                 }
717                 /* Entry already exists. We'll truncate the existing object. */
718                 rc = __osd_obj2dnode(osd->od_os, xa_data_obj, &xa_data_dn);
719                 if (rc)
720                         goto out;
721
722                 rc = -sa_handle_get(osd->od_os, xa_data_obj, NULL,
723                                         SA_HDL_PRIVATE, &sa_hdl);
724                 if (rc)
725                         goto out;
726
727                 rc = -sa_lookup(sa_hdl, SA_ZPL_SIZE(osd), &size, 8);
728                 if (rc)
729                         goto out_sa;
730
731                 rc = -dmu_free_range(osd->od_os, xa_data_dn->dn_object,
732                                      0, DMU_OBJECT_END, tx);
733                 if (rc)
734                         goto out_sa;
735         } else if (rc == -ENOENT) {
736                 struct lu_attr *la = &osd_oti_get(env)->oti_la;
737                 /*
738                  * Entry doesn't exist, we need to create a new one and a new
739                  * object to store the value.
740                  */
741                 if (fl & LU_XATTR_REPLACE) {
742                         /* should be ENOATTR according to the
743                          * man, but that is undefined here
744                          */
745                         rc = -ENODATA;
746                         goto out;
747                 }
748
749                 la->la_valid = LA_MODE;
750                 la->la_mode = S_IFREG | 0644;
751                 rc = __osd_object_create(env, osd, obj,
752                                          lu_object_fid(&obj->oo_dt.do_lu),
753                                          &xa_data_dn, tx, la);
754                 if (rc)
755                         goto out;
756                 xa_data_obj = xa_data_dn->dn_object;
757
758                 rc = -sa_handle_get(osd->od_os, xa_data_obj, NULL,
759                                         SA_HDL_PRIVATE, &sa_hdl);
760                 if (rc)
761                         goto out;
762
763                 rc = -zap_add(osd->od_os, obj->oo_xattr, name, sizeof(uint64_t),
764                                 1, &xa_data_obj, tx);
765                 if (rc)
766                         goto out_sa;
767         } else {
768                 /* There was an error looking up the xattr name */
769                 goto out;
770         }
771
772         /* Finally write the xattr value */
773         dmu_write(osd->od_os, xa_data_obj, 0, buf->lb_len, buf->lb_buf, tx);
774
775         size = buf->lb_len;
776         rc = -sa_update(sa_hdl, SA_ZPL_SIZE(osd), &size, 8, tx);
777
778 out_sa:
779         sa_handle_destroy(sa_hdl);
780 out:
781         if (xa_data_dn != NULL)
782                 osd_dnode_rele(xa_data_dn);
783         if (xa_zap_dn != NULL)
784                 osd_dnode_rele(xa_zap_dn);
785
786         return rc;
787 }
788
789 static int osd_xattr_split_pfid(const struct lu_env *env,
790                                 struct osd_object *obj, struct osd_thandle *oh)
791 {
792         struct osd_thread_info *info = osd_oti_get(env);
793         struct lustre_ost_attrs *loa =
794                 (struct lustre_ost_attrs *)&info->oti_buf;
795         struct lustre_mdt_attrs *lma = &loa->loa_lma;
796         struct lu_buf buf = {
797                 .lb_buf = loa,
798                 .lb_len = sizeof(info->oti_buf),
799         };
800         int size;
801         int rc;
802
803         ENTRY;
804         BUILD_BUG_ON(sizeof(info->oti_buf) < sizeof(*loa));
805         rc = osd_xattr_get_internal(env, obj, &buf, XATTR_NAME_LMA, &size);
806         if (rc)
807                 RETURN(rc);
808
809         lustre_loa_swab(loa, true);
810         LASSERT(lma->lma_compat & LMAC_STRIPE_INFO);
811
812         lma->lma_compat &= ~(LMAC_STRIPE_INFO | LMAC_COMP_INFO);
813         lustre_lma_swab(lma);
814         buf.lb_buf = lma;
815         buf.lb_len = sizeof(*lma);
816         rc = osd_xattr_set_internal(env, obj, &buf, XATTR_NAME_LMA,
817                                     LU_XATTR_REPLACE, oh);
818         if (!rc)
819                 obj->oo_pfid_in_lma = 0;
820
821         RETURN(rc);
822 }
823
824 /*
825  * In DNE environment, the object (in spite of regular file or directory)
826  * and its name entry may reside on different MDTs. Under such case, we will
827  * create an agent entry on the MDT where the object resides. The agent entry
828  * references the object locally, that makes the object to be visible to the
829  * userspace when mounted as 'zfs' directly. Then the userspace tools, such
830  * as 'tar' can handle the object properly.
831  *
832  * We handle the agent entry during set linkEA that is the common interface
833  * for both regular file and directroy, can handle kinds of cases, such as
834  * create/link/unlink/rename, and so on.
835  *
836  * NOTE: we need to do that for both directory and regular file, so we can NOT
837  *       do that when ea_{insert,delete} that are directory based operations.
838  */
839 static int osd_xattr_handle_linkea(const struct lu_env *env,
840                                    struct osd_device *osd,
841                                    struct osd_object *obj,
842                                    const struct lu_buf *buf,
843                                    struct osd_thandle *oh)
844 {
845         const struct lu_fid *fid = lu_object_fid(&obj->oo_dt.do_lu);
846         struct lu_fid *tfid = &osd_oti_get(env)->oti_fid;
847         struct linkea_data ldata = { .ld_buf = (struct lu_buf *)buf };
848         struct lu_name tmpname;
849         int rc;
850         bool remote = false;
851
852         ENTRY;
853         rc = linkea_init_with_rec(&ldata);
854         if (!rc) {
855                 linkea_first_entry(&ldata);
856                 while (ldata.ld_lee != NULL && !remote) {
857                         linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen,
858                                             &tmpname, tfid);
859                         if (osd_remote_fid(env, osd, tfid) > 0)
860                                 remote = true;
861                         else
862                                 linkea_next_entry(&ldata);
863                 }
864         } else if (rc == -ENODATA) {
865                 rc = 0;
866         } else {
867                 RETURN(rc);
868         }
869
870         if (lu_object_has_agent_entry(&obj->oo_dt.do_lu) && !remote) {
871                 rc = osd_delete_from_remote_parent(env, osd, obj, oh, false);
872                 if (rc)
873                         CERROR("%s: failed to remove agent entry for "DFID": rc = %d\n",
874                                osd_name(osd), PFID(fid), rc);
875         } else if (!lu_object_has_agent_entry(&obj->oo_dt.do_lu) && remote) {
876                 rc = osd_add_to_remote_parent(env, osd, obj, oh);
877                 if (rc)
878                         CWARN("%s: failed to create agent entry for "DFID": rc = %d\n",
879                               osd_name(osd), PFID(fid), rc);
880         }
881
882         RETURN(rc);
883 }
884
885 int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
886                   const struct lu_buf *buf, const char *name, int fl,
887                   struct thandle *handle)
888 {
889         struct osd_object *obj = osd_dt_obj(dt);
890         struct osd_device *osd = osd_obj2dev(obj);
891         struct osd_thandle *oh;
892         int rc = 0;
893
894         ENTRY;
895         LASSERT(handle != NULL);
896         LASSERT(osd_invariant(obj));
897
898         if (!osd_obj2dev(obj)->od_posix_acl &&
899             (strcmp(name, XATTR_NAME_POSIX_ACL_ACCESS) == 0 ||
900              strcmp(name, XATTR_NAME_POSIX_ACL_DEFAULT) == 0))
901                 RETURN(-EOPNOTSUPP);
902
903         oh = container_of(handle, struct osd_thandle, ot_super);
904
905         down_write(&obj->oo_guard);
906         CDEBUG(D_INODE, "Setting xattr %s with size %d\n",
907                 name, (int)buf->lb_len);
908         /* For the OST migrated from ldiskfs, the PFID EA may
909          * be stored in LMA because of ldiskfs inode size.
910          */
911         if (unlikely(strcmp(name, XATTR_NAME_FID) == 0 &&
912                      obj->oo_pfid_in_lma)) {
913                 rc = osd_xattr_split_pfid(env, obj, oh);
914                 if (!rc)
915                         fl = LU_XATTR_CREATE;
916         } else if (strcmp(name, XATTR_NAME_LINK) == 0 &&
917                    osd->od_remote_parent_dir != ZFS_NO_OBJECT) {
918                 rc = osd_xattr_handle_linkea(env, osd, obj, buf, oh);
919         }
920
921         if (!rc)
922                 rc = osd_xattr_set_internal(env, obj, buf, name, fl, oh);
923         up_write(&obj->oo_guard);
924
925         RETURN(rc);
926 }
927
928 static void
929 __osd_xattr_declare_del(const struct lu_env *env, struct osd_object *obj,
930                         const char *name, struct osd_thandle *oh)
931 {
932         struct osd_device *osd = osd_obj2dev(obj);
933         dmu_tx_t *tx = oh->ot_tx;
934         uint64_t xa_data_obj;
935         int rc;
936
937         /* update SA_ZPL_DXATTR if xattr was in SA */
938         dmu_tx_hold_sa(tx, obj->oo_sa_hdl, 0);
939
940         if (obj->oo_xattr == ZFS_NO_OBJECT)
941                 return;
942
943         rc = -zap_lookup(osd->od_os, obj->oo_xattr, name, 8, 1, &xa_data_obj);
944         if (rc == 0) {
945                 /* Entry exists. Will delete the existing obj and ZAP entry */
946                 dmu_tx_hold_bonus(tx, xa_data_obj);
947                 dmu_tx_hold_free(tx, xa_data_obj, 0, DMU_OBJECT_END);
948                 dmu_tx_hold_zap(tx, obj->oo_xattr, FALSE, (char *) name);
949                 return;
950         } else if (rc == -ENOENT) {
951                 /*
952                  * Entry doesn't exist, nothing to be changed.
953                  */
954                 return;
955         }
956
957         /* An error happened */
958         tx->tx_err = -rc;
959 }
960
961 int osd_declare_xattr_del(const struct lu_env *env, struct dt_object *dt,
962                           const char *name, struct thandle *handle)
963 {
964         struct osd_object *obj = osd_dt_obj(dt);
965         struct osd_thandle *oh;
966
967         ENTRY;
968         LASSERT(handle != NULL);
969         LASSERT(osd_invariant(obj));
970
971         oh = container_of(handle, struct osd_thandle, ot_super);
972         LASSERT(oh->ot_tx != NULL);
973
974         down_read(&obj->oo_guard);
975         if (likely(dt_object_exists(&obj->oo_dt) && !obj->oo_destroyed)) {
976                 LASSERT(obj->oo_dn != NULL);
977                 __osd_xattr_declare_del(env, obj, name, oh);
978         }
979         up_read(&obj->oo_guard);
980
981         RETURN(0);
982 }
983
984 static int __osd_sa_xattr_del(const struct lu_env *env, struct osd_object *obj,
985                               const char *name, struct osd_thandle *oh)
986 {
987         int rc;
988
989         rc = __osd_xattr_cache(obj);
990         if (rc)
991                 return rc;
992
993         rc = -nvlist_remove(obj->oo_sa_xattr, name, DATA_TYPE_BYTE_ARRAY);
994         if (rc)
995                 return rc;
996
997         /*
998          * only migrate delete LMV, and it needs to be done immediately, because
999          * it's used in deleting sub stripes, and if this is delayed, later when
1000          * destroying the master object, it will delete sub stripes again.
1001          */
1002         if (!strcmp(name, XATTR_NAME_LMV))
1003                 rc = __osd_sa_xattr_update(env, obj, oh);
1004         else
1005                 rc = __osd_sa_xattr_schedule_update(env, obj, oh);
1006         return rc;
1007 }
1008
1009 static int __osd_xattr_del(const struct lu_env *env, struct osd_object *obj,
1010                            const char *name, struct osd_thandle *oh)
1011 {
1012         struct osd_device *osd = osd_obj2dev(obj);
1013         uint64_t xa_data_obj;
1014         int rc;
1015
1016         if (unlikely(!dt_object_exists(&obj->oo_dt) || obj->oo_destroyed))
1017                 return -ENOENT;
1018
1019         /* try remove xattr from SA at first */
1020         rc = __osd_sa_xattr_del(env, obj, name, oh);
1021         if (rc != -ENOENT)
1022                 return rc;
1023
1024         if (obj->oo_xattr == ZFS_NO_OBJECT)
1025                 return 0;
1026
1027         rc = -zap_lookup(osd->od_os, obj->oo_xattr, name, sizeof(uint64_t), 1,
1028                         &xa_data_obj);
1029         if (rc == -ENOENT) {
1030                 rc = 0;
1031         } else if (rc == 0) {
1032                 /* Entry exists. We'll delete the existing obj and ZAP entry */
1033                 rc = -dmu_object_free(osd->od_os, xa_data_obj, oh->ot_tx);
1034                 if (rc)
1035                         return rc;
1036
1037                 rc = -zap_remove(osd->od_os, obj->oo_xattr, name, oh->ot_tx);
1038         }
1039
1040         return rc;
1041 }
1042
1043 int osd_xattr_del(const struct lu_env *env, struct dt_object *dt,
1044                   const char *name, struct thandle *handle)
1045 {
1046         struct osd_object  *obj = osd_dt_obj(dt);
1047         struct osd_thandle *oh;
1048         int rc;
1049
1050         ENTRY;
1051         LASSERT(handle != NULL);
1052         oh = container_of(handle, struct osd_thandle, ot_super);
1053         LASSERT(oh->ot_tx != NULL);
1054
1055         if (!osd_obj2dev(obj)->od_posix_acl &&
1056             (strcmp(name, XATTR_NAME_POSIX_ACL_ACCESS) == 0 ||
1057              strcmp(name, XATTR_NAME_POSIX_ACL_DEFAULT) == 0))
1058                 RETURN(-EOPNOTSUPP);
1059
1060         down_write(&obj->oo_guard);
1061         if (unlikely(!dt_object_exists(dt) || obj->oo_destroyed))
1062                 GOTO(out, rc = -ENOENT);
1063         LASSERT(obj->oo_dn != NULL);
1064         /* For the OST migrated from ldiskfs, the PFID EA may
1065          * be stored in LMA because of ldiskfs inode size.
1066          */
1067         if (unlikely(strcmp(name, XATTR_NAME_FID) == 0 && obj->oo_pfid_in_lma))
1068                 rc = osd_xattr_split_pfid(env, obj, oh);
1069         else
1070                 rc = __osd_xattr_del(env, obj, name, oh);
1071
1072 out:
1073         up_write(&obj->oo_guard);
1074         RETURN(rc);
1075 }
1076
1077 void osd_declare_xattrs_destroy(const struct lu_env *env,
1078                                 struct osd_object *obj, struct osd_thandle *oh)
1079 {
1080         struct osd_device *osd = osd_obj2dev(obj);
1081         zap_attribute_t *za = &osd_oti_get(env)->oti_za;
1082         uint64_t oid = obj->oo_xattr, xid;
1083         dmu_tx_t *tx = oh->ot_tx;
1084         zap_cursor_t *zc;
1085         int rc;
1086
1087         if (oid == ZFS_NO_OBJECT)
1088                 return; /* Nothing to do for SA xattrs */
1089
1090         /* Declare to free the ZAP holding xattrs */
1091         dmu_tx_hold_free(tx, oid, 0, DMU_OBJECT_END);
1092
1093         rc = osd_zap_cursor_init(&zc, osd->od_os, oid, 0);
1094         if (rc)
1095                 goto out;
1096
1097         while (zap_cursor_retrieve(zc, za) == 0) {
1098                 LASSERT(za->za_num_integers == 1);
1099                 LASSERT(za->za_integer_length == sizeof(uint64_t));
1100
1101                 rc = -zap_lookup(osd->od_os, oid, za->za_name,
1102                                  sizeof(uint64_t), 1, &xid);
1103                 if (rc) {
1104                         CERROR("%s: xattr %s lookup failed: rc = %d\n",
1105                                osd->od_svname, za->za_name, rc);
1106                         break;
1107                 }
1108                 dmu_tx_hold_free(tx, xid, 0, DMU_OBJECT_END);
1109
1110                 zap_cursor_advance(zc);
1111         }
1112
1113         osd_zap_cursor_fini(zc);
1114 out:
1115         if (rc && tx->tx_err == 0)
1116                 tx->tx_err = -rc;
1117 }
1118
1119 int osd_xattrs_destroy(const struct lu_env *env,
1120                        struct osd_object *obj, struct osd_thandle *oh)
1121 {
1122         struct osd_device *osd = osd_obj2dev(obj);
1123         dmu_tx_t *tx = oh->ot_tx;
1124         zap_attribute_t *za = &osd_oti_get(env)->oti_za;
1125         zap_cursor_t *zc;
1126         uint64_t xid;
1127         int rc;
1128
1129         /* The transaction must have been assigned to a transaction group. */
1130         LASSERT(tx->tx_txg != 0);
1131
1132         if (obj->oo_xattr == ZFS_NO_OBJECT)
1133                 return 0; /* Nothing to do for SA xattrs */
1134
1135         /* Free the ZAP holding the xattrs */
1136         rc = osd_zap_cursor_init(&zc, osd->od_os, obj->oo_xattr, 0);
1137         if (rc)
1138                 return rc;
1139
1140         while (zap_cursor_retrieve(zc, za) == 0) {
1141                 LASSERT(za->za_num_integers == 1);
1142                 LASSERT(za->za_integer_length == sizeof(uint64_t));
1143
1144                 rc = -zap_lookup(osd->od_os, obj->oo_xattr, za->za_name,
1145                                  sizeof(uint64_t), 1, &xid);
1146                 if (rc) {
1147                         CERROR("%s: lookup xattr %s failed: rc = %d\n",
1148                                osd->od_svname, za->za_name, rc);
1149                 } else {
1150                         rc = -dmu_object_free(osd->od_os, xid, tx);
1151                         if (rc)
1152                                 CERROR("%s: free xattr %s failed: rc = %d\n",
1153                                        osd->od_svname, za->za_name, rc);
1154                 }
1155                 zap_cursor_advance(zc);
1156         }
1157         osd_zap_cursor_fini(zc);
1158
1159         rc = -dmu_object_free(osd->od_os, obj->oo_xattr, tx);
1160         if (rc)
1161                 CERROR("%s: free xattr %llu failed: rc = %d\n",
1162                        osd->od_svname, obj->oo_xattr, rc);
1163
1164         return rc;
1165 }
1166
1167 static int
1168 osd_sa_xattr_list(const struct lu_env *env, struct osd_object *obj,
1169                   const struct lu_buf *lb)
1170 {
1171         nvpair_t *nvp = NULL;
1172         int len, counted = 0;
1173         int rc = 0;
1174
1175         rc = __osd_xattr_cache(obj);
1176         if (rc)
1177                 return rc;
1178
1179         while ((nvp = nvlist_next_nvpair(obj->oo_sa_xattr, nvp)) != NULL) {
1180                 const char *name = nvpair_name(nvp);
1181
1182                 if (!osd_obj2dev(obj)->od_posix_acl &&
1183                     (strcmp(name, XATTR_NAME_POSIX_ACL_ACCESS) == 0 ||
1184                      strcmp(name, XATTR_NAME_POSIX_ACL_DEFAULT) == 0))
1185                         continue;
1186
1187                 len = strlen(name);
1188                 if (lb->lb_buf != NULL) {
1189                         if (counted + len + 1 > lb->lb_len)
1190                                 return -ERANGE;
1191
1192                         memcpy(lb->lb_buf + counted, name, len + 1);
1193                 }
1194                 counted += len + 1;
1195         }
1196         return counted;
1197 }
1198
1199 int osd_xattr_list(const struct lu_env *env, struct dt_object *dt,
1200                    const struct lu_buf *lb)
1201 {
1202         struct osd_object *obj = osd_dt_obj(dt);
1203         struct osd_device *osd = osd_obj2dev(obj);
1204         zap_attribute_t *za = &osd_oti_get(env)->oti_za;
1205         zap_cursor_t *zc;
1206         int rc, counted;
1207
1208         ENTRY;
1209         down_read(&obj->oo_guard);
1210         if (unlikely(!dt_object_exists(dt) || obj->oo_destroyed))
1211                 GOTO(out, rc = -ENOENT);
1212         LASSERT(obj->oo_dn != NULL);
1213
1214         rc = osd_sa_xattr_list(env, obj, lb);
1215         if (rc < 0)
1216                 GOTO(out, rc);
1217
1218         counted = rc;
1219
1220         /* continue with dnode xattr if any */
1221         if (obj->oo_xattr == ZFS_NO_OBJECT)
1222                 GOTO(out, rc = counted);
1223
1224         rc = osd_zap_cursor_init(&zc, osd->od_os, obj->oo_xattr, 0);
1225         if (rc)
1226                 GOTO(out, rc);
1227
1228         while ((rc = -zap_cursor_retrieve(zc, za)) == 0) {
1229                 if (!osd_obj2dev(obj)->od_posix_acl &&
1230                     (strcmp(za->za_name, XATTR_NAME_POSIX_ACL_ACCESS) == 0 ||
1231                      strcmp(za->za_name, XATTR_NAME_POSIX_ACL_DEFAULT) == 0)) {
1232                         zap_cursor_advance(zc);
1233                         continue;
1234                 }
1235
1236                 rc = strlen(za->za_name);
1237                 if (lb->lb_buf != NULL) {
1238                         if (counted + rc + 1 > lb->lb_len)
1239                                 GOTO(out_fini, rc = -ERANGE);
1240
1241                         memcpy(lb->lb_buf + counted, za->za_name, rc + 1);
1242                 }
1243                 counted += rc + 1;
1244
1245                 zap_cursor_advance(zc);
1246         }
1247         if (rc == -ENOENT) /* no more kes in the index */
1248                 rc = 0;
1249         else if (unlikely(rc < 0))
1250                 GOTO(out_fini, rc);
1251         rc = counted;
1252
1253 out_fini:
1254         osd_zap_cursor_fini(zc);
1255 out:
1256         up_read(&obj->oo_guard);
1257         RETURN(rc);
1258
1259 }