Whamcloud - gitweb
LU-1305 osd: xattr support for osd-zfs
[fs/lustre-release.git] / lustre / osd-zfs / osd_xattr.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  */
30 /*
31  * Copyright (c) 2011, 2012 Whamcloud, Inc.
32  * Use is subject to license terms.
33  */
34 /*
35  * This file is part of Lustre, http://www.lustre.org/
36  * Lustre is a trademark of Sun Microsystems, Inc.
37  *
38  * lustre/osd-zfs/osd_xattr.c
39  * functions to manipulate extended attributes and system attributes
40  *
41  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
42  * Author: Mike Pershin <tappro@whamcloud.com>
43  */
44
45 #ifndef EXPORT_SYMTAB
46 # define EXPORT_SYMTAB
47 #endif
48 #define DEBUG_SUBSYSTEM S_OSD
49
50 #include <lustre_ver.h>
51 #include <libcfs/libcfs.h>
52 #include <lustre_fsfilt.h>
53 #include <obd_support.h>
54 #include <lustre_net.h>
55 #include <obd.h>
56 #include <obd_class.h>
57 #include <lustre_disk.h>
58 #include <lustre_fid.h>
59
60 #include "osd_internal.h"
61
62 #include <sys/dnode.h>
63 #include <sys/dbuf.h>
64 #include <sys/spa.h>
65 #include <sys/stat.h>
66 #include <sys/zap.h>
67 #include <sys/spa_impl.h>
68 #include <sys/zfs_znode.h>
69 #include <sys/dmu_tx.h>
70 #include <sys/dmu_objset.h>
71 #include <sys/dsl_prop.h>
72 #include <sys/sa_impl.h>
73 #include <sys/txg.h>
74
75
76 /*
77  * Copy an extended attribute into the buffer provided, or compute the
78  * required buffer size.
79  *
80  * If buf is NULL, it computes the required buffer size.
81  *
82  * Returns 0 on success or a negative error number on failure.
83  * On success, the number of bytes used / required is stored in 'size'.
84  *
85  * No locking is done here.
86  */
87 int __osd_xattr_cache(const struct lu_env *env, struct osd_object *obj)
88 {
89         struct osd_device *osd = osd_obj2dev(obj);
90         udmu_objset_t     *uos = &osd->od_objset;
91         sa_handle_t       *sa_hdl;
92         char              *buf;
93         int                size;
94         int                rc;
95
96         LASSERT(obj->oo_sa_xattr == NULL);
97         LASSERT(obj->oo_db != NULL);
98
99         rc = -sa_handle_get(uos->os, obj->oo_db->db_object, NULL,
100                         SA_HDL_PRIVATE, &sa_hdl);
101         if (rc)
102                 return rc;
103
104         rc = -sa_size(sa_hdl, SA_ZPL_DXATTR(uos), &size);
105         if (rc) {
106                 if (rc == -ENOENT)
107                         rc = -nvlist_alloc(&obj->oo_sa_xattr,
108                                         NV_UNIQUE_NAME, KM_SLEEP);
109                 goto out_sa;
110         }
111
112         buf = sa_spill_alloc(KM_SLEEP);
113         if (buf == NULL) {
114                 rc = -ENOMEM;
115                 goto out_sa;
116         }
117         rc = -sa_lookup(sa_hdl, SA_ZPL_DXATTR(uos), buf, size);
118         if (rc == 0)
119                 rc = -nvlist_unpack(buf, size, &obj->oo_sa_xattr, KM_SLEEP);
120         sa_spill_free(buf);
121 out_sa:
122         sa_handle_destroy(sa_hdl);
123
124         return rc;
125 }
126
127 int __osd_sa_xattr_get(const struct lu_env *env, struct osd_object *obj,
128                 const struct lu_buf *buf, const char *name, int *sizep)
129 {
130         uchar_t *nv_value;
131         int      rc;
132
133         LASSERT(obj->oo_sa_hdl);
134
135         if (obj->oo_sa_xattr == NULL) {
136                 rc = __osd_xattr_cache(env, obj);
137                 if (rc)
138                         return rc;
139         }
140
141         LASSERT(obj->oo_sa_xattr);
142         rc = -nvlist_lookup_byte_array(obj->oo_sa_xattr, name, &nv_value,
143                         sizep);
144         if (rc)
145                 return rc;
146
147         if (buf == NULL || buf->lb_buf == NULL) {
148                 /* return the required size by *sizep */
149                 return 0;
150         }
151
152         if (*sizep > buf->lb_len)
153                 return -ERANGE; /* match ldiskfs error */
154
155         memcpy(buf->lb_buf, nv_value, *sizep);
156         return 0;
157 }
158
159 int __osd_xattr_get(const struct lu_env *env, struct osd_object *obj,
160                 struct lu_buf *buf, const char *name, int *sizep)
161 {
162         struct osd_device *osd = osd_obj2dev(obj);
163         udmu_objset_t     *uos = &osd->od_objset;
164         uint64_t           xa_data_obj;
165         dmu_buf_t         *xa_data_db;
166         sa_handle_t       *sa_hdl = NULL;
167         uint64_t           size;
168         int                rc;
169
170         /* check SA_ZPL_DXATTR first then fallback to directory xattr */
171         rc = __osd_sa_xattr_get(env, obj, buf, name, sizep);
172         if (rc != -ENOENT)
173                 return rc;
174
175         /* are there any extended attributes? */
176         if (obj->oo_xattr == ZFS_NO_OBJECT)
177                 return -ENOENT;
178
179         /* Lookup the object number containing the xattr data */
180         rc = -zap_lookup(uos->os, obj->oo_xattr, name, sizeof(uint64_t), 1,
181                         &xa_data_obj);
182         if (rc)
183                 return rc;
184
185         rc = __osd_obj2dbuf(env, uos->os, xa_data_obj, &xa_data_db, FTAG);
186         if (rc)
187                 return rc;
188
189         rc = -sa_handle_get(uos->os, xa_data_obj, NULL, SA_HDL_PRIVATE,
190                         &sa_hdl);
191         if (rc)
192                 goto out_rele;
193
194         /* Get the xattr value length / object size */
195         rc = -sa_lookup(sa_hdl, SA_ZPL_SIZE(uos), &size, 8);
196         if (rc)
197                 goto out;
198
199         if (size > INT_MAX) {
200                 rc = -EOVERFLOW;
201                 goto out;
202         }
203
204         *sizep = (int)size;
205
206         if (buf == NULL || buf->lb_buf == NULL) {
207                 /* We only need to return the required size */
208                 goto out;
209         }
210         if (*sizep > buf->lb_len) {
211                 rc = -ERANGE; /* match ldiskfs error */
212                 goto out;
213         }
214
215         rc = -dmu_read(uos->os, xa_data_db->db_object, 0,
216                         size, buf->lb_buf, DMU_READ_PREFETCH);
217
218 out:
219         sa_handle_destroy(sa_hdl);
220 out_rele:
221         dmu_buf_rele(xa_data_db, FTAG);
222         return rc;
223 }
224
225 int osd_xattr_get(const struct lu_env *env, struct dt_object *dt,
226                 struct lu_buf *buf, const char *name,
227                 struct lustre_capa *capa)
228 {
229         struct osd_object  *obj  = osd_dt_obj(dt);
230         int                 rc, size = 0;
231         ENTRY;
232
233         LASSERT(obj->oo_db != NULL);
234         LASSERT(osd_invariant(obj));
235         LASSERT(dt_object_exists(dt));
236
237         cfs_down(&obj->oo_guard);
238         rc = __osd_xattr_get(env, obj, buf, name, &size);
239         cfs_up(&obj->oo_guard);
240
241         if (rc == -ENOENT)
242                 rc = -ENODATA;
243         else if (rc == 0)
244                 rc = size;
245         RETURN(rc);
246 }
247
248 void __osd_xattr_declare_set(const struct lu_env *env, struct osd_object *obj,
249                         int vallen, const char *name, struct osd_thandle *oh)
250 {
251         struct osd_device *osd = osd_obj2dev(obj);
252         udmu_objset_t     *uos = &osd->od_objset;
253         dmu_buf_t         *db = obj->oo_db;
254         dmu_tx_t          *tx = oh->ot_tx;
255         uint64_t           xa_data_obj;
256         int                rc = 0;
257         int                here;
258
259         here = dt_object_exists(&obj->oo_dt);
260
261         /* object may be not yet created */
262         if (here) {
263                 LASSERT(db);
264                 LASSERT(obj->oo_sa_hdl);
265                 /* we might just update SA_ZPL_DXATTR */
266                 dmu_tx_hold_sa(tx, obj->oo_sa_hdl, 1);
267
268                 if (obj->oo_xattr == ZFS_NO_OBJECT)
269                         rc = -ENOENT;
270         }
271
272         if (!here || rc == -ENOENT) {
273                 /* we'll be updating SA_ZPL_XATTR */
274                 if (here) {
275                         LASSERT(obj->oo_sa_hdl);
276                         dmu_tx_hold_sa(tx, obj->oo_sa_hdl, 1);
277                 }
278                 /* xattr zap + entry */
279                 dmu_tx_hold_zap(tx, DMU_NEW_OBJECT, TRUE, (char *) name);
280                 /* xattr value obj */
281                 dmu_tx_hold_sa_create(tx, ZFS_SA_BASE_ATTR_SIZE);
282                 dmu_tx_hold_write(tx, DMU_NEW_OBJECT, 0, vallen);
283                 return;
284         }
285
286         rc = -zap_lookup(uos->os, obj->oo_xattr, name, sizeof(uint64_t), 1,
287                         &xa_data_obj);
288         if (rc == 0) {
289                 /*
290                  * Entry already exists.
291                  * We'll truncate the existing object.
292                  */
293                 dmu_tx_hold_bonus(tx, xa_data_obj);
294                 dmu_tx_hold_free(tx, xa_data_obj, vallen, DMU_OBJECT_END);
295                 dmu_tx_hold_write(tx, xa_data_obj, 0, vallen);
296                 return;
297         } else if (rc == -ENOENT) {
298                 /*
299                  * Entry doesn't exist, we need to create a new one and a new
300                  * object to store the value.
301                  */
302                 dmu_tx_hold_bonus(tx, obj->oo_xattr);
303                 dmu_tx_hold_zap(tx, obj->oo_xattr, TRUE, (char *) name);
304                 dmu_tx_hold_sa_create(tx, ZFS_SA_BASE_ATTR_SIZE);
305                 dmu_tx_hold_write(tx, DMU_NEW_OBJECT, 0, vallen);
306                 return;
307         }
308
309         /* An error happened */
310         tx->tx_err = -rc;
311 }
312
313 int osd_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
314                 const struct lu_buf *buf, const char *name,
315                 int fl, struct thandle *handle)
316 {
317         struct osd_object  *obj = osd_dt_obj(dt);
318         struct osd_thandle *oh;
319         ENTRY;
320
321         LASSERT(handle != NULL);
322         oh = container_of0(handle, struct osd_thandle, ot_super);
323
324         cfs_down(&obj->oo_guard);
325         __osd_xattr_declare_set(env, obj, buf->lb_len, name, oh);
326         cfs_up(&obj->oo_guard);
327
328         RETURN(0);
329 }
330
331 /*
332  * Set an extended attribute.
333  * This transaction must have called udmu_xattr_declare_set() first.
334  *
335  * Returns 0 on success or a negative error number on failure.
336  *
337  * No locking is done here.
338  */
339 static int
340 __osd_sa_xattr_update(const struct lu_env *env, struct osd_object *obj,
341                         struct osd_thandle *oh)
342 {
343         struct osd_device *osd = osd_obj2dev(obj);
344         udmu_objset_t     *uos = &osd->od_objset;
345         char              *dxattr;
346         size_t             sa_size;
347         int                rc;
348
349         ENTRY;
350         LASSERT(obj->oo_sa_hdl);
351         LASSERT(obj->oo_sa_xattr);
352
353         /* Update the SA for additions, modifications, and removals. */
354         rc = -nvlist_size(obj->oo_sa_xattr, &sa_size, NV_ENCODE_XDR);
355         if (rc)
356                 return rc;
357
358         dxattr = sa_spill_alloc(KM_SLEEP);
359         if (dxattr == NULL)
360                 RETURN(-ENOMEM);
361
362         rc = -nvlist_pack(obj->oo_sa_xattr, &dxattr, &sa_size,
363                                 NV_ENCODE_XDR, KM_SLEEP);
364         if (rc)
365                 GOTO(out_free, rc);
366
367         rc = osd_object_sa_update(obj, SA_ZPL_DXATTR(uos), dxattr, sa_size, oh);
368 out_free:
369         sa_spill_free(dxattr);
370         RETURN(rc);
371 }
372
373 int __osd_sa_xattr_set(const struct lu_env *env, struct osd_object *obj,
374                         const struct lu_buf *buf, const char *name, int fl,
375                         struct osd_thandle *oh)
376 {
377         uchar_t *nv_value;
378         size_t  size;
379         int     nv_size;
380         int     rc;
381         int     too_big = 0;
382
383         LASSERT(obj->oo_sa_hdl);
384         if (obj->oo_sa_xattr == NULL) {
385                 rc = __osd_xattr_cache(env, obj);
386                 if (rc)
387                         return rc;
388         }
389
390         LASSERT(obj->oo_sa_xattr);
391         /* Limited to 32k to keep nvpair memory allocations small */
392         if (buf->lb_len > DXATTR_MAX_ENTRY_SIZE) {
393                 too_big = 1;
394         } else {
395                 /* Prevent the DXATTR SA from consuming the entire SA
396                  * region */
397                 rc = -nvlist_size(obj->oo_sa_xattr, &size, NV_ENCODE_XDR);
398                 if (rc)
399                         return rc;
400
401                 if (size + buf->lb_len > DXATTR_MAX_SA_SIZE)
402                         too_big = 1;
403         }
404
405         /* even in case of -EFBIG we must lookup xattr and check can we
406          * rewrite it then delete from SA */
407         rc = -nvlist_lookup_byte_array(obj->oo_sa_xattr, name, &nv_value,
408                                         &nv_size);
409         if (rc == 0) {
410                 if (fl & LU_XATTR_CREATE) {
411                         return -EEXIST;
412                 } else if (too_big) {
413                         rc = -nvlist_remove(obj->oo_sa_xattr, name,
414                                                 DATA_TYPE_BYTE_ARRAY);
415                         if (rc < 0)
416                                 return rc;
417                         rc = __osd_sa_xattr_update(env, obj, oh);
418                         return rc == 0 ? -EFBIG : rc;
419                 }
420         } else if (rc == -ENOENT) {
421                 if (fl & LU_XATTR_REPLACE)
422                         return -ENODATA;
423                 else if (too_big)
424                         return -EFBIG;
425         } else {
426                 return rc;
427         }
428
429         rc = -nvlist_add_byte_array(obj->oo_sa_xattr, name,
430                                     (uchar_t *)buf->lb_buf, buf->lb_len);
431         if (rc)
432                 return rc;
433
434         rc = __osd_sa_xattr_update(env, obj, oh);
435         return rc;
436 }
437
438 static int
439 __osd_xattr_set(const struct lu_env *env, struct osd_object *obj,
440                 const struct lu_buf *buf, const char *name, int fl,
441                 struct osd_thandle *oh)
442 {
443         struct osd_device *osd = osd_obj2dev(obj);
444         udmu_objset_t     *uos = &osd->od_objset;
445         dmu_buf_t         *xa_zap_db = NULL;
446         dmu_buf_t         *xa_data_db = NULL;
447         uint64_t           xa_data_obj;
448         sa_handle_t       *sa_hdl = NULL;
449         dmu_tx_t          *tx = oh->ot_tx;
450         uint64_t           size;
451         int                rc;
452
453         LASSERT(obj->oo_sa_hdl);
454
455         if (obj->oo_xattr == ZFS_NO_OBJECT) {
456                 struct lu_attr *la = &osd_oti_get(env)->oti_la;
457
458                 la->la_valid = LA_MODE;
459                 la->la_mode = S_IFDIR | S_IRUGO | S_IWUSR | S_IXUGO;
460                 rc = __osd_zap_create(env, uos, &xa_zap_db, tx, la, FTAG, 0);
461                 if (rc)
462                         return rc;
463
464                 obj->oo_xattr = xa_zap_db->db_object;
465                 rc = osd_object_sa_update(obj, SA_ZPL_XATTR(uos),
466                                 &obj->oo_xattr, 8, oh);
467                 if (rc)
468                         goto out;
469         }
470
471         rc = -zap_lookup(uos->os, obj->oo_xattr, name, sizeof(uint64_t), 1,
472                         &xa_data_obj);
473         if (rc == 0) {
474                 if (fl & LU_XATTR_CREATE) {
475                         rc = -EEXIST;
476                         goto out;
477                 }
478                 /*
479                  * Entry already exists.
480                  * We'll truncate the existing object.
481                  */
482                 rc = __osd_obj2dbuf(env, uos->os, xa_data_obj,
483                                         &xa_data_db, FTAG);
484                 if (rc)
485                         goto out;
486
487                 rc = -sa_handle_get(uos->os, xa_data_obj, NULL,
488                                         SA_HDL_PRIVATE, &sa_hdl);
489                 if (rc)
490                         goto out;
491
492                 rc = -sa_lookup(sa_hdl, SA_ZPL_SIZE(uos), &size, 8);
493                 if (rc)
494                         goto out_sa;
495
496                 rc = -dmu_free_range(uos->os, xa_data_db->db_object,
497                                         0, DMU_OBJECT_END, tx);
498                 if (rc)
499                         goto out_sa;
500         } else if (rc == -ENOENT) {
501                 struct lu_attr *la = &osd_oti_get(env)->oti_la;
502                 /*
503                  * Entry doesn't exist, we need to create a new one and a new
504                  * object to store the value.
505                  */
506                 if (fl & LU_XATTR_REPLACE) {
507                         /* should be ENOATTR according to the
508                          * man, but that is undefined here */
509                         rc = -ENODATA;
510                         goto out;
511                 }
512
513                 la->la_valid = LA_MODE;
514                 la->la_mode = S_IFREG | S_IRUGO | S_IWUSR;
515                 rc = __osd_object_create(env, uos, &xa_data_db, tx, la, FTAG);
516                 if (rc)
517                         goto out;
518                 xa_data_obj = xa_data_db->db_object;
519
520                 rc = -sa_handle_get(uos->os, xa_data_obj, NULL,
521                                         SA_HDL_PRIVATE, &sa_hdl);
522                 if (rc)
523                         goto out;
524
525                 rc = -zap_add(uos->os, obj->oo_xattr, name, sizeof(uint64_t),
526                                 1, &xa_data_obj, tx);
527                 if (rc)
528                         goto out_sa;
529         } else {
530                 /* There was an error looking up the xattr name */
531                 goto out;
532         }
533
534         /* Finally write the xattr value */
535         dmu_write(uos->os, xa_data_obj, 0, buf->lb_len, buf->lb_buf, tx);
536
537         size = buf->lb_len;
538         rc = -sa_update(sa_hdl, SA_ZPL_SIZE(uos), &size, 8, tx);
539
540 out_sa:
541         sa_handle_destroy(sa_hdl);
542 out:
543         if (xa_data_db != NULL)
544                 dmu_buf_rele(xa_data_db, FTAG);
545         if (xa_zap_db != NULL)
546                 dmu_buf_rele(xa_zap_db, FTAG);
547
548         return rc;
549 }
550
551 int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
552                 const struct lu_buf *buf, const char *name, int fl,
553                 struct thandle *handle, struct lustre_capa *capa)
554 {
555         struct osd_object  *obj = osd_dt_obj(dt);
556         struct osd_thandle *oh;
557         int rc = 0;
558         ENTRY;
559
560         LASSERT(handle != NULL);
561         LASSERT(osd_invariant(obj));
562         LASSERT(dt_object_exists(dt));
563         LASSERT(obj->oo_db);
564
565         oh = container_of0(handle, struct osd_thandle, ot_super);
566
567         cfs_down(&obj->oo_guard);
568         CDEBUG(D_INODE, "Setting xattr %s with size %d\n",
569                 name, (int)buf->lb_len);
570         rc = __osd_sa_xattr_set(env, obj, buf, name, fl, oh);
571         /* place xattr in dnode if SA is full */
572         if (rc == -EFBIG)
573                 rc = __osd_xattr_set(env, obj, buf, name, fl, oh);
574         cfs_up(&obj->oo_guard);
575
576         RETURN(rc);
577 }
578
579 static void
580 __osd_xattr_declare_del(const struct lu_env *env, struct osd_object *obj,
581                         const char *name, struct osd_thandle *oh)
582 {
583         struct osd_device *osd = osd_obj2dev(obj);
584         udmu_objset_t     *uos = &osd->od_objset;
585         dmu_tx_t          *tx = oh->ot_tx;
586         uint64_t           xa_data_obj;
587         int                rc;
588
589         /* update SA_ZPL_DXATTR if xattr was in SA */
590         dmu_tx_hold_sa(tx, obj->oo_sa_hdl, 0);
591
592         if (obj->oo_xattr == ZFS_NO_OBJECT)
593                 return;
594
595         rc = -zap_lookup(uos->os, obj->oo_xattr, name, 8, 1, &xa_data_obj);
596         if (rc == 0) {
597                 /*
598                  * Entry exists.
599                  * We'll delete the existing object and ZAP entry.
600                  */
601                 dmu_tx_hold_bonus(tx, xa_data_obj);
602                 dmu_tx_hold_free(tx, xa_data_obj, 0, DMU_OBJECT_END);
603                 dmu_tx_hold_zap(tx, obj->oo_xattr, FALSE, (char *) name);
604                 return;
605         } else if (rc == -ENOENT) {
606                 /*
607                  * Entry doesn't exist, nothing to be changed.
608                  */
609                 return;
610         }
611
612         /* An error happened */
613         tx->tx_err = -rc;
614 }
615
616 int osd_declare_xattr_del(const struct lu_env *env, struct dt_object *dt,
617                         const char *name, struct thandle *handle)
618 {
619         struct osd_object  *obj = osd_dt_obj(dt);
620         struct osd_thandle *oh;
621         ENTRY;
622
623         LASSERT(handle != NULL);
624         LASSERT(dt_object_exists(dt));
625         LASSERT(osd_invariant(obj));
626
627         oh = container_of0(handle, struct osd_thandle, ot_super);
628         LASSERT(oh->ot_tx != NULL);
629         LASSERT(obj->oo_db != NULL);
630
631         cfs_down(&obj->oo_guard);
632         __osd_xattr_declare_del(env, obj, name, oh);
633         cfs_up(&obj->oo_guard);
634
635         RETURN(0);
636 }
637
638 int __osd_sa_xattr_del(const struct lu_env *env, struct osd_object *obj,
639                         const char *name, struct osd_thandle *oh)
640 {
641         int rc;
642
643         if (obj->oo_sa_xattr == NULL) {
644                 rc = __osd_xattr_cache(env, obj);
645                 if (rc)
646                         return rc;
647         }
648
649         rc = -nvlist_remove(obj->oo_sa_xattr, name, DATA_TYPE_BYTE_ARRAY);
650         if (rc == 0)
651                 rc = __osd_sa_xattr_update(env, obj, oh);
652         return rc;
653 }
654
655 int __osd_xattr_del(const struct lu_env *env, struct osd_object *obj,
656                         const char *name, struct osd_thandle *oh)
657 {
658         struct osd_device *osd = osd_obj2dev(obj);
659         udmu_objset_t     *uos = &osd->od_objset;
660         uint64_t           xa_data_obj;
661         int                rc;
662
663         /* try remove xattr from SA at first */
664         rc = __osd_sa_xattr_del(env, obj, name, oh);
665         if (rc != -ENOENT)
666                 return rc;
667
668         if (obj->oo_xattr == ZFS_NO_OBJECT)
669                 return rc;
670
671         rc = -zap_lookup(uos->os, obj->oo_xattr, name, sizeof(uint64_t), 1,
672                         &xa_data_obj);
673         if (rc == -ENOENT) {
674                 rc = 0;
675         } else if (rc == 0) {
676                 /*
677                  * Entry exists.
678                  * We'll delete the existing object and ZAP entry.
679                  */
680                 rc = __osd_object_free(uos, xa_data_obj, oh->ot_tx);
681                 if (rc)
682                         return rc;
683
684                 rc = -zap_remove(uos->os, obj->oo_xattr, name, oh->ot_tx);
685         }
686
687         return rc;
688 }
689
690 int osd_xattr_del(const struct lu_env *env, struct dt_object *dt,
691                 const char *name, struct thandle *handle,
692                 struct lustre_capa *capa)
693 {
694         struct osd_object  *obj = osd_dt_obj(dt);
695         struct osd_thandle *oh;
696         int                 rc;
697         ENTRY;
698
699         LASSERT(handle != NULL);
700         LASSERT(obj->oo_db != NULL);
701         LASSERT(osd_invariant(obj));
702         LASSERT(dt_object_exists(dt));
703         oh = container_of0(handle, struct osd_thandle, ot_super);
704         LASSERT(oh->ot_tx != NULL);
705
706         cfs_down(&obj->oo_guard);
707         rc = __osd_xattr_del(env, obj, name, oh);
708         cfs_up(&obj->oo_guard);
709
710         RETURN(rc);
711 }
712
713 static int
714 osd_sa_xattr_list(const struct lu_env *env, struct osd_object *obj,
715                 struct lu_buf *lb)
716 {
717         nvpair_t *nvp = NULL;
718         int       len, counted = 0, remain = lb->lb_len;
719         int       rc = 0;
720
721         if (obj->oo_sa_xattr == NULL) {
722                 rc = __osd_xattr_cache(env, obj);
723                 if (rc)
724                         return rc;
725         }
726
727         LASSERT(obj->oo_sa_xattr);
728
729         while ((nvp = nvlist_next_nvpair(obj->oo_sa_xattr, nvp)) != NULL) {
730                 len = strlen(nvpair_name(nvp));
731                 if (len >= remain)
732                         return -ERANGE;
733
734                 memcpy(lb->lb_buf, nvpair_name(nvp), len);
735                 lb->lb_buf += len;
736                 *((char *)lb->lb_buf) = '\0';
737                 lb->lb_buf++;
738                 remain -= len + 1;
739                 counted += len + 1;
740         }
741         return counted;
742 }
743
744 int osd_xattr_list(const struct lu_env *env, struct dt_object *dt,
745                 struct lu_buf *lb, struct lustre_capa *capa)
746 {
747         struct osd_thread_info *oti = osd_oti_get(env);
748         struct osd_object      *obj = osd_dt_obj(dt);
749         struct osd_device      *osd = osd_obj2dev(obj);
750         udmu_objset_t          *uos = &osd->od_objset;
751         zap_cursor_t           *zc;
752         int                    rc, counted = 0, remain = lb->lb_len;
753         ENTRY;
754
755         LASSERT(obj->oo_db != NULL);
756         LASSERT(osd_invariant(obj));
757         LASSERT(dt_object_exists(dt));
758
759         cfs_down(&obj->oo_guard);
760
761         rc = osd_sa_xattr_list(env, obj, lb);
762         if (rc < 0)
763                 GOTO(out, rc);
764         counted = rc;
765         remain -= counted;
766
767         /* continue with dnode xattr if any */
768         if (obj->oo_xattr == ZFS_NO_OBJECT)
769                 GOTO(out, rc = counted);
770
771         rc = -udmu_zap_cursor_init(&zc, uos, obj->oo_xattr, 0);
772         if (rc)
773                 GOTO(out, rc);
774
775         while ((rc = -udmu_zap_cursor_retrieve_key(env, zc, oti->oti_key,
776                                                 MAXNAMELEN)) == 0) {
777                 rc = strlen(oti->oti_key);
778                 if (rc >= remain)
779                         GOTO(out_fini, rc = -ERANGE);
780
781                 memcpy(lb->lb_buf, oti->oti_key, rc);
782                 lb->lb_buf += rc;
783                 *((char *)lb->lb_buf) = '\0';
784                 lb->lb_buf++;
785                 remain -= rc + 1;
786                 counted += rc + 1;
787
788                 zap_cursor_advance(zc);
789         }
790         if (rc < 0)
791                 GOTO(out_fini, rc);
792         rc = counted;
793
794 out_fini:
795         udmu_zap_cursor_fini(zc);
796 out:
797         cfs_up(&obj->oo_guard);
798         RETURN(rc);
799
800 }
801
802