Whamcloud - gitweb
2900fb6a63c505fe0fa0b64f93beb082cec4d3f8
[fs/lustre-release.git] / lustre / osd-zfs / osd_xattr.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  */
30 /*
31  * Copyright (c) 2012, 2013, Intel Corporation.
32  * Use is subject to license terms.
33  */
34 /*
35  * This file is part of Lustre, http://www.lustre.org/
36  * Lustre is a trademark of Sun Microsystems, Inc.
37  *
38  * lustre/osd-zfs/osd_xattr.c
39  * functions to manipulate extended attributes and system attributes
40  *
41  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
42  * Author: Mike Pershin <tappro@whamcloud.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_OSD
46
47 #include <lustre_ver.h>
48 #include <libcfs/libcfs.h>
49 #include <obd_support.h>
50 #include <lustre_net.h>
51 #include <obd.h>
52 #include <obd_class.h>
53 #include <lustre_disk.h>
54 #include <lustre_fid.h>
55
56 #include "osd_internal.h"
57
58 #include <sys/dnode.h>
59 #include <sys/dbuf.h>
60 #include <sys/spa.h>
61 #include <sys/stat.h>
62 #include <sys/zap.h>
63 #include <sys/spa_impl.h>
64 #include <sys/zfs_znode.h>
65 #include <sys/dmu_tx.h>
66 #include <sys/dmu_objset.h>
67 #include <sys/dsl_prop.h>
68 #include <sys/sa_impl.h>
69 #include <sys/txg.h>
70
71
72 /*
73  * Copy an extended attribute into the buffer provided, or compute the
74  * required buffer size.
75  *
76  * If buf is NULL, it computes the required buffer size.
77  *
78  * Returns 0 on success or a negative error number on failure.
79  * On success, the number of bytes used / required is stored in 'size'.
80  *
81  * No locking is done here.
82  */
83 int __osd_xattr_load(udmu_objset_t *uos, uint64_t dnode, nvlist_t **sa_xattr)
84 {
85         sa_handle_t *sa_hdl;
86         char        *buf;
87         int          rc, size;
88
89         if (unlikely(dnode == ZFS_NO_OBJECT))
90                 return -ENOENT;
91
92         rc = -sa_handle_get(uos->os, dnode, NULL, SA_HDL_PRIVATE, &sa_hdl);
93         if (rc)
94                 return rc;
95
96         rc = -sa_size(sa_hdl, SA_ZPL_DXATTR(uos), &size);
97         if (rc) {
98                 if (rc == -ENOENT)
99                         rc = -nvlist_alloc(sa_xattr, NV_UNIQUE_NAME, KM_SLEEP);
100                 goto out_sa;
101         }
102
103         buf = sa_spill_alloc(KM_SLEEP);
104         if (buf == NULL) {
105                 rc = -ENOMEM;
106                 goto out_sa;
107         }
108         rc = -sa_lookup(sa_hdl, SA_ZPL_DXATTR(uos), buf, size);
109         if (rc == 0)
110                 rc = -nvlist_unpack(buf, size, sa_xattr, KM_SLEEP);
111         sa_spill_free(buf);
112 out_sa:
113         sa_handle_destroy(sa_hdl);
114
115         return rc;
116 }
117
118 static inline int __osd_xattr_cache(const struct lu_env *env,
119                                     struct osd_object *obj)
120 {
121         LASSERT(obj->oo_sa_xattr == NULL);
122         LASSERT(obj->oo_db != NULL);
123
124         return __osd_xattr_load(&osd_obj2dev(obj)->od_objset,
125                                 obj->oo_db->db_object, &obj->oo_sa_xattr);
126 }
127
128 int __osd_sa_xattr_get(const struct lu_env *env, struct osd_object *obj,
129                 const struct lu_buf *buf, const char *name, int *sizep)
130 {
131         uchar_t *nv_value;
132         int      rc;
133
134         LASSERT(obj->oo_sa_hdl);
135
136         if (obj->oo_sa_xattr == NULL) {
137                 rc = __osd_xattr_cache(env, obj);
138                 if (rc)
139                         return rc;
140         }
141
142         LASSERT(obj->oo_sa_xattr);
143         rc = -nvlist_lookup_byte_array(obj->oo_sa_xattr, name, &nv_value,
144                         sizep);
145         if (rc)
146                 return rc;
147
148         if (buf == NULL || buf->lb_buf == NULL) {
149                 /* return the required size by *sizep */
150                 return 0;
151         }
152
153         if (*sizep > buf->lb_len)
154                 return -ERANGE; /* match ldiskfs error */
155
156         memcpy(buf->lb_buf, nv_value, *sizep);
157         return 0;
158 }
159
160 int __osd_xattr_get_large(const struct lu_env *env, udmu_objset_t *uos,
161                           uint64_t xattr, struct lu_buf *buf,
162                           const char *name, int *sizep)
163 {
164         dmu_buf_t       *xa_data_db;
165         sa_handle_t     *sa_hdl = NULL;
166         uint64_t         xa_data_obj, size;
167         int              rc;
168
169         /* are there any extended attributes? */
170         if (xattr == ZFS_NO_OBJECT)
171                 return -ENOENT;
172
173         /* Lookup the object number containing the xattr data */
174         rc = -zap_lookup(uos->os, xattr, name, sizeof(uint64_t), 1,
175                         &xa_data_obj);
176         if (rc)
177                 return rc;
178
179         rc = __osd_obj2dbuf(env, uos->os, xa_data_obj, &xa_data_db, FTAG);
180         if (rc)
181                 return rc;
182
183         rc = -sa_handle_get(uos->os, xa_data_obj, NULL, SA_HDL_PRIVATE,
184                         &sa_hdl);
185         if (rc)
186                 goto out_rele;
187
188         /* Get the xattr value length / object size */
189         rc = -sa_lookup(sa_hdl, SA_ZPL_SIZE(uos), &size, 8);
190         if (rc)
191                 goto out;
192
193         if (size > INT_MAX) {
194                 rc = -EOVERFLOW;
195                 goto out;
196         }
197
198         *sizep = (int)size;
199
200         if (buf == NULL || buf->lb_buf == NULL) {
201                 /* We only need to return the required size */
202                 goto out;
203         }
204         if (*sizep > buf->lb_len) {
205                 rc = -ERANGE; /* match ldiskfs error */
206                 goto out;
207         }
208
209         rc = -dmu_read(uos->os, xa_data_db->db_object, 0,
210                         size, buf->lb_buf, DMU_READ_PREFETCH);
211
212 out:
213         sa_handle_destroy(sa_hdl);
214 out_rele:
215         dmu_buf_rele(xa_data_db, FTAG);
216
217         return rc;
218 }
219
220 int __osd_xattr_get(const struct lu_env *env, struct osd_object *obj,
221                 struct lu_buf *buf, const char *name, int *sizep)
222 {
223         int rc;
224
225         /* check SA_ZPL_DXATTR first then fallback to directory xattr */
226         rc = __osd_sa_xattr_get(env, obj, buf, name, sizep);
227         if (rc != -ENOENT)
228                 return rc;
229
230         rc = __osd_xattr_get_large(env, &osd_obj2dev(obj)->od_objset,
231                                    obj->oo_xattr, buf, name, sizep);
232
233         return rc;
234 }
235
236 int osd_xattr_get(const struct lu_env *env, struct dt_object *dt,
237                 struct lu_buf *buf, const char *name,
238                 struct lustre_capa *capa)
239 {
240         struct osd_object  *obj  = osd_dt_obj(dt);
241         int                 rc, size = 0;
242         ENTRY;
243
244         LASSERT(obj->oo_db != NULL);
245         LASSERT(osd_invariant(obj));
246         LASSERT(dt_object_exists(dt));
247
248         down(&obj->oo_guard);
249         rc = __osd_xattr_get(env, obj, buf, name, &size);
250         up(&obj->oo_guard);
251
252         if (rc == -ENOENT)
253                 rc = -ENODATA;
254         else if (rc == 0)
255                 rc = size;
256         RETURN(rc);
257 }
258
259 void __osd_xattr_declare_set(const struct lu_env *env, struct osd_object *obj,
260                         int vallen, const char *name, struct osd_thandle *oh)
261 {
262         struct osd_device *osd = osd_obj2dev(obj);
263         udmu_objset_t     *uos = &osd->od_objset;
264         dmu_buf_t         *db = obj->oo_db;
265         dmu_tx_t          *tx = oh->ot_tx;
266         uint64_t           xa_data_obj;
267         int                rc = 0;
268         int                here;
269
270         here = dt_object_exists(&obj->oo_dt);
271
272         /* object may be not yet created */
273         if (here) {
274                 LASSERT(db);
275                 LASSERT(obj->oo_sa_hdl);
276                 /* we might just update SA_ZPL_DXATTR */
277                 dmu_tx_hold_sa(tx, obj->oo_sa_hdl, 1);
278
279                 if (obj->oo_xattr == ZFS_NO_OBJECT)
280                         rc = -ENOENT;
281         }
282
283         if (!here || rc == -ENOENT) {
284                 /* we'll be updating SA_ZPL_XATTR */
285                 if (here) {
286                         LASSERT(obj->oo_sa_hdl);
287                         dmu_tx_hold_sa(tx, obj->oo_sa_hdl, 1);
288                 }
289                 /* xattr zap + entry */
290                 dmu_tx_hold_zap(tx, DMU_NEW_OBJECT, TRUE, (char *) name);
291                 /* xattr value obj */
292                 dmu_tx_hold_sa_create(tx, ZFS_SA_BASE_ATTR_SIZE);
293                 dmu_tx_hold_write(tx, DMU_NEW_OBJECT, 0, vallen);
294                 return;
295         }
296
297         rc = -zap_lookup(uos->os, obj->oo_xattr, name, sizeof(uint64_t), 1,
298                         &xa_data_obj);
299         if (rc == 0) {
300                 /*
301                  * Entry already exists.
302                  * We'll truncate the existing object.
303                  */
304                 dmu_tx_hold_bonus(tx, xa_data_obj);
305                 dmu_tx_hold_free(tx, xa_data_obj, vallen, DMU_OBJECT_END);
306                 dmu_tx_hold_write(tx, xa_data_obj, 0, vallen);
307                 return;
308         } else if (rc == -ENOENT) {
309                 /*
310                  * Entry doesn't exist, we need to create a new one and a new
311                  * object to store the value.
312                  */
313                 dmu_tx_hold_bonus(tx, obj->oo_xattr);
314                 dmu_tx_hold_zap(tx, obj->oo_xattr, TRUE, (char *) name);
315                 dmu_tx_hold_sa_create(tx, ZFS_SA_BASE_ATTR_SIZE);
316                 dmu_tx_hold_write(tx, DMU_NEW_OBJECT, 0, vallen);
317                 return;
318         }
319
320         /* An error happened */
321         tx->tx_err = -rc;
322 }
323
324 int osd_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
325                 const struct lu_buf *buf, const char *name,
326                 int fl, struct thandle *handle)
327 {
328         struct osd_object  *obj = osd_dt_obj(dt);
329         struct osd_thandle *oh;
330         ENTRY;
331
332         LASSERT(handle != NULL);
333         oh = container_of0(handle, struct osd_thandle, ot_super);
334
335         down(&obj->oo_guard);
336         __osd_xattr_declare_set(env, obj, buf->lb_len, name, oh);
337         up(&obj->oo_guard);
338
339         RETURN(0);
340 }
341
342 /*
343  * Set an extended attribute.
344  * This transaction must have called udmu_xattr_declare_set() first.
345  *
346  * Returns 0 on success or a negative error number on failure.
347  *
348  * No locking is done here.
349  */
350 static int
351 __osd_sa_xattr_update(const struct lu_env *env, struct osd_object *obj,
352                         struct osd_thandle *oh)
353 {
354         struct osd_device *osd = osd_obj2dev(obj);
355         udmu_objset_t     *uos = &osd->od_objset;
356         char              *dxattr;
357         size_t             sa_size;
358         int                rc;
359
360         ENTRY;
361         LASSERT(obj->oo_sa_hdl);
362         LASSERT(obj->oo_sa_xattr);
363
364         /* Update the SA for additions, modifications, and removals. */
365         rc = -nvlist_size(obj->oo_sa_xattr, &sa_size, NV_ENCODE_XDR);
366         if (rc)
367                 return rc;
368
369         dxattr = sa_spill_alloc(KM_SLEEP);
370         if (dxattr == NULL)
371                 RETURN(-ENOMEM);
372
373         rc = -nvlist_pack(obj->oo_sa_xattr, &dxattr, &sa_size,
374                                 NV_ENCODE_XDR, KM_SLEEP);
375         if (rc)
376                 GOTO(out_free, rc);
377
378         rc = osd_object_sa_update(obj, SA_ZPL_DXATTR(uos), dxattr, sa_size, oh);
379 out_free:
380         sa_spill_free(dxattr);
381         RETURN(rc);
382 }
383
384 int __osd_sa_xattr_set(const struct lu_env *env, struct osd_object *obj,
385                         const struct lu_buf *buf, const char *name, int fl,
386                         struct osd_thandle *oh)
387 {
388         uchar_t *nv_value;
389         size_t  size;
390         int     nv_size;
391         int     rc;
392         int     too_big = 0;
393
394         LASSERT(obj->oo_sa_hdl);
395         if (obj->oo_sa_xattr == NULL) {
396                 rc = __osd_xattr_cache(env, obj);
397                 if (rc)
398                         return rc;
399         }
400
401         LASSERT(obj->oo_sa_xattr);
402         /* Limited to 32k to keep nvpair memory allocations small */
403         if (buf->lb_len > DXATTR_MAX_ENTRY_SIZE) {
404                 too_big = 1;
405         } else {
406                 /* Prevent the DXATTR SA from consuming the entire SA
407                  * region */
408                 rc = -nvlist_size(obj->oo_sa_xattr, &size, NV_ENCODE_XDR);
409                 if (rc)
410                         return rc;
411
412                 if (size + buf->lb_len > DXATTR_MAX_SA_SIZE)
413                         too_big = 1;
414         }
415
416         /* even in case of -EFBIG we must lookup xattr and check can we
417          * rewrite it then delete from SA */
418         rc = -nvlist_lookup_byte_array(obj->oo_sa_xattr, name, &nv_value,
419                                         &nv_size);
420         if (rc == 0) {
421                 if (fl & LU_XATTR_CREATE) {
422                         return -EEXIST;
423                 } else if (too_big) {
424                         rc = -nvlist_remove(obj->oo_sa_xattr, name,
425                                                 DATA_TYPE_BYTE_ARRAY);
426                         if (rc < 0)
427                                 return rc;
428                         rc = __osd_sa_xattr_update(env, obj, oh);
429                         return rc == 0 ? -EFBIG : rc;
430                 }
431         } else if (rc == -ENOENT) {
432                 if (fl & LU_XATTR_REPLACE)
433                         return -ENODATA;
434                 else if (too_big)
435                         return -EFBIG;
436         } else {
437                 return rc;
438         }
439
440         /* Ensure xattr doesn't exist in ZAP */
441         if (obj->oo_xattr != ZFS_NO_OBJECT) {
442                 udmu_objset_t     *uos = &osd_obj2dev(obj)->od_objset;
443                 uint64_t           xa_data_obj;
444                 rc = -zap_lookup(uos->os, obj->oo_xattr,
445                                  name, 8, 1, &xa_data_obj);
446                 if (rc == 0) {
447                         rc = __osd_object_free(uos, xa_data_obj, oh->ot_tx);
448                         if (rc == 0)
449                                 zap_remove(uos->os, obj->oo_xattr,
450                                            name, oh->ot_tx);
451                 }
452         }
453
454         rc = -nvlist_add_byte_array(obj->oo_sa_xattr, name,
455                                     (uchar_t *)buf->lb_buf, buf->lb_len);
456         if (rc)
457                 return rc;
458
459         rc = __osd_sa_xattr_update(env, obj, oh);
460         return rc;
461 }
462
463 int
464 __osd_xattr_set(const struct lu_env *env, struct osd_object *obj,
465                 const struct lu_buf *buf, const char *name, int fl,
466                 struct osd_thandle *oh)
467 {
468         struct osd_device *osd = osd_obj2dev(obj);
469         udmu_objset_t     *uos = &osd->od_objset;
470         dmu_buf_t         *xa_zap_db = NULL;
471         dmu_buf_t         *xa_data_db = NULL;
472         uint64_t           xa_data_obj;
473         sa_handle_t       *sa_hdl = NULL;
474         dmu_tx_t          *tx = oh->ot_tx;
475         uint64_t           size;
476         int                rc;
477
478         LASSERT(obj->oo_sa_hdl);
479
480         if (obj->oo_xattr == ZFS_NO_OBJECT) {
481                 struct lu_attr *la = &osd_oti_get(env)->oti_la;
482
483                 la->la_valid = LA_MODE;
484                 la->la_mode = S_IFDIR | S_IRUGO | S_IWUSR | S_IXUGO;
485                 rc = __osd_zap_create(env, uos, &xa_zap_db, tx, la,
486                                       obj->oo_db->db_object, FTAG, 0);
487                 if (rc)
488                         return rc;
489
490                 obj->oo_xattr = xa_zap_db->db_object;
491                 rc = osd_object_sa_update(obj, SA_ZPL_XATTR(uos),
492                                 &obj->oo_xattr, 8, oh);
493                 if (rc)
494                         goto out;
495         }
496
497         rc = -zap_lookup(uos->os, obj->oo_xattr, name, sizeof(uint64_t), 1,
498                         &xa_data_obj);
499         if (rc == 0) {
500                 if (fl & LU_XATTR_CREATE) {
501                         rc = -EEXIST;
502                         goto out;
503                 }
504                 /*
505                  * Entry already exists.
506                  * We'll truncate the existing object.
507                  */
508                 rc = __osd_obj2dbuf(env, uos->os, xa_data_obj,
509                                         &xa_data_db, FTAG);
510                 if (rc)
511                         goto out;
512
513                 rc = -sa_handle_get(uos->os, xa_data_obj, NULL,
514                                         SA_HDL_PRIVATE, &sa_hdl);
515                 if (rc)
516                         goto out;
517
518                 rc = -sa_lookup(sa_hdl, SA_ZPL_SIZE(uos), &size, 8);
519                 if (rc)
520                         goto out_sa;
521
522                 rc = -dmu_free_range(uos->os, xa_data_db->db_object,
523                                         0, DMU_OBJECT_END, tx);
524                 if (rc)
525                         goto out_sa;
526         } else if (rc == -ENOENT) {
527                 struct lu_attr *la = &osd_oti_get(env)->oti_la;
528                 /*
529                  * Entry doesn't exist, we need to create a new one and a new
530                  * object to store the value.
531                  */
532                 if (fl & LU_XATTR_REPLACE) {
533                         /* should be ENOATTR according to the
534                          * man, but that is undefined here */
535                         rc = -ENODATA;
536                         goto out;
537                 }
538
539                 la->la_valid = LA_MODE;
540                 la->la_mode = S_IFREG | S_IRUGO | S_IWUSR;
541                 rc = __osd_object_create(env, uos, &xa_data_db, tx, la,
542                                          obj->oo_xattr, FTAG);
543                 if (rc)
544                         goto out;
545                 xa_data_obj = xa_data_db->db_object;
546
547                 rc = -sa_handle_get(uos->os, xa_data_obj, NULL,
548                                         SA_HDL_PRIVATE, &sa_hdl);
549                 if (rc)
550                         goto out;
551
552                 rc = -zap_add(uos->os, obj->oo_xattr, name, sizeof(uint64_t),
553                                 1, &xa_data_obj, tx);
554                 if (rc)
555                         goto out_sa;
556         } else {
557                 /* There was an error looking up the xattr name */
558                 goto out;
559         }
560
561         /* Finally write the xattr value */
562         dmu_write(uos->os, xa_data_obj, 0, buf->lb_len, buf->lb_buf, tx);
563
564         size = buf->lb_len;
565         rc = -sa_update(sa_hdl, SA_ZPL_SIZE(uos), &size, 8, tx);
566
567 out_sa:
568         sa_handle_destroy(sa_hdl);
569 out:
570         if (xa_data_db != NULL)
571                 dmu_buf_rele(xa_data_db, FTAG);
572         if (xa_zap_db != NULL)
573                 dmu_buf_rele(xa_zap_db, FTAG);
574
575         return rc;
576 }
577
578 int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
579                   const struct lu_buf *buf, const char *name, int fl,
580                   struct thandle *handle, struct lustre_capa *capa)
581 {
582         struct osd_object  *obj = osd_dt_obj(dt);
583         struct osd_thandle *oh;
584         int rc = 0;
585         ENTRY;
586
587         LASSERT(handle != NULL);
588         LASSERT(osd_invariant(obj));
589         LASSERT(dt_object_exists(dt));
590         LASSERT(obj->oo_db);
591
592         oh = container_of0(handle, struct osd_thandle, ot_super);
593
594         down(&obj->oo_guard);
595         CDEBUG(D_INODE, "Setting xattr %s with size %d\n",
596                 name, (int)buf->lb_len);
597         rc = osd_xattr_set_internal(env, obj, buf, name, fl, oh, capa);
598         up(&obj->oo_guard);
599
600         RETURN(rc);
601 }
602
603 static void
604 __osd_xattr_declare_del(const struct lu_env *env, struct osd_object *obj,
605                         const char *name, struct osd_thandle *oh)
606 {
607         struct osd_device *osd = osd_obj2dev(obj);
608         udmu_objset_t     *uos = &osd->od_objset;
609         dmu_tx_t          *tx = oh->ot_tx;
610         uint64_t           xa_data_obj;
611         int                rc;
612
613         /* update SA_ZPL_DXATTR if xattr was in SA */
614         dmu_tx_hold_sa(tx, obj->oo_sa_hdl, 0);
615
616         if (obj->oo_xattr == ZFS_NO_OBJECT)
617                 return;
618
619         rc = -zap_lookup(uos->os, obj->oo_xattr, name, 8, 1, &xa_data_obj);
620         if (rc == 0) {
621                 /*
622                  * Entry exists.
623                  * We'll delete the existing object and ZAP entry.
624                  */
625                 dmu_tx_hold_bonus(tx, xa_data_obj);
626                 dmu_tx_hold_free(tx, xa_data_obj, 0, DMU_OBJECT_END);
627                 dmu_tx_hold_zap(tx, obj->oo_xattr, FALSE, (char *) name);
628                 return;
629         } else if (rc == -ENOENT) {
630                 /*
631                  * Entry doesn't exist, nothing to be changed.
632                  */
633                 return;
634         }
635
636         /* An error happened */
637         tx->tx_err = -rc;
638 }
639
640 int osd_declare_xattr_del(const struct lu_env *env, struct dt_object *dt,
641                         const char *name, struct thandle *handle)
642 {
643         struct osd_object  *obj = osd_dt_obj(dt);
644         struct osd_thandle *oh;
645         ENTRY;
646
647         LASSERT(handle != NULL);
648         LASSERT(dt_object_exists(dt));
649         LASSERT(osd_invariant(obj));
650
651         oh = container_of0(handle, struct osd_thandle, ot_super);
652         LASSERT(oh->ot_tx != NULL);
653         LASSERT(obj->oo_db != NULL);
654
655         down(&obj->oo_guard);
656         __osd_xattr_declare_del(env, obj, name, oh);
657         up(&obj->oo_guard);
658
659         RETURN(0);
660 }
661
662 int __osd_sa_xattr_del(const struct lu_env *env, struct osd_object *obj,
663                         const char *name, struct osd_thandle *oh)
664 {
665         int rc;
666
667         if (obj->oo_sa_xattr == NULL) {
668                 rc = __osd_xattr_cache(env, obj);
669                 if (rc)
670                         return rc;
671         }
672
673         rc = -nvlist_remove(obj->oo_sa_xattr, name, DATA_TYPE_BYTE_ARRAY);
674         if (rc == 0)
675                 rc = __osd_sa_xattr_update(env, obj, oh);
676         return rc;
677 }
678
679 int __osd_xattr_del(const struct lu_env *env, struct osd_object *obj,
680                         const char *name, struct osd_thandle *oh)
681 {
682         struct osd_device *osd = osd_obj2dev(obj);
683         udmu_objset_t     *uos = &osd->od_objset;
684         uint64_t           xa_data_obj;
685         int                rc;
686
687         /* try remove xattr from SA at first */
688         rc = __osd_sa_xattr_del(env, obj, name, oh);
689         if (rc != -ENOENT)
690                 return rc;
691
692         if (obj->oo_xattr == ZFS_NO_OBJECT)
693                 return 0;
694
695         rc = -zap_lookup(uos->os, obj->oo_xattr, name, sizeof(uint64_t), 1,
696                         &xa_data_obj);
697         if (rc == -ENOENT) {
698                 rc = 0;
699         } else if (rc == 0) {
700                 /*
701                  * Entry exists.
702                  * We'll delete the existing object and ZAP entry.
703                  */
704                 rc = __osd_object_free(uos, xa_data_obj, oh->ot_tx);
705                 if (rc)
706                         return rc;
707
708                 rc = -zap_remove(uos->os, obj->oo_xattr, name, oh->ot_tx);
709         }
710
711         return rc;
712 }
713
714 int osd_xattr_del(const struct lu_env *env, struct dt_object *dt,
715                 const char *name, struct thandle *handle,
716                 struct lustre_capa *capa)
717 {
718         struct osd_object  *obj = osd_dt_obj(dt);
719         struct osd_thandle *oh;
720         int                 rc;
721         ENTRY;
722
723         LASSERT(handle != NULL);
724         LASSERT(obj->oo_db != NULL);
725         LASSERT(osd_invariant(obj));
726         LASSERT(dt_object_exists(dt));
727         oh = container_of0(handle, struct osd_thandle, ot_super);
728         LASSERT(oh->ot_tx != NULL);
729
730         down(&obj->oo_guard);
731         rc = __osd_xattr_del(env, obj, name, oh);
732         up(&obj->oo_guard);
733
734         RETURN(rc);
735 }
736
737 static int
738 osd_sa_xattr_list(const struct lu_env *env, struct osd_object *obj,
739                 struct lu_buf *lb)
740 {
741         nvpair_t *nvp = NULL;
742         int       len, counted = 0, remain = lb->lb_len;
743         int       rc = 0;
744
745         if (obj->oo_sa_xattr == NULL) {
746                 rc = __osd_xattr_cache(env, obj);
747                 if (rc)
748                         return rc;
749         }
750
751         LASSERT(obj->oo_sa_xattr);
752
753         while ((nvp = nvlist_next_nvpair(obj->oo_sa_xattr, nvp)) != NULL) {
754                 len = strlen(nvpair_name(nvp));
755                 if (lb->lb_buf != NULL) {
756                         if (len + 1 > remain)
757                                 return -ERANGE;
758
759                         memcpy(lb->lb_buf, nvpair_name(nvp), len);
760                         lb->lb_buf += len;
761                         *((char *)lb->lb_buf) = '\0';
762                         lb->lb_buf++;
763                         remain -= len + 1;
764                 }
765                 counted += len + 1;
766         }
767         return counted;
768 }
769
770 int osd_xattr_list(const struct lu_env *env, struct dt_object *dt,
771                 struct lu_buf *lb, struct lustre_capa *capa)
772 {
773         struct osd_thread_info *oti = osd_oti_get(env);
774         struct osd_object      *obj = osd_dt_obj(dt);
775         struct osd_device      *osd = osd_obj2dev(obj);
776         udmu_objset_t          *uos = &osd->od_objset;
777         zap_cursor_t           *zc;
778         int                    rc, counted = 0, remain = lb->lb_len;
779         ENTRY;
780
781         LASSERT(obj->oo_db != NULL);
782         LASSERT(osd_invariant(obj));
783         LASSERT(dt_object_exists(dt));
784
785         down(&obj->oo_guard);
786
787         rc = osd_sa_xattr_list(env, obj, lb);
788         if (rc < 0)
789                 GOTO(out, rc);
790         counted = rc;
791         remain -= counted;
792
793         /* continue with dnode xattr if any */
794         if (obj->oo_xattr == ZFS_NO_OBJECT)
795                 GOTO(out, rc = counted);
796
797         rc = -udmu_zap_cursor_init(&zc, uos, obj->oo_xattr, 0);
798         if (rc)
799                 GOTO(out, rc);
800
801         while ((rc = -udmu_zap_cursor_retrieve_key(env, zc, oti->oti_key,
802                                                 MAXNAMELEN)) == 0) {
803                 rc = strlen(oti->oti_key);
804                 if (lb->lb_buf != NULL) {
805                         if (rc + 1 > remain)
806                                 RETURN(-ERANGE);
807
808                         memcpy(lb->lb_buf, oti->oti_key, rc);
809                         lb->lb_buf += rc;
810                         *((char *)lb->lb_buf) = '\0';
811                         lb->lb_buf++;
812                         remain -= rc + 1;
813                 }
814                 counted += rc + 1;
815
816                 zap_cursor_advance(zc);
817         }
818         if (rc == -ENOENT) /* no more kes in the index */
819                 rc = 0;
820         else if (unlikely(rc < 0))
821                 GOTO(out_fini, rc);
822         rc = counted;
823
824 out_fini:
825         udmu_zap_cursor_fini(zc);
826 out:
827         up(&obj->oo_guard);
828         RETURN(rc);
829
830 }
831
832