Whamcloud - gitweb
LU-1330 obdclass: add obd_target.h
[fs/lustre-release.git] / lustre / osd-zfs / osd_xattr.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  */
30 /*
31  * Copyright (c) 2012, 2013, Intel Corporation.
32  * Use is subject to license terms.
33  */
34 /*
35  * This file is part of Lustre, http://www.lustre.org/
36  * Lustre is a trademark of Sun Microsystems, Inc.
37  *
38  * lustre/osd-zfs/osd_xattr.c
39  * functions to manipulate extended attributes and system attributes
40  *
41  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
42  * Author: Mike Pershin <tappro@whamcloud.com>
43  */
44
45 #ifndef EXPORT_SYMTAB
46 # define EXPORT_SYMTAB
47 #endif
48 #define DEBUG_SUBSYSTEM S_OSD
49
50 #include <lustre_ver.h>
51 #include <libcfs/libcfs.h>
52 #include <obd_support.h>
53 #include <lustre_net.h>
54 #include <obd.h>
55 #include <obd_class.h>
56 #include <lustre_disk.h>
57 #include <lustre_fid.h>
58
59 #include "osd_internal.h"
60
61 #include <sys/dnode.h>
62 #include <sys/dbuf.h>
63 #include <sys/spa.h>
64 #include <sys/stat.h>
65 #include <sys/zap.h>
66 #include <sys/spa_impl.h>
67 #include <sys/zfs_znode.h>
68 #include <sys/dmu_tx.h>
69 #include <sys/dmu_objset.h>
70 #include <sys/dsl_prop.h>
71 #include <sys/sa_impl.h>
72 #include <sys/txg.h>
73
74
75 /*
76  * Copy an extended attribute into the buffer provided, or compute the
77  * required buffer size.
78  *
79  * If buf is NULL, it computes the required buffer size.
80  *
81  * Returns 0 on success or a negative error number on failure.
82  * On success, the number of bytes used / required is stored in 'size'.
83  *
84  * No locking is done here.
85  */
86 int __osd_xattr_load(udmu_objset_t *uos, uint64_t dnode, nvlist_t **sa_xattr)
87 {
88         sa_handle_t *sa_hdl;
89         char        *buf;
90         int          rc, size;
91
92         if (unlikely(dnode == ZFS_NO_OBJECT))
93                 return -ENOENT;
94
95         rc = -sa_handle_get(uos->os, dnode, NULL, SA_HDL_PRIVATE, &sa_hdl);
96         if (rc)
97                 return rc;
98
99         rc = -sa_size(sa_hdl, SA_ZPL_DXATTR(uos), &size);
100         if (rc) {
101                 if (rc == -ENOENT)
102                         rc = -nvlist_alloc(sa_xattr, NV_UNIQUE_NAME, KM_SLEEP);
103                 goto out_sa;
104         }
105
106         buf = sa_spill_alloc(KM_SLEEP);
107         if (buf == NULL) {
108                 rc = -ENOMEM;
109                 goto out_sa;
110         }
111         rc = -sa_lookup(sa_hdl, SA_ZPL_DXATTR(uos), buf, size);
112         if (rc == 0)
113                 rc = -nvlist_unpack(buf, size, sa_xattr, KM_SLEEP);
114         sa_spill_free(buf);
115 out_sa:
116         sa_handle_destroy(sa_hdl);
117
118         return rc;
119 }
120
121 static inline int __osd_xattr_cache(const struct lu_env *env,
122                                     struct osd_object *obj)
123 {
124         LASSERT(obj->oo_sa_xattr == NULL);
125         LASSERT(obj->oo_db != NULL);
126
127         return __osd_xattr_load(&osd_obj2dev(obj)->od_objset,
128                                 obj->oo_db->db_object, &obj->oo_sa_xattr);
129 }
130
131 int __osd_sa_xattr_get(const struct lu_env *env, struct osd_object *obj,
132                 const struct lu_buf *buf, const char *name, int *sizep)
133 {
134         uchar_t *nv_value;
135         int      rc;
136
137         LASSERT(obj->oo_sa_hdl);
138
139         if (obj->oo_sa_xattr == NULL) {
140                 rc = __osd_xattr_cache(env, obj);
141                 if (rc)
142                         return rc;
143         }
144
145         LASSERT(obj->oo_sa_xattr);
146         rc = -nvlist_lookup_byte_array(obj->oo_sa_xattr, name, &nv_value,
147                         sizep);
148         if (rc)
149                 return rc;
150
151         if (buf == NULL || buf->lb_buf == NULL) {
152                 /* return the required size by *sizep */
153                 return 0;
154         }
155
156         if (*sizep > buf->lb_len)
157                 return -ERANGE; /* match ldiskfs error */
158
159         memcpy(buf->lb_buf, nv_value, *sizep);
160         return 0;
161 }
162
163 int __osd_xattr_get_large(const struct lu_env *env, udmu_objset_t *uos,
164                           uint64_t xattr, struct lu_buf *buf,
165                           const char *name, int *sizep)
166 {
167         dmu_buf_t       *xa_data_db;
168         sa_handle_t     *sa_hdl = NULL;
169         uint64_t         xa_data_obj, size;
170         int              rc;
171
172         /* are there any extended attributes? */
173         if (xattr == ZFS_NO_OBJECT)
174                 return -ENOENT;
175
176         /* Lookup the object number containing the xattr data */
177         rc = -zap_lookup(uos->os, xattr, name, sizeof(uint64_t), 1,
178                         &xa_data_obj);
179         if (rc)
180                 return rc;
181
182         rc = __osd_obj2dbuf(env, uos->os, xa_data_obj, &xa_data_db, FTAG);
183         if (rc)
184                 return rc;
185
186         rc = -sa_handle_get(uos->os, xa_data_obj, NULL, SA_HDL_PRIVATE,
187                         &sa_hdl);
188         if (rc)
189                 goto out_rele;
190
191         /* Get the xattr value length / object size */
192         rc = -sa_lookup(sa_hdl, SA_ZPL_SIZE(uos), &size, 8);
193         if (rc)
194                 goto out;
195
196         if (size > INT_MAX) {
197                 rc = -EOVERFLOW;
198                 goto out;
199         }
200
201         *sizep = (int)size;
202
203         if (buf == NULL || buf->lb_buf == NULL) {
204                 /* We only need to return the required size */
205                 goto out;
206         }
207         if (*sizep > buf->lb_len) {
208                 rc = -ERANGE; /* match ldiskfs error */
209                 goto out;
210         }
211
212         rc = -dmu_read(uos->os, xa_data_db->db_object, 0,
213                         size, buf->lb_buf, DMU_READ_PREFETCH);
214
215 out:
216         sa_handle_destroy(sa_hdl);
217 out_rele:
218         dmu_buf_rele(xa_data_db, FTAG);
219
220         return rc;
221 }
222
223 int __osd_xattr_get(const struct lu_env *env, struct osd_object *obj,
224                 struct lu_buf *buf, const char *name, int *sizep)
225 {
226         int rc;
227
228         /* check SA_ZPL_DXATTR first then fallback to directory xattr */
229         rc = __osd_sa_xattr_get(env, obj, buf, name, sizep);
230         if (rc != -ENOENT)
231                 return rc;
232
233         rc = __osd_xattr_get_large(env, &osd_obj2dev(obj)->od_objset,
234                                    obj->oo_xattr, buf, name, sizep);
235
236         return rc;
237 }
238
239 int osd_xattr_get(const struct lu_env *env, struct dt_object *dt,
240                 struct lu_buf *buf, const char *name,
241                 struct lustre_capa *capa)
242 {
243         struct osd_object  *obj  = osd_dt_obj(dt);
244         int                 rc, size = 0;
245         ENTRY;
246
247         LASSERT(obj->oo_db != NULL);
248         LASSERT(osd_invariant(obj));
249         LASSERT(dt_object_exists(dt));
250
251         down(&obj->oo_guard);
252         rc = __osd_xattr_get(env, obj, buf, name, &size);
253         up(&obj->oo_guard);
254
255         if (rc == -ENOENT)
256                 rc = -ENODATA;
257         else if (rc == 0)
258                 rc = size;
259         RETURN(rc);
260 }
261
262 void __osd_xattr_declare_set(const struct lu_env *env, struct osd_object *obj,
263                         int vallen, const char *name, struct osd_thandle *oh)
264 {
265         struct osd_device *osd = osd_obj2dev(obj);
266         udmu_objset_t     *uos = &osd->od_objset;
267         dmu_buf_t         *db = obj->oo_db;
268         dmu_tx_t          *tx = oh->ot_tx;
269         uint64_t           xa_data_obj;
270         int                rc = 0;
271         int                here;
272
273         here = dt_object_exists(&obj->oo_dt);
274
275         /* object may be not yet created */
276         if (here) {
277                 LASSERT(db);
278                 LASSERT(obj->oo_sa_hdl);
279                 /* we might just update SA_ZPL_DXATTR */
280                 dmu_tx_hold_sa(tx, obj->oo_sa_hdl, 1);
281
282                 if (obj->oo_xattr == ZFS_NO_OBJECT)
283                         rc = -ENOENT;
284         }
285
286         if (!here || rc == -ENOENT) {
287                 /* we'll be updating SA_ZPL_XATTR */
288                 if (here) {
289                         LASSERT(obj->oo_sa_hdl);
290                         dmu_tx_hold_sa(tx, obj->oo_sa_hdl, 1);
291                 }
292                 /* xattr zap + entry */
293                 dmu_tx_hold_zap(tx, DMU_NEW_OBJECT, TRUE, (char *) name);
294                 /* xattr value obj */
295                 dmu_tx_hold_sa_create(tx, ZFS_SA_BASE_ATTR_SIZE);
296                 dmu_tx_hold_write(tx, DMU_NEW_OBJECT, 0, vallen);
297                 return;
298         }
299
300         rc = -zap_lookup(uos->os, obj->oo_xattr, name, sizeof(uint64_t), 1,
301                         &xa_data_obj);
302         if (rc == 0) {
303                 /*
304                  * Entry already exists.
305                  * We'll truncate the existing object.
306                  */
307                 dmu_tx_hold_bonus(tx, xa_data_obj);
308                 dmu_tx_hold_free(tx, xa_data_obj, vallen, DMU_OBJECT_END);
309                 dmu_tx_hold_write(tx, xa_data_obj, 0, vallen);
310                 return;
311         } else if (rc == -ENOENT) {
312                 /*
313                  * Entry doesn't exist, we need to create a new one and a new
314                  * object to store the value.
315                  */
316                 dmu_tx_hold_bonus(tx, obj->oo_xattr);
317                 dmu_tx_hold_zap(tx, obj->oo_xattr, TRUE, (char *) name);
318                 dmu_tx_hold_sa_create(tx, ZFS_SA_BASE_ATTR_SIZE);
319                 dmu_tx_hold_write(tx, DMU_NEW_OBJECT, 0, vallen);
320                 return;
321         }
322
323         /* An error happened */
324         tx->tx_err = -rc;
325 }
326
327 int osd_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
328                 const struct lu_buf *buf, const char *name,
329                 int fl, struct thandle *handle)
330 {
331         struct osd_object  *obj = osd_dt_obj(dt);
332         struct osd_thandle *oh;
333         ENTRY;
334
335         LASSERT(handle != NULL);
336         oh = container_of0(handle, struct osd_thandle, ot_super);
337
338         down(&obj->oo_guard);
339         __osd_xattr_declare_set(env, obj, buf->lb_len, name, oh);
340         up(&obj->oo_guard);
341
342         RETURN(0);
343 }
344
345 /*
346  * Set an extended attribute.
347  * This transaction must have called udmu_xattr_declare_set() first.
348  *
349  * Returns 0 on success or a negative error number on failure.
350  *
351  * No locking is done here.
352  */
353 static int
354 __osd_sa_xattr_update(const struct lu_env *env, struct osd_object *obj,
355                         struct osd_thandle *oh)
356 {
357         struct osd_device *osd = osd_obj2dev(obj);
358         udmu_objset_t     *uos = &osd->od_objset;
359         char              *dxattr;
360         size_t             sa_size;
361         int                rc;
362
363         ENTRY;
364         LASSERT(obj->oo_sa_hdl);
365         LASSERT(obj->oo_sa_xattr);
366
367         /* Update the SA for additions, modifications, and removals. */
368         rc = -nvlist_size(obj->oo_sa_xattr, &sa_size, NV_ENCODE_XDR);
369         if (rc)
370                 return rc;
371
372         dxattr = sa_spill_alloc(KM_SLEEP);
373         if (dxattr == NULL)
374                 RETURN(-ENOMEM);
375
376         rc = -nvlist_pack(obj->oo_sa_xattr, &dxattr, &sa_size,
377                                 NV_ENCODE_XDR, KM_SLEEP);
378         if (rc)
379                 GOTO(out_free, rc);
380
381         rc = osd_object_sa_update(obj, SA_ZPL_DXATTR(uos), dxattr, sa_size, oh);
382 out_free:
383         sa_spill_free(dxattr);
384         RETURN(rc);
385 }
386
387 int __osd_sa_xattr_set(const struct lu_env *env, struct osd_object *obj,
388                         const struct lu_buf *buf, const char *name, int fl,
389                         struct osd_thandle *oh)
390 {
391         uchar_t *nv_value;
392         size_t  size;
393         int     nv_size;
394         int     rc;
395         int     too_big = 0;
396
397         LASSERT(obj->oo_sa_hdl);
398         if (obj->oo_sa_xattr == NULL) {
399                 rc = __osd_xattr_cache(env, obj);
400                 if (rc)
401                         return rc;
402         }
403
404         LASSERT(obj->oo_sa_xattr);
405         /* Limited to 32k to keep nvpair memory allocations small */
406         if (buf->lb_len > DXATTR_MAX_ENTRY_SIZE) {
407                 too_big = 1;
408         } else {
409                 /* Prevent the DXATTR SA from consuming the entire SA
410                  * region */
411                 rc = -nvlist_size(obj->oo_sa_xattr, &size, NV_ENCODE_XDR);
412                 if (rc)
413                         return rc;
414
415                 if (size + buf->lb_len > DXATTR_MAX_SA_SIZE)
416                         too_big = 1;
417         }
418
419         /* even in case of -EFBIG we must lookup xattr and check can we
420          * rewrite it then delete from SA */
421         rc = -nvlist_lookup_byte_array(obj->oo_sa_xattr, name, &nv_value,
422                                         &nv_size);
423         if (rc == 0) {
424                 if (fl & LU_XATTR_CREATE) {
425                         return -EEXIST;
426                 } else if (too_big) {
427                         rc = -nvlist_remove(obj->oo_sa_xattr, name,
428                                                 DATA_TYPE_BYTE_ARRAY);
429                         if (rc < 0)
430                                 return rc;
431                         rc = __osd_sa_xattr_update(env, obj, oh);
432                         return rc == 0 ? -EFBIG : rc;
433                 }
434         } else if (rc == -ENOENT) {
435                 if (fl & LU_XATTR_REPLACE)
436                         return -ENODATA;
437                 else if (too_big)
438                         return -EFBIG;
439         } else {
440                 return rc;
441         }
442
443         rc = -nvlist_add_byte_array(obj->oo_sa_xattr, name,
444                                     (uchar_t *)buf->lb_buf, buf->lb_len);
445         if (rc)
446                 return rc;
447
448         rc = __osd_sa_xattr_update(env, obj, oh);
449         return rc;
450 }
451
452 int
453 __osd_xattr_set(const struct lu_env *env, struct osd_object *obj,
454                 const struct lu_buf *buf, const char *name, int fl,
455                 struct osd_thandle *oh)
456 {
457         struct osd_device *osd = osd_obj2dev(obj);
458         udmu_objset_t     *uos = &osd->od_objset;
459         dmu_buf_t         *xa_zap_db = NULL;
460         dmu_buf_t         *xa_data_db = NULL;
461         uint64_t           xa_data_obj;
462         sa_handle_t       *sa_hdl = NULL;
463         dmu_tx_t          *tx = oh->ot_tx;
464         uint64_t           size;
465         int                rc;
466
467         LASSERT(obj->oo_sa_hdl);
468
469         if (obj->oo_xattr == ZFS_NO_OBJECT) {
470                 struct lu_attr *la = &osd_oti_get(env)->oti_la;
471
472                 la->la_valid = LA_MODE;
473                 la->la_mode = S_IFDIR | S_IRUGO | S_IWUSR | S_IXUGO;
474                 rc = __osd_zap_create(env, uos, &xa_zap_db, tx, la,
475                                       obj->oo_db->db_object, FTAG, 0);
476                 if (rc)
477                         return rc;
478
479                 obj->oo_xattr = xa_zap_db->db_object;
480                 rc = osd_object_sa_update(obj, SA_ZPL_XATTR(uos),
481                                 &obj->oo_xattr, 8, oh);
482                 if (rc)
483                         goto out;
484         }
485
486         rc = -zap_lookup(uos->os, obj->oo_xattr, name, sizeof(uint64_t), 1,
487                         &xa_data_obj);
488         if (rc == 0) {
489                 if (fl & LU_XATTR_CREATE) {
490                         rc = -EEXIST;
491                         goto out;
492                 }
493                 /*
494                  * Entry already exists.
495                  * We'll truncate the existing object.
496                  */
497                 rc = __osd_obj2dbuf(env, uos->os, xa_data_obj,
498                                         &xa_data_db, FTAG);
499                 if (rc)
500                         goto out;
501
502                 rc = -sa_handle_get(uos->os, xa_data_obj, NULL,
503                                         SA_HDL_PRIVATE, &sa_hdl);
504                 if (rc)
505                         goto out;
506
507                 rc = -sa_lookup(sa_hdl, SA_ZPL_SIZE(uos), &size, 8);
508                 if (rc)
509                         goto out_sa;
510
511                 rc = -dmu_free_range(uos->os, xa_data_db->db_object,
512                                         0, DMU_OBJECT_END, tx);
513                 if (rc)
514                         goto out_sa;
515         } else if (rc == -ENOENT) {
516                 struct lu_attr *la = &osd_oti_get(env)->oti_la;
517                 /*
518                  * Entry doesn't exist, we need to create a new one and a new
519                  * object to store the value.
520                  */
521                 if (fl & LU_XATTR_REPLACE) {
522                         /* should be ENOATTR according to the
523                          * man, but that is undefined here */
524                         rc = -ENODATA;
525                         goto out;
526                 }
527
528                 la->la_valid = LA_MODE;
529                 la->la_mode = S_IFREG | S_IRUGO | S_IWUSR;
530                 rc = __osd_object_create(env, uos, &xa_data_db, tx, la,
531                                          obj->oo_xattr, FTAG);
532                 if (rc)
533                         goto out;
534                 xa_data_obj = xa_data_db->db_object;
535
536                 rc = -sa_handle_get(uos->os, xa_data_obj, NULL,
537                                         SA_HDL_PRIVATE, &sa_hdl);
538                 if (rc)
539                         goto out;
540
541                 rc = -zap_add(uos->os, obj->oo_xattr, name, sizeof(uint64_t),
542                                 1, &xa_data_obj, tx);
543                 if (rc)
544                         goto out_sa;
545         } else {
546                 /* There was an error looking up the xattr name */
547                 goto out;
548         }
549
550         /* Finally write the xattr value */
551         dmu_write(uos->os, xa_data_obj, 0, buf->lb_len, buf->lb_buf, tx);
552
553         size = buf->lb_len;
554         rc = -sa_update(sa_hdl, SA_ZPL_SIZE(uos), &size, 8, tx);
555
556 out_sa:
557         sa_handle_destroy(sa_hdl);
558 out:
559         if (xa_data_db != NULL)
560                 dmu_buf_rele(xa_data_db, FTAG);
561         if (xa_zap_db != NULL)
562                 dmu_buf_rele(xa_zap_db, FTAG);
563
564         return rc;
565 }
566
567 int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
568                   const struct lu_buf *buf, const char *name, int fl,
569                   struct thandle *handle, struct lustre_capa *capa)
570 {
571         struct osd_object  *obj = osd_dt_obj(dt);
572         struct osd_thandle *oh;
573         int rc = 0;
574         ENTRY;
575
576         LASSERT(handle != NULL);
577         LASSERT(osd_invariant(obj));
578         LASSERT(dt_object_exists(dt));
579         LASSERT(obj->oo_db);
580
581         oh = container_of0(handle, struct osd_thandle, ot_super);
582
583         down(&obj->oo_guard);
584         CDEBUG(D_INODE, "Setting xattr %s with size %d\n",
585                 name, (int)buf->lb_len);
586         rc = osd_xattr_set_internal(env, obj, buf, name, fl, oh, capa);
587         up(&obj->oo_guard);
588
589         RETURN(rc);
590 }
591
592 static void
593 __osd_xattr_declare_del(const struct lu_env *env, struct osd_object *obj,
594                         const char *name, struct osd_thandle *oh)
595 {
596         struct osd_device *osd = osd_obj2dev(obj);
597         udmu_objset_t     *uos = &osd->od_objset;
598         dmu_tx_t          *tx = oh->ot_tx;
599         uint64_t           xa_data_obj;
600         int                rc;
601
602         /* update SA_ZPL_DXATTR if xattr was in SA */
603         dmu_tx_hold_sa(tx, obj->oo_sa_hdl, 0);
604
605         if (obj->oo_xattr == ZFS_NO_OBJECT)
606                 return;
607
608         rc = -zap_lookup(uos->os, obj->oo_xattr, name, 8, 1, &xa_data_obj);
609         if (rc == 0) {
610                 /*
611                  * Entry exists.
612                  * We'll delete the existing object and ZAP entry.
613                  */
614                 dmu_tx_hold_bonus(tx, xa_data_obj);
615                 dmu_tx_hold_free(tx, xa_data_obj, 0, DMU_OBJECT_END);
616                 dmu_tx_hold_zap(tx, obj->oo_xattr, FALSE, (char *) name);
617                 return;
618         } else if (rc == -ENOENT) {
619                 /*
620                  * Entry doesn't exist, nothing to be changed.
621                  */
622                 return;
623         }
624
625         /* An error happened */
626         tx->tx_err = -rc;
627 }
628
629 int osd_declare_xattr_del(const struct lu_env *env, struct dt_object *dt,
630                         const char *name, struct thandle *handle)
631 {
632         struct osd_object  *obj = osd_dt_obj(dt);
633         struct osd_thandle *oh;
634         ENTRY;
635
636         LASSERT(handle != NULL);
637         LASSERT(dt_object_exists(dt));
638         LASSERT(osd_invariant(obj));
639
640         oh = container_of0(handle, struct osd_thandle, ot_super);
641         LASSERT(oh->ot_tx != NULL);
642         LASSERT(obj->oo_db != NULL);
643
644         down(&obj->oo_guard);
645         __osd_xattr_declare_del(env, obj, name, oh);
646         up(&obj->oo_guard);
647
648         RETURN(0);
649 }
650
651 int __osd_sa_xattr_del(const struct lu_env *env, struct osd_object *obj,
652                         const char *name, struct osd_thandle *oh)
653 {
654         int rc;
655
656         if (obj->oo_sa_xattr == NULL) {
657                 rc = __osd_xattr_cache(env, obj);
658                 if (rc)
659                         return rc;
660         }
661
662         rc = -nvlist_remove(obj->oo_sa_xattr, name, DATA_TYPE_BYTE_ARRAY);
663         if (rc == 0)
664                 rc = __osd_sa_xattr_update(env, obj, oh);
665         return rc;
666 }
667
668 int __osd_xattr_del(const struct lu_env *env, struct osd_object *obj,
669                         const char *name, struct osd_thandle *oh)
670 {
671         struct osd_device *osd = osd_obj2dev(obj);
672         udmu_objset_t     *uos = &osd->od_objset;
673         uint64_t           xa_data_obj;
674         int                rc;
675
676         /* try remove xattr from SA at first */
677         rc = __osd_sa_xattr_del(env, obj, name, oh);
678         if (rc != -ENOENT)
679                 return rc;
680
681         if (obj->oo_xattr == ZFS_NO_OBJECT)
682                 return 0;
683
684         rc = -zap_lookup(uos->os, obj->oo_xattr, name, sizeof(uint64_t), 1,
685                         &xa_data_obj);
686         if (rc == -ENOENT) {
687                 rc = 0;
688         } else if (rc == 0) {
689                 /*
690                  * Entry exists.
691                  * We'll delete the existing object and ZAP entry.
692                  */
693                 rc = __osd_object_free(uos, xa_data_obj, oh->ot_tx);
694                 if (rc)
695                         return rc;
696
697                 rc = -zap_remove(uos->os, obj->oo_xattr, name, oh->ot_tx);
698         }
699
700         return rc;
701 }
702
703 int osd_xattr_del(const struct lu_env *env, struct dt_object *dt,
704                 const char *name, struct thandle *handle,
705                 struct lustre_capa *capa)
706 {
707         struct osd_object  *obj = osd_dt_obj(dt);
708         struct osd_thandle *oh;
709         int                 rc;
710         ENTRY;
711
712         LASSERT(handle != NULL);
713         LASSERT(obj->oo_db != NULL);
714         LASSERT(osd_invariant(obj));
715         LASSERT(dt_object_exists(dt));
716         oh = container_of0(handle, struct osd_thandle, ot_super);
717         LASSERT(oh->ot_tx != NULL);
718
719         down(&obj->oo_guard);
720         rc = __osd_xattr_del(env, obj, name, oh);
721         up(&obj->oo_guard);
722
723         RETURN(rc);
724 }
725
726 static int
727 osd_sa_xattr_list(const struct lu_env *env, struct osd_object *obj,
728                 struct lu_buf *lb)
729 {
730         nvpair_t *nvp = NULL;
731         int       len, counted = 0, remain = lb->lb_len;
732         int       rc = 0;
733
734         if (obj->oo_sa_xattr == NULL) {
735                 rc = __osd_xattr_cache(env, obj);
736                 if (rc)
737                         return rc;
738         }
739
740         LASSERT(obj->oo_sa_xattr);
741
742         while ((nvp = nvlist_next_nvpair(obj->oo_sa_xattr, nvp)) != NULL) {
743                 len = strlen(nvpair_name(nvp));
744                 if (lb->lb_buf != NULL) {
745                         if (len + 1 > remain)
746                                 return -ERANGE;
747
748                         memcpy(lb->lb_buf, nvpair_name(nvp), len);
749                         lb->lb_buf += len;
750                         *((char *)lb->lb_buf) = '\0';
751                         lb->lb_buf++;
752                         remain -= len + 1;
753                 }
754                 counted += len + 1;
755         }
756         return counted;
757 }
758
759 int osd_xattr_list(const struct lu_env *env, struct dt_object *dt,
760                 struct lu_buf *lb, struct lustre_capa *capa)
761 {
762         struct osd_thread_info *oti = osd_oti_get(env);
763         struct osd_object      *obj = osd_dt_obj(dt);
764         struct osd_device      *osd = osd_obj2dev(obj);
765         udmu_objset_t          *uos = &osd->od_objset;
766         zap_cursor_t           *zc;
767         int                    rc, counted = 0, remain = lb->lb_len;
768         ENTRY;
769
770         LASSERT(obj->oo_db != NULL);
771         LASSERT(osd_invariant(obj));
772         LASSERT(dt_object_exists(dt));
773
774         down(&obj->oo_guard);
775
776         rc = osd_sa_xattr_list(env, obj, lb);
777         if (rc < 0)
778                 GOTO(out, rc);
779         counted = rc;
780         remain -= counted;
781
782         /* continue with dnode xattr if any */
783         if (obj->oo_xattr == ZFS_NO_OBJECT)
784                 GOTO(out, rc = counted);
785
786         rc = -udmu_zap_cursor_init(&zc, uos, obj->oo_xattr, 0);
787         if (rc)
788                 GOTO(out, rc);
789
790         while ((rc = -udmu_zap_cursor_retrieve_key(env, zc, oti->oti_key,
791                                                 MAXNAMELEN)) == 0) {
792                 rc = strlen(oti->oti_key);
793                 if (lb->lb_buf != NULL) {
794                         if (rc + 1 > remain)
795                                 RETURN(-ERANGE);
796
797                         memcpy(lb->lb_buf, oti->oti_key, rc);
798                         lb->lb_buf += rc;
799                         *((char *)lb->lb_buf) = '\0';
800                         lb->lb_buf++;
801                         remain -= rc + 1;
802                 }
803                 counted += rc + 1;
804
805                 zap_cursor_advance(zc);
806         }
807         if (rc == -ENOENT) /* no more kes in the index */
808                 rc = 0;
809         else if (unlikely(rc < 0))
810                 GOTO(out_fini, rc);
811         rc = counted;
812
813 out_fini:
814         udmu_zap_cursor_fini(zc);
815 out:
816         up(&obj->oo_guard);
817         RETURN(rc);
818
819 }
820
821