1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/osd-zfs/osd_object.c
33  *
34  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
35  * Author: Mike Pershin <tappro@whamcloud.com>
36  * Author: Johann Lombardi <johann@whamcloud.com>
37  */
38
39 #define DEBUG_SUBSYSTEM S_OSD
40
41 #include <lustre_ver.h>
42 #include <libcfs/libcfs.h>
43 #include <obd_support.h>
44 #include <lustre_net.h>
45 #include <obd.h>
46 #include <obd_class.h>
47 #include <lustre_disk.h>
48 #include <lustre_fid.h>
49
50 #include "osd_internal.h"
51
52 #include <sys/dnode.h>
53 #include <sys/dbuf.h>
54 #include <sys/spa.h>
55 #include <sys/stat.h>
56 #include <sys/zap.h>
57 #include <sys/spa_impl.h>
58 #include <sys/zfs_znode.h>
59 #include <sys/dmu_tx.h>
60 #include <sys/dmu_objset.h>
61 #include <sys/dsl_prop.h>
62 #include <sys/sa_impl.h>
63 #include <sys/txg.h>
64
65 char *osd_obj_tag = "osd_object";
66
67 static struct dt_object_operations osd_obj_ops;
68 static struct lu_object_operations osd_lu_obj_ops;
69 extern struct dt_body_operations osd_body_ops;
70 static struct dt_object_operations osd_obj_otable_it_ops;
71
72 extern struct kmem_cache *osd_object_kmem;
73
74 static void
75 osd_object_sa_fini(struct osd_object *obj)
76 {
77         if (obj->oo_sa_hdl) {
78                 sa_handle_destroy(obj->oo_sa_hdl);
79                 obj->oo_sa_hdl = NULL;
80         }
81 }
82
83 static int
84 osd_object_sa_init(struct osd_object *obj, struct osd_device *o)
85 {
86         int rc;
87
88         LASSERT(obj->oo_sa_hdl == NULL);
89         LASSERT(obj->oo_dn != NULL);
90
91         rc = osd_sa_handle_get(obj);
92         if (rc)
93                 return rc;
94
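        /* Note that ZFS/SPL calls return positive errno values, so the
         * result is negated here (and throughout this file) to follow the
         * kernel's negative-errno convention. */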
95         /* Cache the xattr object id, valid for the life of the object */
96         rc = -sa_lookup(obj->oo_sa_hdl, SA_ZPL_XATTR(o), &obj->oo_xattr, 8);
97         if (rc == -ENOENT) {
98                 obj->oo_xattr = ZFS_NO_OBJECT;
99                 rc = 0;
100         } else if (rc) {
101                 osd_object_sa_fini(obj);
102         }
103
104         return rc;
105 }
106
107 /*
108  * Add object to list of dirty objects in tx handle.
109  */
110 void osd_object_sa_dirty_add(struct osd_object *obj, struct osd_thandle *oh)
111 {
112         if (!list_empty(&obj->oo_sa_linkage))
113                 return;
114
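        /* re-check under oo_attr_lock: another thread may have linked the
         * object into a dirty list between the lockless check above and
         * taking the lock */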
115         write_lock(&obj->oo_attr_lock);
116         if (likely(list_empty(&obj->oo_sa_linkage)))
117                 list_add(&obj->oo_sa_linkage, &oh->ot_sa_list);
118         write_unlock(&obj->oo_attr_lock);
119 }
120
121 /*
122  * Release spill block dbuf hold for all dirty SAs.
123  */
124 void osd_object_sa_dirty_rele(const struct lu_env *env, struct osd_thandle *oh)
125 {
126         struct osd_object *obj;
127
128         while (!list_empty(&oh->ot_sa_list)) {
129                 obj = list_entry(oh->ot_sa_list.next,
130                                  struct osd_object, oo_sa_linkage);
131                 write_lock(&obj->oo_attr_lock);
132                 list_del_init(&obj->oo_sa_linkage);
133                 write_unlock(&obj->oo_attr_lock);
134                 if (obj->oo_late_xattr) {
135                         /*
136                          * take oo_guard to protect oo_sa_xattr buffer
137                          * from concurrent update by osd_xattr_set()
138                          */
139                         LASSERT(oh->ot_assigned != 0);
140                         down_write(&obj->oo_guard);
141                         if (obj->oo_late_attr_set)
142                                 __osd_sa_attr_init(env, obj, oh);
143                         else if (obj->oo_late_xattr)
144                                 __osd_sa_xattr_update(env, obj, oh);
145                         up_write(&obj->oo_guard);
146                 }
147                 sa_spill_rele(obj->oo_sa_hdl);
148         }
149 }
150
151 /*
152  * Update the SA and add the object to the dirty list.
153  */
154 int osd_object_sa_update(struct osd_object *obj, sa_attr_type_t type,
155                          void *buf, uint32_t buflen, struct osd_thandle *oh)
156 {
157         int rc;
158
159         LASSERT(obj->oo_sa_hdl != NULL);
160         LASSERT(oh->ot_tx != NULL);
161
162         rc = -sa_update(obj->oo_sa_hdl, type, buf, buflen, oh->ot_tx);
163         osd_object_sa_dirty_add(obj, oh);
164
165         return rc;
166 }
167
168 /*
169  * Bulk update the SA and add the object to the dirty list.
170  */
171 static int
172 osd_object_sa_bulk_update(struct osd_object *obj, sa_bulk_attr_t *attrs,
173                           int count, struct osd_thandle *oh)
174 {
175         int rc;
176
177         LASSERT(obj->oo_sa_hdl != NULL);
178         LASSERT(oh->ot_tx != NULL);
179
180         rc = -sa_bulk_update(obj->oo_sa_hdl, attrs, count, oh->ot_tx);
181         osd_object_sa_dirty_add(obj, oh);
182
183         return rc;
184 }
185
186 /*
187  * Retrieve the attributes of a DMU object
188  */
189 int __osd_object_attr_get(const struct lu_env *env, struct osd_device *o,
190                           struct osd_object *obj, struct lu_attr *la)
191 {
192         struct osa_attr *osa = &osd_oti_get(env)->oti_osa;
193         sa_bulk_attr_t  *bulk = osd_oti_get(env)->oti_attr_bulk;
194         int              cnt = 0;
195         int              rc;
196         ENTRY;
197
198         LASSERT(obj->oo_dn != NULL);
199
200         la->la_valid |= LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE | LA_TYPE |
201                         LA_SIZE | LA_UID | LA_GID | LA_FLAGS | LA_NLINK;
202
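        /* ZFS keeps each timestamp as a uint64_t[2] pair of (seconds,
         * nanoseconds); only the seconds part ([0]) is copied into the
         * lu_attr fields below */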
203         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_ATIME(o), NULL, osa->atime, 16);
204         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_MTIME(o), NULL, osa->mtime, 16);
205         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_CTIME(o), NULL, osa->ctime, 16);
206         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_MODE(o), NULL, &osa->mode, 8);
207         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_SIZE(o), NULL, &osa->size, 8);
208         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_LINKS(o), NULL, &osa->nlink, 8);
209         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_UID(o), NULL, &osa->uid, 8);
210         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_GID(o), NULL, &osa->gid, 8);
211         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_FLAGS(o), NULL, &osa->flags, 8);
212         LASSERT(cnt <= ARRAY_SIZE(osd_oti_get(env)->oti_attr_bulk));
213
214         rc = -sa_bulk_lookup(obj->oo_sa_hdl, bulk, cnt);
215         if (rc)
216                 GOTO(out_sa, rc);
217
218         la->la_atime = osa->atime[0];
219         la->la_mtime = osa->mtime[0];
220         la->la_ctime = osa->ctime[0];
221         la->la_mode = osa->mode;
222         la->la_uid = osa->uid;
223         la->la_gid = osa->gid;
224         la->la_nlink = osa->nlink;
225         la->la_flags = attrs_zfs2fs(osa->flags);
226         la->la_size = osa->size;
227
228         /* Try to get extra flags from LMA. Right now, only the LMAI_ORPHAN
229          * flag is stored in LMA, and it is only used for orphan directories */
230         if (S_ISDIR(la->la_mode) && dt_object_exists(&obj->oo_dt)) {
231                 struct osd_thread_info *info = osd_oti_get(env);
232                 struct lustre_mdt_attrs *lma;
233                 struct lu_buf buf;
234
235                 lma = (struct lustre_mdt_attrs *)info->oti_buf;
236                 buf.lb_buf = lma;
237                 buf.lb_len = sizeof(info->oti_buf);
238                 rc = osd_xattr_get(env, &obj->oo_dt, &buf, XATTR_NAME_LMA);
239                 if (rc > 0) {
240                         rc = 0;
241                         lma->lma_incompat = le32_to_cpu(lma->lma_incompat);
242                         obj->oo_lma_flags =
243                                 lma_to_lustre_flags(lma->lma_incompat);
244
245                 } else if (rc == -ENODATA) {
246                         rc = 0;
247                 }
248         }
249
250         if (S_ISCHR(la->la_mode) || S_ISBLK(la->la_mode)) {
251                 rc = -sa_lookup(obj->oo_sa_hdl, SA_ZPL_RDEV(o), &osa->rdev, 8);
252                 if (rc)
253                         GOTO(out_sa, rc);
254                 la->la_rdev = osa->rdev;
255                 la->la_valid |= LA_RDEV;
256         }
257 out_sa:
258
259         RETURN(rc);
260 }
261
262 int __osd_obj2dnode(objset_t *os, uint64_t oid, dnode_t **dnp)
263 {
264         dmu_buf_t *db;
265         dmu_buf_impl_t *dbi;
266         int rc;
267
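        /* holding the bonus buffer keeps the dnode pinned for as long as the
         * osd_object uses it; the hold is dropped later via osd_dnode_rele() */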
268         rc = -dmu_bonus_hold(os, oid, osd_obj_tag, &db);
269         if (rc)
270                 return rc;
271
272         dbi = (dmu_buf_impl_t *)db;
273         DB_DNODE_ENTER(dbi);
274         *dnp = DB_DNODE(dbi);
275         LASSERT(*dnp != NULL);
276
277         return 0;
278 }
279
280 /*
281  * Concurrency: no concurrent access is possible that early in object
282  * life-cycle.
283  */
284 struct lu_object *osd_object_alloc(const struct lu_env *env,
285                                    const struct lu_object_header *hdr,
286                                    struct lu_device *d)
287 {
288         struct osd_object *mo;
289
290         OBD_SLAB_ALLOC_PTR_GFP(mo, osd_object_kmem, GFP_NOFS);
291         if (mo != NULL) {
292                 struct lu_object *l;
293
294                 l = &mo->oo_dt.do_lu;
295                 dt_object_init(&mo->oo_dt, NULL, d);
296                 mo->oo_dt.do_ops = &osd_obj_ops;
297                 l->lo_ops = &osd_lu_obj_ops;
298                 INIT_LIST_HEAD(&mo->oo_sa_linkage);
299                 INIT_LIST_HEAD(&mo->oo_unlinked_linkage);
300                 init_rwsem(&mo->oo_sem);
301                 init_rwsem(&mo->oo_guard);
302                 rwlock_init(&mo->oo_attr_lock);
303                 mo->oo_destroy = OSD_DESTROY_NONE;
304                 return l;
305         } else {
306                 return NULL;
307         }
308 }
309
310 /*
311  * Concurrency: shouldn't matter.
312  */
313 int osd_object_init0(const struct lu_env *env, struct osd_object *obj)
314 {
315         struct osd_device       *osd = osd_obj2dev(obj);
316         const struct lu_fid     *fid = lu_object_fid(&obj->oo_dt.do_lu);
317         int                      rc = 0;
318         ENTRY;
319
320         if (obj->oo_dn == NULL)
321                 RETURN(0);
322
323         /* the object exists */
324
325         rc = osd_object_sa_init(obj, osd);
326         if (rc)
327                 RETURN(rc);
328
329         /* cache attrs in object */
330         rc = __osd_object_attr_get(env, osd, obj, &obj->oo_attr);
331         if (rc)
332                 RETURN(rc);
333
334         if (likely(!fid_is_acct(fid)))
335                 /* no body operations for accounting objects */
336                 obj->oo_dt.do_body_ops = &osd_body_ops;
337
338         /*
339          * initialize object before marking it existing
340          */
341         obj->oo_dt.do_lu.lo_header->loh_attr |= obj->oo_attr.la_mode & S_IFMT;
342
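        /* the barrier below orders the type bits written above before
         * LOHA_EXISTS, so a thread that observes LOHA_EXISTS also observes a
         * valid file type in loh_attr */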
343         smp_mb();
344         obj->oo_dt.do_lu.lo_header->loh_attr |= LOHA_EXISTS;
345
346         RETURN(0);
347 }
348
349 static int osd_check_lma(const struct lu_env *env, struct osd_object *obj)
350 {
351         struct osd_thread_info  *info = osd_oti_get(env);
352         struct lu_buf           buf;
353         int                     rc;
354         struct lustre_mdt_attrs *lma;
355         ENTRY;
356
357         CLASSERT(sizeof(info->oti_buf) >= sizeof(*lma));
358         lma = (struct lustre_mdt_attrs *)info->oti_buf;
359         buf.lb_buf = lma;
360         buf.lb_len = sizeof(info->oti_buf);
361
362         rc = osd_xattr_get(env, &obj->oo_dt, &buf, XATTR_NAME_LMA);
363         if (rc > 0) {
364                 rc = 0;
365                 lustre_lma_swab(lma);
366                 if (unlikely((lma->lma_incompat & ~LMA_INCOMPAT_SUPP) ||
367                              CFS_FAIL_CHECK(OBD_FAIL_OSD_LMA_INCOMPAT))) {
368                         CWARN("%s: unsupported incompat LMA feature(s) %#x for "
369                               "fid = "DFID"\n", osd_obj2dev(obj)->od_svname,
370                               lma->lma_incompat & ~LMA_INCOMPAT_SUPP,
371                               PFID(lu_object_fid(&obj->oo_dt.do_lu)));
372                         rc = -EOPNOTSUPP;
373                 }
374         } else if (rc == -ENODATA) {
375                 /* the LMA xattr hasn't been initialized yet */
376                 rc = 0;
377         }
378
379         RETURN(rc);
380 }
381
382 /**
383  * Helper function to retrieve DMU object id from fid for accounting object
384  */
385 static dnode_t *osd_quota_fid2dmu(const struct osd_device *osd,
386                                   const struct lu_fid *fid)
387 {
388         dnode_t *dn = NULL;
389
390         LASSERT(fid_is_acct(fid));
391
392         switch (fid_oid(fid)) {
393         case ACCT_USER_OID:
394                 dn = osd->od_userused_dn;
395                 break;
396         case ACCT_GROUP_OID:
397                 dn = osd->od_groupused_dn;
398                 break;
399         default:
400                 break;
401         }
402
403         return dn;
404 }
405
406 /*
407  * Concurrency: no concurrent access is possible that early in object
408  * life-cycle.
409  */
410 static int osd_object_init(const struct lu_env *env, struct lu_object *l,
411                            const struct lu_object_conf *conf)
412 {
413         struct osd_object *obj = osd_obj(l);
414         struct osd_device *osd = osd_obj2dev(obj);
415         const struct lu_fid *fid = lu_object_fid(l);
416         uint64_t oid;
417         int rc = 0;
418         ENTRY;
419
420         LASSERT(osd_invariant(obj));
421
422         if (fid_is_otable_it(&l->lo_header->loh_fid)) {
423                 obj->oo_dt.do_ops = &osd_obj_otable_it_ops;
424                 l->lo_header->loh_attr |= LOHA_EXISTS;
425                 RETURN(0);
426         }
427
428         if (conf != NULL && conf->loc_flags & LOC_F_NEW)
429                 GOTO(out, rc = 0);
430
431         if (unlikely(fid_is_acct(fid))) {
432                 obj->oo_dn = osd_quota_fid2dmu(osd, fid);
433                 if (obj->oo_dn) {
434                         obj->oo_dt.do_index_ops = &osd_acct_index_ops;
435                         l->lo_header->loh_attr |= LOHA_EXISTS;
436                 }
437
438                 GOTO(out, rc = 0);
439         }
440
441         rc = osd_fid_lookup(env, osd, fid, &oid);
442         if (rc == 0) {
443                 LASSERT(obj->oo_dn == NULL);
444                 rc = __osd_obj2dnode(osd->od_os, oid, &obj->oo_dn);
445                 /* EEXIST will be returned if object is being deleted in ZFS */
446                 if (rc == -EEXIST) {
447                         rc = 0;
448                         GOTO(out, rc);
449                 }
450                 if (rc != 0) {
451                         CERROR("%s: lookup "DFID"/%#llx failed: rc = %d\n",
452                                osd->od_svname, PFID(lu_object_fid(l)), oid, rc);
453                         GOTO(out, rc);
454                 }
455                 LASSERT(obj->oo_dn);
456                 rc = osd_object_init0(env, obj);
457                 if (rc != 0)
458                         GOTO(out, rc);
459
460                 rc = osd_check_lma(env, obj);
461                 if (rc != 0)
462                         GOTO(out, rc);
463         } else if (rc == -ENOENT) {
464                 rc = 0;
465         }
466         LASSERT(osd_invariant(obj));
467 out:
468         RETURN(rc);
469 }
470
471 /*
472  * Concurrency: no concurrent access is possible that late in object
473  * life-cycle.
474  */
475 static void osd_object_free(const struct lu_env *env, struct lu_object *l)
476 {
477         struct osd_object *obj = osd_obj(l);
478
479         LASSERT(osd_invariant(obj));
480
481         dt_object_fini(&obj->oo_dt);
482         OBD_SLAB_FREE_PTR(obj, osd_object_kmem);
483 }
484
485 static int
486 osd_object_unlinked_add(struct osd_object *obj, struct osd_thandle *oh)
487 {
488         int rc = -EBUSY;
489
490         LASSERT(obj->oo_destroy == OSD_DESTROY_ASYNC);
491
492         /* the object is supposed to be exclusively locked by
493          * the caller (osd_destroy()), while the transaction
494          * (oh) is per-thread and not shared */
495         if (likely(list_empty(&obj->oo_unlinked_linkage))) {
496                 list_add(&obj->oo_unlinked_linkage, &oh->ot_unlinked_list);
497                 rc = 0;
498         }
499
500         return rc;
501 }
502
503 /* Default to max data size covered by a level-1 indirect block */
504 static unsigned long osd_sync_destroy_max_size =
505         1UL << (DN_MAX_INDBLKSHIFT - SPA_BLKPTRSHIFT + SPA_MAXBLOCKSHIFT);
506 module_param(osd_sync_destroy_max_size, ulong, 0444);
507 MODULE_PARM_DESC(osd_sync_destroy_max_size, "Maximum object size to use synchronous destroy.");
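/* The default above is the amount of file data addressable by a single
 * level-1 indirect block: 2^(DN_MAX_INDBLKSHIFT - SPA_BLKPTRSHIFT) block
 * pointers, each covering at most 2^SPA_MAXBLOCKSHIFT bytes. With the usual
 * 128KiB indirect blocks and 128-byte block pointers that is 1024 pointers. */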
508
509 static inline void
510 osd_object_set_destroy_type(struct osd_object *obj)
511 {
512         /*
513          * Lock-less OST_WRITE can race with OST_DESTROY, so set destroy type
514          * only once and use it consistently thereafter.
515          */
516         down_write(&obj->oo_guard);
517         if (obj->oo_destroy == OSD_DESTROY_NONE) {
518                 if (obj->oo_attr.la_size <= osd_sync_destroy_max_size)
519                         obj->oo_destroy = OSD_DESTROY_SYNC;
520                 else /* Larger objects are destroyed asynchronously */
521                         obj->oo_destroy = OSD_DESTROY_ASYNC;
522         }
523         up_write(&obj->oo_guard);
524 }
525
526 static int osd_declare_destroy(const struct lu_env *env, struct dt_object *dt,
527                                struct thandle *th)
528 {
529         const struct lu_fid     *fid = lu_object_fid(&dt->do_lu);
530         struct osd_object       *obj = osd_dt_obj(dt);
531         struct osd_device       *osd = osd_obj2dev(obj);
532         struct osd_thandle      *oh;
533         dnode_t *dn;
534         int                      rc;
535         uint64_t                 zapid;
536         ENTRY;
537
538         LASSERT(th != NULL);
539         LASSERT(dt_object_exists(dt));
540
541         oh = container_of0(th, struct osd_thandle, ot_super);
542         LASSERT(oh->ot_tx != NULL);
543
544         /* declare that we'll remove object from fid-dnode mapping */
545         zapid = osd_get_name_n_idx(env, osd, fid, NULL, 0, &dn);
546         osd_tx_hold_zap(oh->ot_tx, zapid, dn, FALSE, NULL);
547
548         osd_declare_xattrs_destroy(env, obj, oh);
549
550         /* one less inode */
551         rc = osd_declare_quota(env, osd, obj->oo_attr.la_uid,
552                                obj->oo_attr.la_gid, -1, oh, false, NULL, false);
553         if (rc)
554                 RETURN(rc);
555
556         /* data to be truncated */
557         rc = osd_declare_quota(env, osd, obj->oo_attr.la_uid,
558                                obj->oo_attr.la_gid, 0, oh, true, NULL, false);
559         if (rc)
560                 RETURN(rc);
561
562         osd_object_set_destroy_type(obj);
563         if (obj->oo_destroy == OSD_DESTROY_SYNC)
564                 dmu_tx_hold_free(oh->ot_tx, obj->oo_dn->dn_object,
565                                  0, DMU_OBJECT_END);
566         else
567                 osd_tx_hold_zap(oh->ot_tx, osd->od_unlinked->dn_object,
568                                 osd->od_unlinked, TRUE, NULL);
569
570         /* will help to find FID->ino when this object is being
571          * added to PENDING/ */
572         osd_idc_find_and_init(env, osd, obj);
573
574         RETURN(0);
575 }
576
577 static int osd_destroy(const struct lu_env *env, struct dt_object *dt,
578                        struct thandle *th)
579 {
580         struct osd_thread_info  *info = osd_oti_get(env);
581         char                    *buf = info->oti_str;
582         struct osd_object       *obj = osd_dt_obj(dt);
583         struct osd_device       *osd = osd_obj2dev(obj);
584         const struct lu_fid     *fid = lu_object_fid(&dt->do_lu);
585         struct osd_thandle      *oh;
586         int                      rc;
587         uint64_t                 oid, zapid;
588         dnode_t *zdn;
589         ENTRY;
590
591         down_write(&obj->oo_guard);
592
593         if (unlikely(!dt_object_exists(dt) || obj->oo_destroyed))
594                 GOTO(out, rc = -ENOENT);
595
596         LASSERT(obj->oo_dn != NULL);
597
598         oh = container_of0(th, struct osd_thandle, ot_super);
599         LASSERT(oh != NULL);
600         LASSERT(oh->ot_tx != NULL);
601
602         /* remove obj name from its index dir (which one depends on the FID) */
603         zapid = osd_get_name_n_idx(env, osd, fid, buf,
604                                    sizeof(info->oti_str), &zdn);
605         rc = osd_zap_remove(osd, zapid, zdn, buf, oh->ot_tx);
606         if (rc) {
607                 CERROR("%s: zap_remove(%s) failed: rc = %d\n",
608                        osd->od_svname, buf, rc);
609                 GOTO(out, rc);
610         }
611
612         rc = osd_xattrs_destroy(env, obj, oh);
613         if (rc) {
614                 CERROR("%s: cannot destroy xattrs for %s: rc = %d\n",
615                        osd->od_svname, buf, rc);
616                 GOTO(out, rc);
617         }
618
619         oid = obj->oo_dn->dn_object;
620         if (unlikely(obj->oo_destroy == OSD_DESTROY_NONE)) {
621                 /* this may happen if the destroy wasn't declared
622                  * e.g. when the object is created and then destroyed
623                  * in the same transaction - we don't need additional
624                  * space for destroy specifically */
625                 LASSERT(obj->oo_attr.la_size <= osd_sync_destroy_max_size);
626                 rc = -dmu_object_free(osd->od_os, oid, oh->ot_tx);
627                 if (rc)
628                         CERROR("%s: failed to free %s %llu: rc = %d\n",
629                                osd->od_svname, buf, oid, rc);
630         } else if (obj->oo_destroy == OSD_DESTROY_SYNC) {
631                 rc = -dmu_object_free(osd->od_os, oid, oh->ot_tx);
632                 if (rc)
633                         CERROR("%s: failed to free %s %llu: rc = %d\n",
634                                osd->od_svname, buf, oid, rc);
635         } else { /* asynchronous destroy */
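                /* record the dnode number in the "unlinked" ZAP; the dnode
                 * itself is freed later, outside this transaction, when the
                 * OSD processes its unlinked set */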
636                 char *key = info->oti_key;
637
638                 rc = osd_object_unlinked_add(obj, oh);
639                 if (rc)
640                         GOTO(out, rc);
641
642                 snprintf(key, sizeof(info->oti_key), "%llx", oid);
643                 rc = osd_zap_add(osd, osd->od_unlinked->dn_object,
644                                  osd->od_unlinked, key, 8, 1, &oid, oh->ot_tx);
645                 if (rc)
646                         CERROR("%s: zap_add_int() failed %s %llu: rc = %d\n",
647                                osd->od_svname, buf, oid, rc);
648         }
649
650 out:
651         /* not needed in the cache anymore */
652         set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags);
653         if (rc == 0)
654                 obj->oo_destroyed = 1;
655         up_write(&obj->oo_guard);
656         RETURN (0);
657 }
658
659 static void osd_object_delete(const struct lu_env *env, struct lu_object *l)
660 {
661         struct osd_object *obj = osd_obj(l);
662         const struct lu_fid *fid = lu_object_fid(l);
663
664         if (obj->oo_dn) {
665                 if (likely(!fid_is_acct(fid))) {
666                         osd_object_sa_fini(obj);
667                         if (obj->oo_sa_xattr) {
668                                 nvlist_free(obj->oo_sa_xattr);
669                                 obj->oo_sa_xattr = NULL;
670                         }
671                         osd_dnode_rele(obj->oo_dn);
672                         list_del(&obj->oo_sa_linkage);
673                 }
674                 obj->oo_dn = NULL;
675         }
676 }
677
678 /*
679  * Concurrency: ->loo_object_release() is called under site spin-lock.
680  */
681 static void osd_object_release(const struct lu_env *env,
682                                struct lu_object *l)
683 {
684 }
685
686 /*
687  * Concurrency: shouldn't matter.
688  */
689 static int osd_object_print(const struct lu_env *env, void *cookie,
690                             lu_printer_t p, const struct lu_object *l)
691 {
692         struct osd_object *o = osd_obj(l);
693
694         return (*p)(env, cookie, LUSTRE_OSD_ZFS_NAME"-object@%p", o);
695 }
696
697 static void osd_read_lock(const struct lu_env *env, struct dt_object *dt,
698                           unsigned role)
699 {
700         struct osd_object *obj = osd_dt_obj(dt);
701
702         LASSERT(osd_invariant(obj));
703
704         down_read_nested(&obj->oo_sem, role);
705 }
706
707 static void osd_write_lock(const struct lu_env *env, struct dt_object *dt,
708                            unsigned role)
709 {
710         struct osd_object *obj = osd_dt_obj(dt);
711
712         LASSERT(osd_invariant(obj));
713
714         down_write_nested(&obj->oo_sem, role);
715 }
716
717 static void osd_read_unlock(const struct lu_env *env, struct dt_object *dt)
718 {
719         struct osd_object *obj = osd_dt_obj(dt);
720
721         LASSERT(osd_invariant(obj));
722         up_read(&obj->oo_sem);
723 }
724
725 static void osd_write_unlock(const struct lu_env *env, struct dt_object *dt)
726 {
727         struct osd_object *obj = osd_dt_obj(dt);
728
729         LASSERT(osd_invariant(obj));
730         up_write(&obj->oo_sem);
731 }
732
733 static int osd_write_locked(const struct lu_env *env, struct dt_object *dt)
734 {
735         struct osd_object *obj = osd_dt_obj(dt);
736         int rc = 1;
737
738         LASSERT(osd_invariant(obj));
739
740         if (down_write_trylock(&obj->oo_sem)) {
741                 rc = 0;
742                 up_write(&obj->oo_sem);
743         }
744         return rc;
745 }
746
747 static int osd_attr_get(const struct lu_env *env,
748                         struct dt_object *dt,
749                         struct lu_attr *attr)
750 {
751         struct osd_object       *obj = osd_dt_obj(dt);
752         uint64_t                 blocks;
753         uint32_t                 blksize;
754         int                      rc = 0;
755
756         down_read(&obj->oo_guard);
757
758         if (unlikely(!dt_object_exists(dt) || obj->oo_destroyed))
759                 GOTO(out, rc = -ENOENT);
760
761         if (unlikely(fid_is_acct(lu_object_fid(&dt->do_lu))))
762                 GOTO(out, rc = 0);
763
764         LASSERT(osd_invariant(obj));
765         LASSERT(obj->oo_dn);
766
767         read_lock(&obj->oo_attr_lock);
768         *attr = obj->oo_attr;
769         if (obj->oo_lma_flags & LUSTRE_ORPHAN_FL)
770                 attr->la_flags |= LUSTRE_ORPHAN_FL;
771         read_unlock(&obj->oo_attr_lock);
772
773         /* with ZFS_DEBUG, zrl_add_debug() called by DB_DNODE_ENTER()
774          * from within sa_object_size() can block on a mutex, so
775          * we can't call sa_object_size() while holding the rwlock */
776         sa_object_size(obj->oo_sa_hdl, &blksize, &blocks);
777         /* we do not control the size of indices, so always calculate
778          * it from the number of blocks reported by the DMU */
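        /* sa_object_size() reports blocks in 512-byte units, hence the
         * multiplier below */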
779         if (S_ISDIR(attr->la_mode))
780                 attr->la_size = 512 * blocks;
781         /* Block size may not be set yet; suggest the maximal I/O transfer size. */
782         if (blksize == 0)
783                 blksize = osd_spa_maxblocksize(
784                         dmu_objset_spa(osd_obj2dev(obj)->od_os));
785
786         attr->la_blksize = blksize;
787         attr->la_blocks = blocks;
788         attr->la_valid |= LA_BLOCKS | LA_BLKSIZE;
789
790 out:
791         up_read(&obj->oo_guard);
792         return rc;
793 }
794
795 /* Simple wrapper on top of the qsd API which implements quota transfer for
796  * osd setattr needs. As a reminder, only the root user can change ownership
797  * of a file, which is why EDQUOT & EINPROGRESS errors are discarded */
798 static inline int qsd_transfer(const struct lu_env *env,
799                                struct qsd_instance *qsd,
800                                struct lquota_trans *trans, int qtype,
801                                __u64 orig_id, __u64 new_id, __u64 bspace,
802                                struct lquota_id_info *qi)
803 {
804         int     rc;
805
806         if (unlikely(qsd == NULL))
807                 return 0;
808
809         LASSERT(qtype >= 0 && qtype < LL_MAXQUOTAS);
810         qi->lqi_type = qtype;
811
812         /* inode accounting */
813         qi->lqi_is_blk = false;
814
815         /* one more inode for the new owner ... */
816         qi->lqi_id.qid_uid = new_id;
817         qi->lqi_space      = 1;
818         rc = qsd_op_begin(env, qsd, trans, qi, NULL);
819         if (rc == -EDQUOT || rc == -EINPROGRESS)
820                 rc = 0;
821         if (rc)
822                 return rc;
823
824         /* and one less inode for the current id */
825         qi->lqi_id.qid_uid = orig_id;
826         qi->lqi_space      = -1;
827         /* can't get EDQUOT when reducing usage */
828         rc = qsd_op_begin(env, qsd, trans, qi, NULL);
829         if (rc == -EINPROGRESS)
830                 rc = 0;
831         if (rc)
832                 return rc;
833
834         /* block accounting */
835         qi->lqi_is_blk = true;
836
837         /* more blocks for the new owner ... */
838         qi->lqi_id.qid_uid = new_id;
839         qi->lqi_space      = bspace;
840         rc = qsd_op_begin(env, qsd, trans, qi, NULL);
841         if (rc == -EDQUOT || rc == -EINPROGRESS)
842                 rc = 0;
843         if (rc)
844                 return rc;
845
846         /* and finally less blocks for the current owner */
847         qi->lqi_id.qid_uid = orig_id;
848         qi->lqi_space      = -bspace;
849         rc = qsd_op_begin(env, qsd, trans, qi, NULL);
850         /* can't get EDQUOT when reducing usage */
851         if (rc == -EINPROGRESS)
852                 rc = 0;
853         return rc;
854 }
855
856 static int osd_declare_attr_set(const struct lu_env *env,
857                                 struct dt_object *dt,
858                                 const struct lu_attr *attr,
859                                 struct thandle *handle)
860 {
861         struct osd_thread_info  *info = osd_oti_get(env);
862         struct osd_object       *obj = osd_dt_obj(dt);
863         struct osd_device       *osd = osd_obj2dev(obj);
864         dmu_tx_hold_t           *txh;
865         struct osd_thandle      *oh;
866         uint64_t                 bspace;
867         uint32_t                 blksize;
868         int                      rc = 0;
869         bool                     found;
870         ENTRY;
871
872
873         LASSERT(handle != NULL);
874         LASSERT(osd_invariant(obj));
875
876         oh = container_of0(handle, struct osd_thandle, ot_super);
877
878         down_read(&obj->oo_guard);
879         if (unlikely(!dt_object_exists(dt) || obj->oo_destroyed))
880                 GOTO(out, rc = 0);
881
882         LASSERT(obj->oo_sa_hdl != NULL);
883         LASSERT(oh->ot_tx != NULL);
884         /* regular attributes are part of the bonus buffer */
885         /* let's check whether this object is already part of
886          * the transaction */
887         found = false;
888         for (txh = list_head(&oh->ot_tx->tx_holds); txh;
889              txh = list_next(&oh->ot_tx->tx_holds, txh)) {
890                 if (txh->txh_dnode == NULL)
891                         continue;
892                 if (txh->txh_dnode->dn_object != obj->oo_dn->dn_object)
893                         continue;
894                 /* this object is already part of the transaction;
895                  * we don't need to declare the bonus buffer again */
896                 found = true;
897                 break;
898         }
899         if (!found)
900                 dmu_tx_hold_bonus(oh->ot_tx, obj->oo_dn->dn_object);
901         if (oh->ot_tx->tx_err != 0)
902                 GOTO(out, rc = -oh->ot_tx->tx_err);
903
904         if (attr && attr->la_valid & LA_FLAGS) {
905                 /* LMA is usually a part of bonus, no need to declare
906                  * anything else */
907         }
908
909         if (attr && (attr->la_valid & (LA_UID | LA_GID))) {
910                 sa_object_size(obj->oo_sa_hdl, &blksize, &bspace);
911                 bspace = toqb(bspace * blksize);
912         }
913
914         if (attr && attr->la_valid & LA_UID) {
915                 /* quota enforcement for user */
916                 if (attr->la_uid != obj->oo_attr.la_uid) {
917                         rc = qsd_transfer(env, osd->od_quota_slave,
918                                           &oh->ot_quota_trans, USRQUOTA,
919                                           obj->oo_attr.la_uid, attr->la_uid,
920                                           bspace, &info->oti_qi);
921                         if (rc)
922                                 GOTO(out, rc);
923                 }
924         }
925         if (attr && attr->la_valid & LA_GID) {
926                 /* quota enforcement for group */
927                 if (attr->la_gid != obj->oo_attr.la_gid) {
928                         rc = qsd_transfer(env, osd->od_quota_slave,
929                                           &oh->ot_quota_trans, GRPQUOTA,
930                                           obj->oo_attr.la_gid, attr->la_gid,
931                                           bspace, &info->oti_qi);
932                         if (rc)
933                                 GOTO(out, rc);
934                 }
935         }
936
937 out:
938         up_read(&obj->oo_guard);
939         RETURN(rc);
940 }
941
942 /*
943  * Set the attributes of an object
944  *
945  * The transaction passed to this routine must have
946  * dmu_tx_hold_bonus(tx, oid) called and then assigned
947  * to a transaction group.
948  */
949 static int osd_attr_set(const struct lu_env *env, struct dt_object *dt,
950                         const struct lu_attr *la, struct thandle *handle)
951 {
952         struct osd_thread_info  *info = osd_oti_get(env);
953         sa_bulk_attr_t          *bulk = osd_oti_get(env)->oti_attr_bulk;
954         struct osd_object       *obj = osd_dt_obj(dt);
955         struct osd_device       *osd = osd_obj2dev(obj);
956         struct osd_thandle      *oh;
957         struct osa_attr         *osa = &info->oti_osa;
958         __u64                    valid = la->la_valid;
959         int                      cnt;
960         int                      rc = 0;
961
962         ENTRY;
963
964         down_read(&obj->oo_guard);
965         if (unlikely(!dt_object_exists(dt) || obj->oo_destroyed))
966                 GOTO(out, rc = -ENOENT);
967
968         LASSERT(handle != NULL);
969         LASSERT(osd_invariant(obj));
970         LASSERT(obj->oo_sa_hdl);
971
972         oh = container_of0(handle, struct osd_thandle, ot_super);
973         /* Assert that the transaction has been assigned to a
974            transaction group. */
975         LASSERT(oh->ot_tx->tx_txg != 0);
976
977         /* Only allow setting the size on regular files */
978         if (!S_ISREG(dt->do_lu.lo_header->loh_attr))
979                 valid &= ~(LA_SIZE | LA_BLOCKS);
980
981         if (valid & LA_CTIME && la->la_ctime == obj->oo_attr.la_ctime)
982                 valid &= ~LA_CTIME;
983
984         if (valid & LA_MTIME && la->la_mtime == obj->oo_attr.la_mtime)
985                 valid &= ~LA_MTIME;
986
987         if (valid & LA_ATIME && la->la_atime == obj->oo_attr.la_atime)
988                 valid &= ~LA_ATIME;
989
990         if (valid == 0)
991                 GOTO(out, rc = 0);
992
993         if (valid & LA_FLAGS) {
994                 struct lustre_mdt_attrs *lma;
995                 struct lu_buf buf;
996
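                /* flags covered by LUSTRE_LMA_FL_MASKS (e.g. the orphan flag)
                 * have no ZFS SA representation, so they are persisted in the
                 * LMA xattr's lma_incompat field instead */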
997                 if (la->la_flags & LUSTRE_LMA_FL_MASKS) {
998                         CLASSERT(sizeof(info->oti_buf) >= sizeof(*lma));
999                         lma = (struct lustre_mdt_attrs *)&info->oti_buf;
1000                         buf.lb_buf = lma;
1001                         buf.lb_len = sizeof(info->oti_buf);
1002                         rc = osd_xattr_get(env, &obj->oo_dt, &buf,
1003                                            XATTR_NAME_LMA);
1004                         if (rc > 0) {
1005                                 lma->lma_incompat =
1006                                         le32_to_cpu(lma->lma_incompat);
1007                                 lma->lma_incompat |=
1008                                         lustre_to_lma_flags(la->la_flags);
1009                                 lma->lma_incompat =
1010                                         cpu_to_le32(lma->lma_incompat);
1011                                 buf.lb_buf = lma;
1012                                 buf.lb_len = sizeof(*lma);
1013                                 rc = osd_xattr_set_internal(env, obj, &buf,
1014                                                             XATTR_NAME_LMA,
1015                                                             LU_XATTR_REPLACE,
1016                                                             oh);
1017                         }
1018                         if (rc < 0) {
1019                                 CWARN("%s: failed to set LMA flags: rc = %d\n",
1020                                        osd->od_svname, rc);
1021                                 GOTO(out, rc);
1022                         }
1023                 }
1024         }
1025
1026         write_lock(&obj->oo_attr_lock);
1027         cnt = 0;
1028         if (valid & LA_ATIME) {
1029                 osa->atime[0] = obj->oo_attr.la_atime = la->la_atime;
1030                 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_ATIME(osd), NULL,
1031                                  osa->atime, 16);
1032         }
1033         if (valid & LA_MTIME) {
1034                 osa->mtime[0] = obj->oo_attr.la_mtime = la->la_mtime;
1035                 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_MTIME(osd), NULL,
1036                                  osa->mtime, 16);
1037         }
1038         if (valid & LA_CTIME) {
1039                 osa->ctime[0] = obj->oo_attr.la_ctime = la->la_ctime;
1040                 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_CTIME(osd), NULL,
1041                                  osa->ctime, 16);
1042         }
1043         if (valid & LA_MODE) {
1044                 /* mode is stored along with the file type; preserve the type bits */
1045                 obj->oo_attr.la_mode = (obj->oo_attr.la_mode & S_IFMT) |
1046                         (la->la_mode & ~S_IFMT);
1047                 osa->mode = obj->oo_attr.la_mode;
1048                 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_MODE(osd), NULL,
1049                                  &osa->mode, 8);
1050         }
1051         if (valid & LA_SIZE) {
1052                 osa->size = obj->oo_attr.la_size = la->la_size;
1053                 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_SIZE(osd), NULL,
1054                                  &osa->size, 8);
1055         }
1056         if (valid & LA_NLINK) {
1057                 osa->nlink = obj->oo_attr.la_nlink = la->la_nlink;
1058                 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_LINKS(osd), NULL,
1059                                  &osa->nlink, 8);
1060         }
1061         if (valid & LA_RDEV) {
1062                 osa->rdev = obj->oo_attr.la_rdev = la->la_rdev;
1063                 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_RDEV(osd), NULL,
1064                                  &osa->rdev, 8);
1065         }
1066         if (valid & LA_FLAGS) {
1067                 osa->flags = attrs_fs2zfs(la->la_flags);
1068                 /* many flags are not supported by ZFS, so keep only a
1069                  * cached copy of the flags that ZFS can actually store */
1070                 obj->oo_attr.la_flags = attrs_zfs2fs(osa->flags);
1071                 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_FLAGS(osd), NULL,
1072                                  &osa->flags, 8);
1073         }
1074         if (valid & LA_UID) {
1075                 osa->uid = obj->oo_attr.la_uid = la->la_uid;
1076                 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_UID(osd), NULL,
1077                                  &osa->uid, 8);
1078         }
1079         if (valid & LA_GID) {
1080                 osa->gid = obj->oo_attr.la_gid = la->la_gid;
1081                 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_GID(osd), NULL,
1082                                  &osa->gid, 8);
1083         }
1084         obj->oo_attr.la_valid |= valid;
1085         write_unlock(&obj->oo_attr_lock);
1086
1087         LASSERT(cnt <= ARRAY_SIZE(osd_oti_get(env)->oti_attr_bulk));
1088         rc = osd_object_sa_bulk_update(obj, bulk, cnt, oh);
1089
1090 out:
1091         up_read(&obj->oo_guard);
1092         RETURN(rc);
1093 }
1094
1095 /*
1096  * Object creation.
1097  *
1098  * XXX temporary solution.
1099  */
1100
1101 static void osd_ah_init(const struct lu_env *env, struct dt_allocation_hint *ah,
1102                         struct dt_object *parent, struct dt_object *child,
1103                         umode_t child_mode)
1104 {
1105         LASSERT(ah);
1106
1107         ah->dah_parent = parent;
1108         ah->dah_mode = child_mode;
1109
1110         if (parent != NULL && !dt_object_remote(parent)) {
1111                 /* will help to find FID->ino at dt_insert("..") */
1112                 struct osd_object *pobj = osd_dt_obj(parent);
1113
1114                 osd_idc_find_and_init(env, osd_obj2dev(pobj), pobj);
1115         }
1116 }
1117
1118 static int osd_declare_create(const struct lu_env *env, struct dt_object *dt,
1119                               struct lu_attr *attr,
1120                               struct dt_allocation_hint *hint,
1121                               struct dt_object_format *dof,
1122                               struct thandle *handle)
1123 {
1124         const struct lu_fid     *fid = lu_object_fid(&dt->do_lu);
1125         struct osd_object       *obj = osd_dt_obj(dt);
1126         struct osd_device       *osd = osd_obj2dev(obj);
1127         struct osd_thandle      *oh;
1128         uint64_t                 zapid;
1129         dnode_t                 *dn;
1130         int                      rc, dnode_size;
1131         ENTRY;
1132
1133         LASSERT(dof);
1134
1135         switch (dof->dof_type) {
1136                 case DFT_REGULAR:
1137                 case DFT_SYM:
1138                 case DFT_NODE:
1139                         if (obj->oo_dt.do_body_ops == NULL)
1140                                 obj->oo_dt.do_body_ops = &osd_body_ops;
1141                         break;
1142                 default:
1143                         break;
1144         }
1145
1146         LASSERT(handle != NULL);
1147         oh = container_of0(handle, struct osd_thandle, ot_super);
1148         LASSERT(oh->ot_tx != NULL);
1149
1150         /* this is the minimum set of EAs on every Lustre object */
1151         obj->oo_ea_in_bonus = ZFS_SA_BASE_ATTR_SIZE +
1152                                 sizeof(__u64) + /* VBR VERSION */
1153                                 sizeof(struct lustre_mdt_attrs); /* LMA */
1154         /* reserve 32 bytes for extra stuff like ACLs */
1155         dnode_size = size_roundup_power2(obj->oo_ea_in_bonus + 32);
1156
1157         switch (dof->dof_type) {
1158                 case DFT_DIR:
1159                         dt->do_index_ops = &osd_dir_ops;
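                        /* fall through: a directory is also a ZAP, so it
                         * needs the same declarations as an index */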
1160                 case DFT_INDEX:
1161                         /* for zap create */
1162                         dmu_tx_hold_zap(oh->ot_tx, DMU_NEW_OBJECT, FALSE, NULL);
1163                         dmu_tx_hold_sa_create(oh->ot_tx, dnode_size);
1164                         break;
1165                 case DFT_REGULAR:
1166                 case DFT_SYM:
1167                 case DFT_NODE:
1168                         /* first, we'll create new object */
1169                         dmu_tx_hold_sa_create(oh->ot_tx, dnode_size);
1170                         break;
1171
1172                 default:
1173                         LBUG();
1174                         break;
1175         }
1176
1177         /* and we'll add it to some mapping */
1178         zapid = osd_get_name_n_idx(env, osd, fid, NULL, 0, &dn);
1179         osd_tx_hold_zap(oh->ot_tx, zapid, dn, TRUE, NULL);
1180
1181         /* will help to find FID->ino mapping at dt_insert() */
1182         osd_idc_find_and_init(env, osd, obj);
1183
1184         rc = osd_declare_quota(env, osd, attr->la_uid, attr->la_gid, 1, oh,
1185                                false, NULL, false);
1186
1187         RETURN(rc);
1188 }
1189
1190 int __osd_attr_init(const struct lu_env *env, struct osd_device *osd,
1191                     sa_handle_t *sa_hdl, dmu_tx_t *tx,
1192                     struct lu_attr *la, uint64_t parent,
1193                     nvlist_t *xattr)
1194 {
1195         sa_bulk_attr_t  *bulk = osd_oti_get(env)->oti_attr_bulk;
1196         struct osa_attr *osa = &osd_oti_get(env)->oti_osa;
1197         uint64_t         gen;
1198         uint64_t         crtime[2];
1199         timestruc_t      now;
1200         int              cnt;
1201         int              rc;
1202         char *dxattr = NULL;
1203         size_t sa_size;
1204
1205
1206         LASSERT(sa_hdl);
1207
1208         gen = dmu_tx_get_txg(tx);
1209         gethrestime(&now);
1210         ZFS_TIME_ENCODE(&now, crtime);
1211
1212         osa->atime[0] = la->la_atime;
1213         osa->ctime[0] = la->la_ctime;
1214         osa->mtime[0] = la->la_mtime;
1215         osa->mode = la->la_mode;
1216         osa->uid = la->la_uid;
1217         osa->gid = la->la_gid;
1218         osa->rdev = la->la_rdev;
1219         osa->nlink = la->la_nlink;
1220         osa->flags = attrs_fs2zfs(la->la_flags);
1221         osa->size  = la->la_size;
1222
1223         /*
1224          * we need to create all SA below upon object create.
1225          *
1226          * XXX The attribute order matters since the accounting callback relies
1227          * on static offsets (i.e. SA_*_OFFSET, see zfs_space_delta_cb()) to
1228          * look up the UID/GID attributes. Moreover, the callback does not seem
1229          * to support the spill block.
1230          * We define attributes in the same order as SA_*_OFFSET in order to
1231          * work around the problem. See ORI-610.
1232          */
1233         cnt = 0;
1234         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_MODE(osd), NULL, &osa->mode, 8);
1235         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_SIZE(osd), NULL, &osa->size, 8);
1236         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_GEN(osd), NULL, &gen, 8);
1237         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_UID(osd), NULL, &osa->uid, 8);
1238         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_GID(osd), NULL, &osa->gid, 8);
1239         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_PARENT(osd), NULL, &parent, 8);
1240         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_FLAGS(osd), NULL, &osa->flags, 8);
1241         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_ATIME(osd), NULL, osa->atime, 16);
1242         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_MTIME(osd), NULL, osa->mtime, 16);
1243         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_CTIME(osd), NULL, osa->ctime, 16);
1244         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_CRTIME(osd), NULL, crtime, 16);
1245         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_LINKS(osd), NULL, &osa->nlink, 8);
1246         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_RDEV(osd), NULL, &osa->rdev, 8);
1247         LASSERT(cnt <= ARRAY_SIZE(osd_oti_get(env)->oti_attr_bulk));
1248
1249         if (xattr) {
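                /* pack the pending xattrs into a single XDR-encoded buffer
                 * and store it as the SA_ZPL_DXATTR attribute along with the
                 * other SAs */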
1250                 rc = -nvlist_size(xattr, &sa_size, NV_ENCODE_XDR);
1251                 LASSERT(rc == 0);
1252
1253                 dxattr = osd_zio_buf_alloc(sa_size);
1254                 LASSERT(dxattr);
1255
1256                 rc = -nvlist_pack(xattr, &dxattr, &sa_size,
1257                                 NV_ENCODE_XDR, KM_SLEEP);
1258                 LASSERT(rc == 0);
1259
1260                 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_DXATTR(osd),
1261                                 NULL, dxattr, sa_size);
1262         }
1263
1264         rc = -sa_replace_all_by_template(sa_hdl, bulk, cnt, tx);
1265         if (dxattr)
1266                 osd_zio_buf_free(dxattr, sa_size);
1267
1268         return rc;
1269 }
1270
1271 static int osd_find_new_dnode(const struct lu_env *env, dmu_tx_t *tx,
1272                               uint64_t oid, dnode_t **dnp)
1273 {
1274         dmu_tx_hold_t *txh;
1275         int rc = 0;
1276
1277         /* take dnode_t from tx to save on dnode#->dnode_t lookup */
1278         for (txh = list_tail(&tx->tx_holds); txh;
1279              txh = list_prev(&tx->tx_holds, txh)) {
1280                 dnode_t *dn = txh->txh_dnode;
1281                 dmu_buf_impl_t *db;
1282
1283                 if (dn == NULL)
1284                         continue;
1285                 if (dn->dn_object != oid)
1286                         continue;
1287                 db = dn->dn_bonus;
1288                 if (db == NULL) {
1289                         rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
1290                         if (dn->dn_bonus == NULL)
1291                                 dbuf_create_bonus(dn);
1292                         rw_exit(&dn->dn_struct_rwlock);
1293                 }
1294                 db = dn->dn_bonus;
1295                 LASSERT(db);
1296                 LASSERT(dn->dn_handle);
1297                 DB_DNODE_ENTER(db);
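                /* first hold on the bonus dbuf: also take a dnode hold and
                 * bump the dbuf count, mirroring what dmu_bonus_hold() does */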
1298                 if (refcount_add(&db->db_holds, osd_obj_tag) == 1) {
1299                         refcount_add(&dn->dn_holds, osd_obj_tag);
1300                         atomic_inc_32(&dn->dn_dbufs_count);
1301                 }
1302                 *dnp = dn;
1303                 dbuf_read(db, NULL, DB_RF_MUST_SUCCEED | DB_RF_NOPREFETCH);
1304                 break;
1305         }
1306
1307         if (unlikely(*dnp == NULL))
1308                 rc = __osd_obj2dnode(tx->tx_objset, oid, dnp);
1309
1310         return rc;
1311 }
1312
1313 /*
1314  * The transaction passed to this routine must have
1315  * dmu_tx_hold_bonus(tx, DMU_NEW_OBJECT) called and then assigned
1316  * to a transaction group.
1317  */
1318 int __osd_object_create(const struct lu_env *env, struct osd_object *obj,
1319                         dnode_t **dnp, dmu_tx_t *tx, struct lu_attr *la)
1320 {
1321         struct osd_device   *osd = osd_obj2dev(obj);
1322         const struct lu_fid *fid = lu_object_fid(&obj->oo_dt.do_lu);
1323         dmu_object_type_t    type = DMU_OT_PLAIN_FILE_CONTENTS;
1324         uint64_t oid;
1325
1326         /* Use DMU_OTN_UINT8_METADATA for local objects so their data blocks
1327          * will get an additional ditto copy */
1328         if (unlikely(S_ISREG(la->la_mode) &&
1329                      fid_seq_is_local_file(fid_seq(fid))))
1330                 type = DMU_OTN_UINT8_METADATA;
1331
1332         /* Create a new DMU object using the default dnode size. */
1333         oid = osd_dmu_object_alloc(osd->od_os, type, 0, 0, tx);
1334
1335         LASSERT(la->la_valid & LA_MODE);
1336         la->la_size = 0;
1337         la->la_nlink = 1;
1338
1339         return osd_find_new_dnode(env, tx, oid, dnp);
1340 }
1341
1342 /*
1343  * The transaction passed to this routine must have
1344  * dmu_tx_hold_zap(tx, DMU_NEW_OBJECT, ...) called and then assigned
1345  * to a transaction group.
1346  *
1347  * Using ZAP_FLAG_HASH64 will force the ZAP to always be a FAT ZAP.
1348  * This is fine for directories today, because storing the FID in the dirent
1349  * will also require a FAT ZAP.  If there is a new type of micro ZAP created
1350  * then we might need to re-evaluate the use of this flag and instead do
1351  * a conversion from the different internal ZAP hash formats being used. */
1352 int __osd_zap_create(const struct lu_env *env, struct osd_device *osd,
1353                      dnode_t **dnp, dmu_tx_t *tx, struct lu_attr *la,
1354                      zap_flags_t flags)
1355 {
1356         uint64_t oid;
1357
1358         /* Assert that the transaction has been assigned to a
1359            transaction group. */
1360         LASSERT(tx->tx_txg != 0);
1361         *dnp = NULL;
1362
1363         oid = osd_zap_create_flags(osd->od_os, 0, flags | ZAP_FLAG_HASH64,
1364                                    DMU_OT_DIRECTORY_CONTENTS,
1365                                    14, /* == ZFS fzap_default_blockshift */
1366                                    DN_MAX_INDBLKSHIFT, /* indirect blockshift */
1367                                    0, tx);
1368
1369         la->la_size = 2;
1370         la->la_nlink = 1;
1371
1372         return osd_find_new_dnode(env, tx, oid, dnp);
1373 }
1374
1375 static dnode_t *osd_mkidx(const struct lu_env *env, struct osd_object *obj,
1376                           struct lu_attr *la, struct osd_thandle *oh)
1377 {
1378         dnode_t *dn;
1379         int rc;
1380
1381         /* Index files should be created as regular files in order not to
1382          * confuse ZPL, which could interpret them as directories.
1383          * We set ZAP_FLAG_UINT64_KEY to let ZFS know that we are going to use
1384          * binary keys */
1385         LASSERT(S_ISREG(la->la_mode));
1386         rc = __osd_zap_create(env, osd_obj2dev(obj), &dn, oh->ot_tx, la,
1387                               ZAP_FLAG_UINT64_KEY);
1388         if (rc)
1389                 return ERR_PTR(rc);
1390         return dn;
1391 }
1392
1393 static dnode_t *osd_mkdir(const struct lu_env *env, struct osd_object *obj,
1394                           struct lu_attr *la, struct osd_thandle *oh)
1395 {
1396         dnode_t *dn;
1397         int rc;
1398
1399         LASSERT(S_ISDIR(la->la_mode));
1400         rc = __osd_zap_create(env, osd_obj2dev(obj), &dn, oh->ot_tx, la, 0);
1401         if (rc)
1402                 return ERR_PTR(rc);
1403         return dn;
1404 }
1405
1406 static dnode_t *osd_mkreg(const struct lu_env *env, struct osd_object *obj,
1407                           struct lu_attr *la, struct osd_thandle *oh)
1408 {
1409         const struct lu_fid *fid = lu_object_fid(&obj->oo_dt.do_lu);
1410         struct osd_device *osd = osd_obj2dev(obj);
1411         dnode_t *dn;
1412         int rc;
1413
1414         LASSERT(S_ISREG(la->la_mode));
1415         rc = __osd_object_create(env, obj, &dn, oh->ot_tx, la);
1416         if (rc)
1417                 return ERR_PTR(rc);
1418
1419         if ((fid_is_idif(fid) || fid_is_norm(fid) || fid_is_echo(fid)) &&
1420             osd->od_is_ost) {
1421                 /* The minimum block size must be at least the page size,
1422                  * otherwise it will break the assumption in tgt_thread_big_cache
1423                  * where the array size is PTLRPC_MAX_BRW_PAGES. It will also
1424                  * affect RDMA due to the subpage transfer size */
1425                 rc = -dmu_object_set_blocksize(osd->od_os, dn->dn_object,
1426                                                PAGE_SIZE, 0, oh->ot_tx);
1427                 if (unlikely(rc)) {
1428                         CERROR("%s: can't change blocksize: %d\n",
1429                                osd->od_svname, rc);
1430                         return ERR_PTR(rc);
1431                 }
1432         }
1433
1434         return dn;
1435 }
1436
1437 static dnode_t *osd_mksym(const struct lu_env *env, struct osd_object *obj,
1438                           struct lu_attr *la, struct osd_thandle *oh)
1439 {
1440         dnode_t *dn;
1441         int rc;
1442
1443         LASSERT(S_ISLNK(la->la_mode));
1444         rc = __osd_object_create(env, obj, &dn, oh->ot_tx, la);
1445         if (rc)
1446                 return ERR_PTR(rc);
1447         return dn;
1448 }
1449
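/*
 * Create the dnode backing a special file.  Character and block devices
 * carry a device number, so LA_RDEV is added to la_valid for them.
 */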
1450 static dnode_t *osd_mknod(const struct lu_env *env, struct osd_object *obj,
1451                           struct lu_attr *la, struct osd_thandle *oh)
1452 {
1453         dnode_t *dn;
1454         int rc;
1455
1456         if (S_ISCHR(la->la_mode) || S_ISBLK(la->la_mode))
1457                 la->la_valid |= LA_RDEV;
1458
1459         rc = __osd_object_create(env, obj, &dn, oh->ot_tx, la);
1460         if (rc)
1461                 return ERR_PTR(rc);
1462         return dn;
1463 }
1464
1465 typedef dnode_t *(*osd_obj_type_f)(const struct lu_env *env,
1466                                    struct osd_object *obj,
1467                                    struct lu_attr *la,
1468                                    struct osd_thandle *oh);
1469
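/*
 * Map a dt_object_format type to the matching creation helper.  The
 * caller invokes the returned function directly, e.g. in osd_create():
 *
 *	dn = osd_create_type_f(dof->dof_type)(env, obj, &obj->oo_attr, oh);
 */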
1470 static osd_obj_type_f osd_create_type_f(enum dt_format_type type)
1471 {
1472         osd_obj_type_f result;
1473
1474         switch (type) {
1475         case DFT_DIR:
1476                 result = osd_mkdir;
1477                 break;
1478         case DFT_INDEX:
1479                 result = osd_mkidx;
1480                 break;
1481         case DFT_REGULAR:
1482                 result = osd_mkreg;
1483                 break;
1484         case DFT_SYM:
1485                 result = osd_mksym;
1486                 break;
1487         case DFT_NODE:
1488                 result = osd_mknod;
1489                 break;
1490         default:
1491                 LBUG();
1492                 break;
1493         }
1494         return result;
1495 }
1496
1497 /*
1498  * Concurrency: @dt is write locked.
1499  */
1500 static int osd_create(const struct lu_env *env, struct dt_object *dt,
1501                       struct lu_attr *attr, struct dt_allocation_hint *hint,
1502                       struct dt_object_format *dof, struct thandle *th)
1503 {
1504         struct osd_thread_info  *info = osd_oti_get(env);
1505         struct lustre_mdt_attrs *lma = &info->oti_mdt_attrs;
1506         struct zpl_direntry     *zde = &info->oti_zde.lzd_reg;
1507         const struct lu_fid     *fid = lu_object_fid(&dt->do_lu);
1508         struct osd_object       *obj = osd_dt_obj(dt);
1509         struct osd_device       *osd = osd_obj2dev(obj);
1510         char                    *buf = info->oti_str;
1511         struct osd_thandle      *oh;
1512         dnode_t *dn = NULL, *zdn = NULL;
1513         uint64_t                 zapid, parent = 0;
1514         int                      rc;
1515
1516         ENTRY;
1517
1518         LASSERT(!fid_is_acct(fid));
1519
1520         /* Concurrent create declarations should not see the object in an
1521          * inconsistent state (db, attr, etc).
1522          * In regular cases the lock acquisition should be cheap. */
1523         down_write(&obj->oo_guard);
1524
1525         if (unlikely(dt_object_exists(dt)))
1526                 GOTO(out, rc = -EEXIST);
1527
1528         LASSERT(osd_invariant(obj));
1529         LASSERT(dof != NULL);
1530
1531         LASSERT(th != NULL);
1532         oh = container_of0(th, struct osd_thandle, ot_super);
1533
1534         LASSERT(obj->oo_dn == NULL);
1535
1536         /* To follow the ZFS on-disk format we need to initialize
1537          * the parent dnode properly. */
1538         if (hint != NULL && hint->dah_parent != NULL &&
1539             !dt_object_remote(hint->dah_parent))
1540                 parent = osd_dt_obj(hint->dah_parent)->oo_dn->dn_object;
1541
1542         /* We may fix some attributes; better not to change the caller's copy. */
1543         obj->oo_attr = *attr;
1544         obj->oo_attr.la_valid |= LA_SIZE | LA_NLINK | LA_TYPE;
1545
1546         dn = osd_create_type_f(dof->dof_type)(env, obj, &obj->oo_attr, oh);
1547         if (IS_ERR(dn)) {
1548                 rc = PTR_ERR(dn);
1549                 dn = NULL;
1550                 GOTO(out, rc);
1551         }
1552
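        /* Fill the ZPL-style directory entry for the new dnode and insert
         * it into the ZAP selected by osd_get_name_n_idx(). */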
1553         zde->zde_pad = 0;
1554         zde->zde_dnode = dn->dn_object;
1555         zde->zde_type = IFTODT(attr->la_mode & S_IFMT);
1556
1557         zapid = osd_get_name_n_idx(env, osd, fid, buf,
1558                                    sizeof(info->oti_str), &zdn);
1559         rc = osd_zap_add(osd, zapid, zdn, buf, 8, 1, zde, oh->ot_tx);
1560         if (rc)
1561                 GOTO(out, rc);
1562         obj->oo_dn = dn;
1563         /* Now add in all of the "SA" attributes */
1564         rc = osd_sa_handle_get(obj);
1565         if (rc)
1566                 GOTO(out, rc);
1567
1568         rc = -nvlist_alloc(&obj->oo_sa_xattr, NV_UNIQUE_NAME, KM_SLEEP);
1569         if (rc)
1570                 GOTO(out, rc);
1571
1572         /* initialize LMA */
1573         lustre_lma_init(lma, fid, 0, 0);
1574         lustre_lma_swab(lma);
1575         rc = -nvlist_add_byte_array(obj->oo_sa_xattr, XATTR_NAME_LMA,
1576                                     (uchar_t *)lma, sizeof(*lma));
1577         if (rc)
1578                 GOTO(out, rc);
1579
1580         /* configure new osd object */
1581         obj->oo_parent = parent != 0 ? parent : zapid;
1582         obj->oo_late_attr_set = 1;
1583         rc = __osd_sa_xattr_schedule_update(env, obj, oh);
1584         if (rc)
1585                 GOTO(out, rc);
1586
1587         /* XXX: oo_lma_flags */
1588         obj->oo_dt.do_lu.lo_header->loh_attr |= obj->oo_attr.la_mode & S_IFMT;
1589         if (likely(!fid_is_acct(lu_object_fid(&obj->oo_dt.do_lu))))
1590                 /* no body operations for accounting objects */
1591                 obj->oo_dt.do_body_ops = &osd_body_ops;
1592
1593         osd_idc_find_and_init(env, osd, obj);
1594
1595 out:
1596         if (unlikely(rc && dn)) {
1597                 dmu_object_free(osd->od_os, dn->dn_object, oh->ot_tx);
1598                 osd_dnode_rele(dn);
1599                 obj->oo_dn = NULL;
1600         } else if (!rc) {
1601                 obj->oo_dt.do_lu.lo_header->loh_attr |= LOHA_EXISTS;
1602         }
1603         up_write(&obj->oo_guard);
1604         RETURN(rc);
1605 }
1606
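/*
 * nlink is stored as the SA_ZPL_LINKS system attribute, so declaring a
 * link count change reduces to declaring a plain attribute update.
 */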
1607 static int osd_declare_ref_add(const struct lu_env *env, struct dt_object *dt,
1608                                struct thandle *th)
1609 {
1610         return osd_declare_attr_set(env, dt, NULL, th);
1611 }
1612
1613 /*
1614  * Concurrency: @dt is write locked.
1615  */
1616 static int osd_ref_add(const struct lu_env *env, struct dt_object *dt,
1617                        struct thandle *handle)
1618 {
1619         struct osd_object       *obj = osd_dt_obj(dt);
1620         struct osd_thandle      *oh;
1621         struct osd_device       *osd = osd_obj2dev(obj);
1622         uint64_t                 nlink;
1623         int rc;
1624
1625         ENTRY;
1626
1627         down_read(&obj->oo_guard);
1628         if (unlikely(!dt_object_exists(dt) || obj->oo_destroyed))
1629                 GOTO(out, rc = -ENOENT);
1630
1631         LASSERT(osd_invariant(obj));
1632         LASSERT(obj->oo_sa_hdl != NULL);
1633
1634         oh = container_of0(handle, struct osd_thandle, ot_super);
1635
1636         write_lock(&obj->oo_attr_lock);
1637         nlink = ++obj->oo_attr.la_nlink;
1638         write_unlock(&obj->oo_attr_lock);
1639
1640         rc = osd_object_sa_update(obj, SA_ZPL_LINKS(osd), &nlink, 8, oh);
1641
1642 out:
1643         up_read(&obj->oo_guard);
1644         RETURN(rc);
1645 }
1646
1647 static int osd_declare_ref_del(const struct lu_env *env, struct dt_object *dt,
1648                                struct thandle *handle)
1649 {
1650         return osd_declare_attr_set(env, dt, NULL, handle);
1651 }
1652
1653 /*
1654  * Concurrency: @dt is write locked.
1655  */
1656 static int osd_ref_del(const struct lu_env *env, struct dt_object *dt,
1657                        struct thandle *handle)
1658 {
1659         struct osd_object       *obj = osd_dt_obj(dt);
1660         struct osd_thandle      *oh;
1661         struct osd_device       *osd = osd_obj2dev(obj);
1662         uint64_t                 nlink;
1663         int                      rc;
1664
1665         ENTRY;
1666
1667         down_read(&obj->oo_guard);
1668
1669         if (unlikely(!dt_object_exists(dt) || obj->oo_destroyed))
1670                 GOTO(out, rc = -ENOENT);
1671
1672         LASSERT(osd_invariant(obj));
1673         LASSERT(obj->oo_sa_hdl != NULL);
1674
1675         oh = container_of0(handle, struct osd_thandle, ot_super);
1676         LASSERT(!lu_object_is_dying(dt->do_lu.lo_header));
1677
1678         write_lock(&obj->oo_attr_lock);
1679         nlink = --obj->oo_attr.la_nlink;
1680         write_unlock(&obj->oo_attr_lock);
1681
1682         rc = osd_object_sa_update(obj, SA_ZPL_LINKS(osd), &nlink, 8, oh);
1683
1684 out:
1685         up_read(&obj->oo_guard);
1686         RETURN(rc);
1687 }
1688
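/*
 * Flush pending changes for @dt to stable storage.  The byte range is
 * currently ignored, see the comment below.
 */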
1689 static int osd_object_sync(const struct lu_env *env, struct dt_object *dt,
1690                            __u64 start, __u64 end)
1691 {
1692         struct osd_device *osd = osd_obj2dev(osd_dt_obj(dt));
1693         ENTRY;
1694
1695         /* XXX: no other option than syncing the whole filesystem until we
1696          * support ZIL.  If the object tracked the txg that it was last
1697          * modified in, it could pass that txg here instead of "0".  Maybe
1698          * the changes are already committed, so no wait is needed at all? */
1699         if (!osd->od_dt_dev.dd_rdonly)
1700                 txg_wait_synced(dmu_objset_pool(osd->od_os), 0ULL);
1701
1702         RETURN(0);
1703 }
1704
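/* Object invalidation is a no-op for osd-zfs. */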
1705 static int osd_invalidate(const struct lu_env *env, struct dt_object *dt)
1706 {
1707         return 0;
1708 }
1709
1710 static struct dt_object_operations osd_obj_ops = {
1711         .do_read_lock           = osd_read_lock,
1712         .do_write_lock          = osd_write_lock,
1713         .do_read_unlock         = osd_read_unlock,
1714         .do_write_unlock        = osd_write_unlock,
1715         .do_write_locked        = osd_write_locked,
1716         .do_attr_get            = osd_attr_get,
1717         .do_declare_attr_set    = osd_declare_attr_set,
1718         .do_attr_set            = osd_attr_set,
1719         .do_ah_init             = osd_ah_init,
1720         .do_declare_create      = osd_declare_create,
1721         .do_create              = osd_create,
1722         .do_declare_destroy     = osd_declare_destroy,
1723         .do_destroy             = osd_destroy,
1724         .do_index_try           = osd_index_try,
1725         .do_declare_ref_add     = osd_declare_ref_add,
1726         .do_ref_add             = osd_ref_add,
1727         .do_declare_ref_del     = osd_declare_ref_del,
1728         .do_ref_del             = osd_ref_del,
1729         .do_xattr_get           = osd_xattr_get,
1730         .do_declare_xattr_set   = osd_declare_xattr_set,
1731         .do_xattr_set           = osd_xattr_set,
1732         .do_declare_xattr_del   = osd_declare_xattr_del,
1733         .do_xattr_del           = osd_xattr_del,
1734         .do_xattr_list          = osd_xattr_list,
1735         .do_object_sync         = osd_object_sync,
1736         .do_invalidate          = osd_invalidate,
1737 };
1738
1739 static struct lu_object_operations osd_lu_obj_ops = {
1740         .loo_object_init        = osd_object_init,
1741         .loo_object_delete      = osd_object_delete,
1742         .loo_object_release     = osd_object_release,
1743         .loo_object_free        = osd_object_free,
1744         .loo_object_print       = osd_object_print,
1745         .loo_object_invariant   = osd_object_invariant,
1746 };
1747
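/*
 * The otable iterator object has no attributes of its own, so an empty
 * validity mask is returned.
 */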
1748 static int osd_otable_it_attr_get(const struct lu_env *env,
1749                                 struct dt_object *dt,
1750                                 struct lu_attr *attr)
1751 {
1752         attr->la_valid = 0;
1753         return 0;
1754 }
1755
1756 static struct dt_object_operations osd_obj_otable_it_ops = {
1757         .do_attr_get            = osd_otable_it_attr_get,
1758         .do_index_try           = osd_index_try,
1759 };