Whamcloud - gitweb
LU-7991 quota: project quota against ZFS backend
[fs/lustre-release.git] / lustre / osd-zfs / osd_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/osd-zfs/osd_object.c
33  *
34  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
35  * Author: Mike Pershin <tappro@whamcloud.com>
36  * Author: Johann Lombardi <johann@whamcloud.com>
37  */
38
39 #define DEBUG_SUBSYSTEM S_OSD
40
41 #include <libcfs/libcfs.h>
42 #include <obd_support.h>
43 #include <lustre_net.h>
44 #include <obd.h>
45 #include <obd_class.h>
46 #include <lustre_disk.h>
47 #include <lustre_fid.h>
48
49 #include "osd_internal.h"
50
51 #include <sys/dnode.h>
52 #include <sys/dbuf.h>
53 #include <sys/spa.h>
54 #include <sys/stat.h>
55 #include <sys/zap.h>
56 #include <sys/spa_impl.h>
57 #include <sys/zfs_znode.h>
58 #include <sys/dmu_tx.h>
59 #include <sys/dmu_objset.h>
60 #include <sys/dsl_prop.h>
61 #include <sys/sa_impl.h>
62 #include <sys/txg.h>
63
64 char *osd_obj_tag = "osd_object";
65
66 static struct dt_object_operations osd_obj_ops;
67 static struct lu_object_operations osd_lu_obj_ops;
68 extern struct dt_body_operations osd_body_ops;
69 static struct dt_object_operations osd_obj_otable_it_ops;
70
71 extern struct kmem_cache *osd_object_kmem;
72
73 static void
74 osd_object_sa_fini(struct osd_object *obj)
75 {
76         if (obj->oo_sa_hdl) {
77                 sa_handle_destroy(obj->oo_sa_hdl);
78                 obj->oo_sa_hdl = NULL;
79         }
80 }
81
82 static int
83 osd_object_sa_init(struct osd_object *obj, struct osd_device *o)
84 {
85         int rc;
86
87         LASSERT(obj->oo_sa_hdl == NULL);
88         LASSERT(obj->oo_dn != NULL);
89
90         rc = osd_sa_handle_get(obj);
91         if (rc)
92                 return rc;
93
94         /* Cache the xattr object id, valid for the life of the object */
95         rc = -sa_lookup(obj->oo_sa_hdl, SA_ZPL_XATTR(o), &obj->oo_xattr, 8);
96         if (rc == -ENOENT) {
97                 obj->oo_xattr = ZFS_NO_OBJECT;
98                 rc = 0;
99         } else if (rc) {
100                 osd_object_sa_fini(obj);
101         }
102
103         return rc;
104 }
105
106 /*
107  * Add object to list of dirty objects in tx handle.
108  */
109 void osd_object_sa_dirty_add(struct osd_object *obj, struct osd_thandle *oh)
110 {
111         if (!list_empty(&obj->oo_sa_linkage))
112                 return;
113
114         write_lock(&obj->oo_attr_lock);
115         if (likely(list_empty(&obj->oo_sa_linkage)))
116                 list_add(&obj->oo_sa_linkage, &oh->ot_sa_list);
117         write_unlock(&obj->oo_attr_lock);
118 }
119
120 /*
121  * Release spill block dbuf hold for all dirty SAs.
122  */
123 void osd_object_sa_dirty_rele(const struct lu_env *env, struct osd_thandle *oh)
124 {
125         struct osd_object *obj;
126
127         while (!list_empty(&oh->ot_sa_list)) {
128                 obj = list_entry(oh->ot_sa_list.next,
129                                  struct osd_object, oo_sa_linkage);
130                 write_lock(&obj->oo_attr_lock);
131                 list_del_init(&obj->oo_sa_linkage);
132                 write_unlock(&obj->oo_attr_lock);
133                 if (obj->oo_late_xattr) {
134                         /*
135                          * take oo_guard to protect oo_sa_xattr buffer
136                          * from concurrent update by osd_xattr_set()
137                          */
138                         LASSERT(oh->ot_assigned != 0);
139                         down_write(&obj->oo_guard);
140                         if (obj->oo_late_attr_set)
141                                 __osd_sa_attr_init(env, obj, oh);
142                         else if (obj->oo_late_xattr)
143                                 __osd_sa_xattr_update(env, obj, oh);
144                         up_write(&obj->oo_guard);
145                 }
146                 sa_spill_rele(obj->oo_sa_hdl);
147         }
148 }
149
150 /*
151  * Update the SA and add the object to the dirty list.
152  */
153 int osd_object_sa_update(struct osd_object *obj, sa_attr_type_t type,
154                          void *buf, uint32_t buflen, struct osd_thandle *oh)
155 {
156         int rc;
157
158         LASSERT(obj->oo_sa_hdl != NULL);
159         LASSERT(oh->ot_tx != NULL);
160
161         rc = -sa_update(obj->oo_sa_hdl, type, buf, buflen, oh->ot_tx);
162         osd_object_sa_dirty_add(obj, oh);
163
164         return rc;
165 }
166
167 /*
168  * Bulk update the SA and add the object to the dirty list.
169  */
170 static int
171 osd_object_sa_bulk_update(struct osd_object *obj, sa_bulk_attr_t *attrs,
172                           int count, struct osd_thandle *oh)
173 {
174         int rc;
175
176         LASSERT(obj->oo_sa_hdl != NULL);
177         LASSERT(oh->ot_tx != NULL);
178
179         rc = -sa_bulk_update(obj->oo_sa_hdl, attrs, count, oh->ot_tx);
180         osd_object_sa_dirty_add(obj, oh);
181
182         return rc;
183 }
184
185 /*
186  * Retrieve the attributes of a DMU object
187  */
188 int __osd_object_attr_get(const struct lu_env *env, struct osd_device *o,
189                           struct osd_object *obj, struct lu_attr *la)
190 {
191         struct osa_attr *osa = &osd_oti_get(env)->oti_osa;
192         sa_bulk_attr_t  *bulk = osd_oti_get(env)->oti_attr_bulk;
193         int              cnt = 0;
194         int              rc;
195         ENTRY;
196
197         LASSERT(obj->oo_dn != NULL);
198
199         la->la_valid |= LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE | LA_TYPE |
200                         LA_SIZE | LA_UID | LA_GID | LA_FLAGS | LA_NLINK;
201
202         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_ATIME(o), NULL, osa->atime, 16);
203         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_MTIME(o), NULL, osa->mtime, 16);
204         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_CTIME(o), NULL, osa->ctime, 16);
205         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_MODE(o), NULL, &osa->mode, 8);
206         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_SIZE(o), NULL, &osa->size, 8);
207         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_LINKS(o), NULL, &osa->nlink, 8);
208         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_UID(o), NULL, &osa->uid, 8);
209         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_GID(o), NULL, &osa->gid, 8);
210         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_FLAGS(o), NULL, &osa->flags, 8);
211         LASSERT(cnt <= ARRAY_SIZE(osd_oti_get(env)->oti_attr_bulk));
212
213         rc = -sa_bulk_lookup(obj->oo_sa_hdl, bulk, cnt);
214         if (rc)
215                 GOTO(out_sa, rc);
216
217 #ifdef ZFS_PROJINHERIT
218         if (o->od_projectused_dn && osa->flags & ZFS_PROJID) {
219                 rc = -sa_lookup(obj->oo_sa_hdl, SA_ZPL_PROJID(o),
220                                 &osa->projid, 8);
221                 if (rc)
222                         GOTO(out_sa, rc);
223
224                 la->la_projid = osa->projid;
225                 la->la_valid |= LA_PROJID;
226                 obj->oo_with_projid = 1;
227         } else {
228                 la->la_projid = ZFS_DEFAULT_PROJID;
229                 la->la_valid &= ~LA_PROJID;
230         }
231 #else
232         la->la_projid = 0;
233         la->la_valid &= ~LA_PROJID;
234 #endif
235
236         la->la_atime = osa->atime[0];
237         la->la_mtime = osa->mtime[0];
238         la->la_ctime = osa->ctime[0];
239         la->la_mode = osa->mode;
240         la->la_uid = osa->uid;
241         la->la_gid = osa->gid;
242         la->la_nlink = osa->nlink;
243         la->la_flags = attrs_zfs2fs(osa->flags);
244         la->la_size = osa->size;
245
246         /* Try to get extra flag from LMA. Right now, only LMAI_ORPHAN
247          * flags is stored in LMA, and it is only for orphan directory */
248         if (S_ISDIR(la->la_mode) && dt_object_exists(&obj->oo_dt)) {
249                 struct osd_thread_info *info = osd_oti_get(env);
250                 struct lustre_mdt_attrs *lma;
251                 struct lu_buf buf;
252
253                 lma = (struct lustre_mdt_attrs *)info->oti_buf;
254                 buf.lb_buf = lma;
255                 buf.lb_len = sizeof(info->oti_buf);
256                 rc = osd_xattr_get(env, &obj->oo_dt, &buf, XATTR_NAME_LMA);
257                 if (rc > 0) {
258                         rc = 0;
259                         lma->lma_incompat = le32_to_cpu(lma->lma_incompat);
260                         obj->oo_lma_flags =
261                                 lma_to_lustre_flags(lma->lma_incompat);
262
263                 } else if (rc == -ENODATA) {
264                         rc = 0;
265                 }
266         }
267
268         if (S_ISCHR(la->la_mode) || S_ISBLK(la->la_mode)) {
269                 rc = -sa_lookup(obj->oo_sa_hdl, SA_ZPL_RDEV(o), &osa->rdev, 8);
270                 if (rc)
271                         GOTO(out_sa, rc);
272                 la->la_rdev = osa->rdev;
273                 la->la_valid |= LA_RDEV;
274         }
275 out_sa:
276
277         RETURN(rc);
278 }
279
280 int __osd_obj2dnode(objset_t *os, uint64_t oid, dnode_t **dnp)
281 {
282         dmu_buf_t *db;
283         dmu_buf_impl_t *dbi;
284         int rc;
285
286         rc = -dmu_bonus_hold(os, oid, osd_obj_tag, &db);
287         if (rc)
288                 return rc;
289
290         dbi = (dmu_buf_impl_t *)db;
291         DB_DNODE_ENTER(dbi);
292         *dnp = DB_DNODE(dbi);
293         LASSERT(*dnp != NULL);
294
295         return 0;
296 }
297
298 /*
299  * Concurrency: no concurrent access is possible that early in object
300  * life-cycle.
301  */
302 struct lu_object *osd_object_alloc(const struct lu_env *env,
303                                    const struct lu_object_header *hdr,
304                                    struct lu_device *d)
305 {
306         struct osd_object *mo;
307
308         OBD_SLAB_ALLOC_PTR_GFP(mo, osd_object_kmem, GFP_NOFS);
309         if (mo != NULL) {
310                 struct lu_object *l;
311
312                 l = &mo->oo_dt.do_lu;
313                 dt_object_init(&mo->oo_dt, NULL, d);
314                 mo->oo_dt.do_ops = &osd_obj_ops;
315                 l->lo_ops = &osd_lu_obj_ops;
316                 INIT_LIST_HEAD(&mo->oo_sa_linkage);
317                 INIT_LIST_HEAD(&mo->oo_unlinked_linkage);
318                 init_rwsem(&mo->oo_sem);
319                 init_rwsem(&mo->oo_guard);
320                 rwlock_init(&mo->oo_attr_lock);
321                 mo->oo_destroy = OSD_DESTROY_NONE;
322                 return l;
323         } else {
324                 return NULL;
325         }
326 }
327
328 /*
329  * Concurrency: shouldn't matter.
330  */
331 int osd_object_init0(const struct lu_env *env, struct osd_object *obj)
332 {
333         struct osd_device       *osd = osd_obj2dev(obj);
334         const struct lu_fid     *fid = lu_object_fid(&obj->oo_dt.do_lu);
335         int                      rc = 0;
336         ENTRY;
337
338         if (obj->oo_dn == NULL)
339                 RETURN(0);
340
341         /* object exist */
342
343         rc = osd_object_sa_init(obj, osd);
344         if (rc)
345                 RETURN(rc);
346
347         /* cache attrs in object */
348         rc = __osd_object_attr_get(env, osd, obj, &obj->oo_attr);
349         if (rc)
350                 RETURN(rc);
351
352         if (likely(!fid_is_acct(fid)))
353                 /* no body operations for accounting objects */
354                 obj->oo_dt.do_body_ops = &osd_body_ops;
355
356         /*
357          * initialize object before marking it existing
358          */
359         obj->oo_dt.do_lu.lo_header->loh_attr |= obj->oo_attr.la_mode & S_IFMT;
360
361         smp_mb();
362         obj->oo_dt.do_lu.lo_header->loh_attr |= LOHA_EXISTS;
363
364         RETURN(0);
365 }
366
367 static int osd_check_lma(const struct lu_env *env, struct osd_object *obj)
368 {
369         struct osd_thread_info  *info = osd_oti_get(env);
370         struct lu_buf           buf;
371         int                     rc;
372         struct lustre_mdt_attrs *lma;
373         ENTRY;
374
375         CLASSERT(sizeof(info->oti_buf) >= sizeof(*lma));
376         lma = (struct lustre_mdt_attrs *)info->oti_buf;
377         buf.lb_buf = lma;
378         buf.lb_len = sizeof(info->oti_buf);
379
380         rc = osd_xattr_get(env, &obj->oo_dt, &buf, XATTR_NAME_LMA);
381         if (rc > 0) {
382                 rc = 0;
383                 lustre_lma_swab(lma);
384                 if (unlikely((lma->lma_incompat & ~LMA_INCOMPAT_SUPP) ||
385                              CFS_FAIL_CHECK(OBD_FAIL_OSD_LMA_INCOMPAT))) {
386                         CWARN("%s: unsupported incompat LMA feature(s) %#x for "
387                               "fid = "DFID"\n", osd_obj2dev(obj)->od_svname,
388                               lma->lma_incompat & ~LMA_INCOMPAT_SUPP,
389                               PFID(lu_object_fid(&obj->oo_dt.do_lu)));
390                         rc = -EOPNOTSUPP;
391                 }
392         } else if (rc == -ENODATA) {
393                 /* haven't initialize LMA xattr */
394                 rc = 0;
395         }
396
397         RETURN(rc);
398 }
399
400 /**
401  * Helper function to retrieve DMU object id from fid for accounting object
402  */
403 static dnode_t *osd_quota_fid2dmu(const struct osd_device *osd,
404                                   const struct lu_fid *fid)
405 {
406         dnode_t *dn = NULL;
407
408         LASSERT(fid_is_acct(fid));
409
410         switch (fid_oid(fid)) {
411         case ACCT_USER_OID:
412                 dn = osd->od_userused_dn;
413                 break;
414         case ACCT_GROUP_OID:
415                 dn = osd->od_groupused_dn;
416                 break;
417 #ifdef ZFS_PROJINHERIT
418         case ACCT_PROJECT_OID:
419                 dn = osd->od_projectused_dn;
420                 break;
421 #endif
422         default:
423                 break;
424         }
425
426         return dn;
427 }
428
429 /*
430  * Concurrency: no concurrent access is possible that early in object
431  * life-cycle.
432  */
433 static int osd_object_init(const struct lu_env *env, struct lu_object *l,
434                            const struct lu_object_conf *conf)
435 {
436         struct osd_object *obj = osd_obj(l);
437         struct osd_device *osd = osd_obj2dev(obj);
438         const struct lu_fid *fid = lu_object_fid(l);
439         uint64_t oid;
440         int rc = 0;
441         ENTRY;
442
443         LASSERT(osd_invariant(obj));
444
445         if (fid_is_otable_it(&l->lo_header->loh_fid)) {
446                 obj->oo_dt.do_ops = &osd_obj_otable_it_ops;
447                 l->lo_header->loh_attr |= LOHA_EXISTS;
448                 RETURN(0);
449         }
450
451         if (conf != NULL && conf->loc_flags & LOC_F_NEW)
452                 GOTO(out, rc = 0);
453
454         if (unlikely(fid_is_acct(fid))) {
455                 obj->oo_dn = osd_quota_fid2dmu(osd, fid);
456                 if (obj->oo_dn) {
457                         obj->oo_dt.do_index_ops = &osd_acct_index_ops;
458                         l->lo_header->loh_attr |= LOHA_EXISTS;
459                 }
460
461                 GOTO(out, rc = 0);
462         }
463
464         rc = osd_fid_lookup(env, osd, fid, &oid);
465         if (rc == 0) {
466                 LASSERT(obj->oo_dn == NULL);
467                 rc = __osd_obj2dnode(osd->od_os, oid, &obj->oo_dn);
468                 /* EEXIST will be returned if object is being deleted in ZFS */
469                 if (rc == -EEXIST) {
470                         rc = 0;
471                         GOTO(out, rc);
472                 }
473                 if (rc != 0) {
474                         CERROR("%s: lookup "DFID"/%#llx failed: rc = %d\n",
475                                osd->od_svname, PFID(lu_object_fid(l)), oid, rc);
476                         GOTO(out, rc);
477                 }
478                 LASSERT(obj->oo_dn);
479                 rc = osd_object_init0(env, obj);
480                 if (rc != 0)
481                         GOTO(out, rc);
482
483                 rc = osd_check_lma(env, obj);
484                 if (rc != 0)
485                         GOTO(out, rc);
486         } else if (rc == -ENOENT) {
487                 rc = 0;
488         }
489         LASSERT(osd_invariant(obj));
490 out:
491         RETURN(rc);
492 }
493
494 /*
495  * Concurrency: no concurrent access is possible that late in object
496  * life-cycle.
497  */
498 static void osd_object_free(const struct lu_env *env, struct lu_object *l)
499 {
500         struct osd_object *obj = osd_obj(l);
501
502         LASSERT(osd_invariant(obj));
503
504         dt_object_fini(&obj->oo_dt);
505         OBD_SLAB_FREE_PTR(obj, osd_object_kmem);
506 }
507
508 static int
509 osd_object_unlinked_add(struct osd_object *obj, struct osd_thandle *oh)
510 {
511         int rc = -EBUSY;
512
513         LASSERT(obj->oo_destroy == OSD_DESTROY_ASYNC);
514
515         /* the object is supposed to be exclusively locked by
516          * the caller (osd_destroy()), while the transaction
517          * (oh) is per-thread and not shared */
518         if (likely(list_empty(&obj->oo_unlinked_linkage))) {
519                 list_add(&obj->oo_unlinked_linkage, &oh->ot_unlinked_list);
520                 rc = 0;
521         }
522
523         return rc;
524 }
525
526 /* Default to max data size covered by a level-1 indirect block */
527 static unsigned long osd_sync_destroy_max_size =
528         1UL << (DN_MAX_INDBLKSHIFT - SPA_BLKPTRSHIFT + SPA_MAXBLOCKSHIFT);
529 module_param(osd_sync_destroy_max_size, ulong, 0444);
530 MODULE_PARM_DESC(osd_sync_destroy_max_size, "Maximum object size to use synchronous destroy.");
531
532 static inline void
533 osd_object_set_destroy_type(struct osd_object *obj)
534 {
535         /*
536          * Lock-less OST_WRITE can race with OST_DESTROY, so set destroy type
537          * only once and use it consistently thereafter.
538          */
539         down_write(&obj->oo_guard);
540         if (obj->oo_destroy == OSD_DESTROY_NONE) {
541                 if (obj->oo_attr.la_size <= osd_sync_destroy_max_size)
542                         obj->oo_destroy = OSD_DESTROY_SYNC;
543                 else /* Larger objects are destroyed asynchronously */
544                         obj->oo_destroy = OSD_DESTROY_ASYNC;
545         }
546         up_write(&obj->oo_guard);
547 }
548
549 static int osd_declare_destroy(const struct lu_env *env, struct dt_object *dt,
550                                struct thandle *th)
551 {
552         const struct lu_fid     *fid = lu_object_fid(&dt->do_lu);
553         struct osd_object       *obj = osd_dt_obj(dt);
554         struct osd_device       *osd = osd_obj2dev(obj);
555         struct osd_thandle      *oh;
556         dnode_t *dn;
557         int                      rc;
558         uint64_t                 zapid;
559         ENTRY;
560
561         LASSERT(th != NULL);
562         LASSERT(dt_object_exists(dt));
563
564         oh = container_of0(th, struct osd_thandle, ot_super);
565         LASSERT(oh->ot_tx != NULL);
566
567         /* declare that we'll remove object from fid-dnode mapping */
568         zapid = osd_get_name_n_idx(env, osd, fid, NULL, 0, &dn);
569         osd_tx_hold_zap(oh->ot_tx, zapid, dn, FALSE, NULL);
570
571         osd_declare_xattrs_destroy(env, obj, oh);
572
573         /* one less inode */
574         rc = osd_declare_quota(env, osd, obj->oo_attr.la_uid,
575                                obj->oo_attr.la_gid, obj->oo_attr.la_projid,
576                                -1, oh, NULL, OSD_QID_INODE);
577         if (rc)
578                 RETURN(rc);
579
580         /* data to be truncated */
581         rc = osd_declare_quota(env, osd, obj->oo_attr.la_uid,
582                                obj->oo_attr.la_gid, obj->oo_attr.la_projid,
583                                0, oh, NULL, OSD_QID_BLK);
584         if (rc)
585                 RETURN(rc);
586
587         osd_object_set_destroy_type(obj);
588         if (obj->oo_destroy == OSD_DESTROY_SYNC)
589                 dmu_tx_hold_free(oh->ot_tx, obj->oo_dn->dn_object,
590                                  0, DMU_OBJECT_END);
591         else
592                 osd_tx_hold_zap(oh->ot_tx, osd->od_unlinked->dn_object,
593                                 osd->od_unlinked, TRUE, NULL);
594
595         /* will help to find FID->ino when this object is being
596          * added to PENDING/ */
597         osd_idc_find_and_init(env, osd, obj);
598
599         RETURN(0);
600 }
601
602 static int osd_destroy(const struct lu_env *env, struct dt_object *dt,
603                        struct thandle *th)
604 {
605         struct osd_thread_info  *info = osd_oti_get(env);
606         char                    *buf = info->oti_str;
607         struct osd_object       *obj = osd_dt_obj(dt);
608         struct osd_device       *osd = osd_obj2dev(obj);
609         const struct lu_fid     *fid = lu_object_fid(&dt->do_lu);
610         struct osd_thandle      *oh;
611         int                      rc;
612         uint64_t                 oid, zapid;
613         dnode_t *zdn;
614         ENTRY;
615
616         down_write(&obj->oo_guard);
617
618         if (unlikely(!dt_object_exists(dt) || obj->oo_destroyed))
619                 GOTO(out, rc = -ENOENT);
620
621         LASSERT(obj->oo_dn != NULL);
622
623         oh = container_of0(th, struct osd_thandle, ot_super);
624         LASSERT(oh != NULL);
625         LASSERT(oh->ot_tx != NULL);
626
627         /* remove obj ref from index dir (it depends) */
628         zapid = osd_get_name_n_idx(env, osd, fid, buf,
629                                    sizeof(info->oti_str), &zdn);
630         rc = osd_zap_remove(osd, zapid, zdn, buf, oh->ot_tx);
631         if (rc) {
632                 CERROR("%s: zap_remove(%s) failed: rc = %d\n",
633                        osd->od_svname, buf, rc);
634                 GOTO(out, rc);
635         }
636
637         rc = osd_xattrs_destroy(env, obj, oh);
638         if (rc) {
639                 CERROR("%s: cannot destroy xattrs for %s: rc = %d\n",
640                        osd->od_svname, buf, rc);
641                 GOTO(out, rc);
642         }
643
644         oid = obj->oo_dn->dn_object;
645         if (unlikely(obj->oo_destroy == OSD_DESTROY_NONE)) {
646                 /* this may happen if the destroy wasn't declared
647                  * e.g. when the object is created and then destroyed
648                  * in the same transaction - we don't need additional
649                  * space for destroy specifically */
650                 LASSERT(obj->oo_attr.la_size <= osd_sync_destroy_max_size);
651                 rc = -dmu_object_free(osd->od_os, oid, oh->ot_tx);
652                 if (rc)
653                         CERROR("%s: failed to free %s %llu: rc = %d\n",
654                                osd->od_svname, buf, oid, rc);
655         } else if (obj->oo_destroy == OSD_DESTROY_SYNC) {
656                 rc = -dmu_object_free(osd->od_os, oid, oh->ot_tx);
657                 if (rc)
658                         CERROR("%s: failed to free %s %llu: rc = %d\n",
659                                osd->od_svname, buf, oid, rc);
660         } else { /* asynchronous destroy */
661                 char *key = info->oti_key;
662
663                 rc = osd_object_unlinked_add(obj, oh);
664                 if (rc)
665                         GOTO(out, rc);
666
667                 snprintf(key, sizeof(info->oti_key), "%llx", oid);
668                 rc = osd_zap_add(osd, osd->od_unlinked->dn_object,
669                                  osd->od_unlinked, key, 8, 1, &oid, oh->ot_tx);
670                 if (rc)
671                         CERROR("%s: zap_add_int() failed %s %llu: rc = %d\n",
672                                osd->od_svname, buf, oid, rc);
673         }
674
675 out:
676         /* not needed in the cache anymore */
677         set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags);
678         if (rc == 0)
679                 obj->oo_destroyed = 1;
680         up_write(&obj->oo_guard);
681         RETURN (0);
682 }
683
684 static void osd_object_delete(const struct lu_env *env, struct lu_object *l)
685 {
686         struct osd_object *obj = osd_obj(l);
687         const struct lu_fid *fid = lu_object_fid(l);
688
689         if (obj->oo_dn) {
690                 if (likely(!fid_is_acct(fid))) {
691                         osd_object_sa_fini(obj);
692                         if (obj->oo_sa_xattr) {
693                                 nvlist_free(obj->oo_sa_xattr);
694                                 obj->oo_sa_xattr = NULL;
695                         }
696                         osd_dnode_rele(obj->oo_dn);
697                         list_del(&obj->oo_sa_linkage);
698                 }
699                 obj->oo_dn = NULL;
700         }
701 }
702
/*
 * lu_object release method; this OSD has nothing to do here.
 *
 * Concurrency: ->loo_object_release() is called under site spin-lock.
 */
static void osd_object_release(const struct lu_env *env, struct lu_object *l)
{
	/* intentionally empty */
}
710
711 /*
712  * Concurrency: shouldn't matter.
713  */
714 static int osd_object_print(const struct lu_env *env, void *cookie,
715                             lu_printer_t p, const struct lu_object *l)
716 {
717         struct osd_object *o = osd_obj(l);
718
719         return (*p)(env, cookie, LUSTRE_OSD_ZFS_NAME"-object@%p", o);
720 }
721
722 static void osd_read_lock(const struct lu_env *env, struct dt_object *dt,
723                           unsigned role)
724 {
725         struct osd_object *obj = osd_dt_obj(dt);
726
727         LASSERT(osd_invariant(obj));
728
729         down_read_nested(&obj->oo_sem, role);
730 }
731
732 static void osd_write_lock(const struct lu_env *env, struct dt_object *dt,
733                            unsigned role)
734 {
735         struct osd_object *obj = osd_dt_obj(dt);
736
737         LASSERT(osd_invariant(obj));
738
739         down_write_nested(&obj->oo_sem, role);
740 }
741
742 static void osd_read_unlock(const struct lu_env *env, struct dt_object *dt)
743 {
744         struct osd_object *obj = osd_dt_obj(dt);
745
746         LASSERT(osd_invariant(obj));
747         up_read(&obj->oo_sem);
748 }
749
750 static void osd_write_unlock(const struct lu_env *env, struct dt_object *dt)
751 {
752         struct osd_object *obj = osd_dt_obj(dt);
753
754         LASSERT(osd_invariant(obj));
755         up_write(&obj->oo_sem);
756 }
757
758 static int osd_write_locked(const struct lu_env *env, struct dt_object *dt)
759 {
760         struct osd_object *obj = osd_dt_obj(dt);
761         int rc = 1;
762
763         LASSERT(osd_invariant(obj));
764
765         if (down_write_trylock(&obj->oo_sem)) {
766                 rc = 0;
767                 up_write(&obj->oo_sem);
768         }
769         return rc;
770 }
771
772 static int osd_attr_get(const struct lu_env *env,
773                         struct dt_object *dt,
774                         struct lu_attr *attr)
775 {
776         struct osd_object       *obj = osd_dt_obj(dt);
777         uint64_t                 blocks;
778         uint32_t                 blksize;
779         int                      rc = 0;
780
781         down_read(&obj->oo_guard);
782
783         if (unlikely(!dt_object_exists(dt) || obj->oo_destroyed))
784                 GOTO(out, rc = -ENOENT);
785
786         if (unlikely(fid_is_acct(lu_object_fid(&dt->do_lu))))
787                 GOTO(out, rc = 0);
788
789         LASSERT(osd_invariant(obj));
790         LASSERT(obj->oo_dn);
791
792         read_lock(&obj->oo_attr_lock);
793         *attr = obj->oo_attr;
794         if (obj->oo_lma_flags & LUSTRE_ORPHAN_FL)
795                 attr->la_flags |= LUSTRE_ORPHAN_FL;
796         read_unlock(&obj->oo_attr_lock);
797
798         /* with ZFS_DEBUG zrl_add_debug() called by DB_DNODE_ENTER()
799          * from within sa_object_size() can block on a mutex, so
800          * we can't call sa_object_size() holding rwlock */
801         sa_object_size(obj->oo_sa_hdl, &blksize, &blocks);
802         /* we do not control size of indices, so always calculate
803          * it from number of blocks reported by DMU */
804         if (S_ISDIR(attr->la_mode))
805                 attr->la_size = 512 * blocks;
806         /* Block size may be not set; suggest maximal I/O transfers. */
807         if (blksize == 0)
808                 blksize = osd_spa_maxblocksize(
809                         dmu_objset_spa(osd_obj2dev(obj)->od_os));
810
811         attr->la_blksize = blksize;
812         attr->la_blocks = blocks;
813         attr->la_valid |= LA_BLOCKS | LA_BLKSIZE;
814
815 out:
816         up_read(&obj->oo_guard);
817         return rc;
818 }
819
820 /* Simple wrapper on top of qsd API which implement quota transfer for osd
821  * setattr needs. As a reminder, only the root user can change ownership of
822  * a file, that's why EDQUOT & EINPROGRESS errors are discarded */
823 static inline int qsd_transfer(const struct lu_env *env,
824                                struct qsd_instance *qsd,
825                                struct lquota_trans *trans, int qtype,
826                                __u64 orig_id, __u64 new_id, __u64 bspace,
827                                struct lquota_id_info *qi)
828 {
829         int     rc;
830
831         if (unlikely(qsd == NULL))
832                 return 0;
833
834         LASSERT(qtype >= 0 && qtype < LL_MAXQUOTAS);
835         qi->lqi_type = qtype;
836
837         /* inode accounting */
838         qi->lqi_is_blk = false;
839
840         /* one more inode for the new owner ... */
841         qi->lqi_id.qid_uid = new_id;
842         qi->lqi_space      = 1;
843         rc = qsd_op_begin(env, qsd, trans, qi, NULL);
844         if (rc == -EDQUOT || rc == -EINPROGRESS)
845                 rc = 0;
846         if (rc)
847                 return rc;
848
849         /* and one less inode for the current id */
850         qi->lqi_id.qid_uid = orig_id;;
851         qi->lqi_space      = -1;
852         /* can't get EDQUOT when reducing usage */
853         rc = qsd_op_begin(env, qsd, trans, qi, NULL);
854         if (rc == -EINPROGRESS)
855                 rc = 0;
856         if (rc)
857                 return rc;
858
859         /* block accounting */
860         qi->lqi_is_blk = true;
861
862         /* more blocks for the new owner ... */
863         qi->lqi_id.qid_uid = new_id;
864         qi->lqi_space      = bspace;
865         rc = qsd_op_begin(env, qsd, trans, qi, NULL);
866         if (rc == -EDQUOT || rc == -EINPROGRESS)
867                 rc = 0;
868         if (rc)
869                 return rc;
870
871         /* and finally less blocks for the current owner */
872         qi->lqi_id.qid_uid = orig_id;
873         qi->lqi_space      = -bspace;
874         rc = qsd_op_begin(env, qsd, trans, qi, NULL);
875         /* can't get EDQUOT when reducing usage */
876         if (rc == -EINPROGRESS)
877                 rc = 0;
878         return rc;
879 }
880
881 static int osd_declare_attr_set(const struct lu_env *env,
882                                 struct dt_object *dt,
883                                 const struct lu_attr *attr,
884                                 struct thandle *handle)
885 {
886         struct osd_thread_info  *info = osd_oti_get(env);
887         struct osd_object       *obj = osd_dt_obj(dt);
888         struct osd_device       *osd = osd_obj2dev(obj);
889         dmu_tx_hold_t           *txh;
890         struct osd_thandle      *oh;
891         uint64_t                 bspace;
892         uint32_t                 blksize;
893         int                      rc = 0;
894         bool                     found;
895         ENTRY;
896
897
898         LASSERT(handle != NULL);
899         LASSERT(osd_invariant(obj));
900
901         oh = container_of0(handle, struct osd_thandle, ot_super);
902
903         down_read(&obj->oo_guard);
904         if (unlikely(!dt_object_exists(dt) || obj->oo_destroyed))
905                 GOTO(out, rc = 0);
906
907         LASSERT(obj->oo_sa_hdl != NULL);
908         LASSERT(oh->ot_tx != NULL);
909         /* regular attributes are part of the bonus buffer */
910         /* let's check whether this object is already part of
911          * transaction.. */
912         found = false;
913         for (txh = list_head(&oh->ot_tx->tx_holds); txh;
914              txh = list_next(&oh->ot_tx->tx_holds, txh)) {
915                 if (txh->txh_dnode == NULL)
916                         continue;
917                 if (txh->txh_dnode->dn_object != obj->oo_dn->dn_object)
918                         continue;
919                 /* this object is part of the transaction already
920                  * we don't need to declare bonus again */
921                 found = true;
922                 break;
923         }
924         if (!found)
925                 dmu_tx_hold_bonus(oh->ot_tx, obj->oo_dn->dn_object);
926         if (oh->ot_tx->tx_err != 0)
927                 GOTO(out, rc = -oh->ot_tx->tx_err);
928
929         if (attr && attr->la_valid & LA_FLAGS) {
930                 /* LMA is usually a part of bonus, no need to declare
931                  * anything else */
932         }
933
934         if (attr && (attr->la_valid & (LA_UID | LA_GID | LA_PROJID))) {
935                 sa_object_size(obj->oo_sa_hdl, &blksize, &bspace);
936                 bspace = toqb(bspace * blksize);
937         }
938
939         if (attr && attr->la_valid & LA_UID) {
940                 /* quota enforcement for user */
941                 if (attr->la_uid != obj->oo_attr.la_uid) {
942                         rc = qsd_transfer(env, osd->od_quota_slave,
943                                           &oh->ot_quota_trans, USRQUOTA,
944                                           obj->oo_attr.la_uid, attr->la_uid,
945                                           bspace, &info->oti_qi);
946                         if (rc)
947                                 GOTO(out, rc);
948                 }
949         }
950         if (attr && attr->la_valid & LA_GID) {
951                 /* quota enforcement for group */
952                 if (attr->la_gid != obj->oo_attr.la_gid) {
953                         rc = qsd_transfer(env, osd->od_quota_slave,
954                                           &oh->ot_quota_trans, GRPQUOTA,
955                                           obj->oo_attr.la_gid, attr->la_gid,
956                                           bspace, &info->oti_qi);
957                         if (rc)
958                                 GOTO(out, rc);
959                 }
960         }
961 #ifdef ZFS_PROJINHERIT
962         if (attr && attr->la_valid & LA_PROJID) {
963                 if (!osd->od_projectused_dn)
964                         GOTO(out, rc = -EOPNOTSUPP);
965
966                 /* Usually, if project quota is upgradable for the device,
967                  * then the upgrade will be done before or when mount the
968                  * device. So when we come here, this project should have
969                  * project ID attribute already (that is zero by default).
970                  * Otherwise, there was something wrong during the former
971                  * upgrade, let's return failure to report that.
972                  *
973                  * Please note that, different from other attributes, you
974                  * can NOT simply set the project ID attribute under such
975                  * case, because adding (NOT change) project ID attribute
976                  * needs to change the object's attribute layout to match
977                  * zfs backend quota accounting requirement. */
978                 if (unlikely(!obj->oo_with_projid))
979                         GOTO(out, rc = -ENXIO);
980
981                 /* quota enforcement for project */
982                 if (attr->la_projid != obj->oo_attr.la_projid) {
983                         rc = qsd_transfer(env, osd->od_quota_slave,
984                                           &oh->ot_quota_trans, PRJQUOTA,
985                                           obj->oo_attr.la_projid,
986                                           attr->la_projid, bspace,
987                                           &info->oti_qi);
988                         if (rc)
989                                 GOTO(out, rc);
990                 }
991         }
992 #endif
993 out:
994         up_read(&obj->oo_guard);
995         RETURN(rc);
996 }
997
998 /*
999  * Set the attributes of an object
1000  *
1001  * The transaction passed to this routine must have
1002  * dmu_tx_hold_bonus(tx, oid) called and then assigned
1003  * to a transaction group.
1004  */
1005 static int osd_attr_set(const struct lu_env *env, struct dt_object *dt,
1006                         const struct lu_attr *la, struct thandle *handle)
1007 {
1008         struct osd_thread_info  *info = osd_oti_get(env);
1009         sa_bulk_attr_t          *bulk = osd_oti_get(env)->oti_attr_bulk;
1010         struct osd_object       *obj = osd_dt_obj(dt);
1011         struct osd_device       *osd = osd_obj2dev(obj);
1012         struct osd_thandle      *oh;
1013         struct osa_attr         *osa = &info->oti_osa;
1014         __u64                    valid = la->la_valid;
1015         int                      cnt;
1016         int                      rc = 0;
1017
1018         ENTRY;
1019
1020         down_read(&obj->oo_guard);
1021         if (unlikely(!dt_object_exists(dt) || obj->oo_destroyed))
1022                 GOTO(out, rc = -ENOENT);
1023
1024         LASSERT(handle != NULL);
1025         LASSERT(osd_invariant(obj));
1026         LASSERT(obj->oo_sa_hdl);
1027
1028         oh = container_of0(handle, struct osd_thandle, ot_super);
1029         /* Assert that the transaction has been assigned to a
1030            transaction group. */
1031         LASSERT(oh->ot_tx->tx_txg != 0);
1032
1033         /* Only allow set size for regular file */
1034         if (!S_ISREG(dt->do_lu.lo_header->loh_attr))
1035                 valid &= ~(LA_SIZE | LA_BLOCKS);
1036
1037         if (valid & LA_CTIME && la->la_ctime == obj->oo_attr.la_ctime)
1038                 valid &= ~LA_CTIME;
1039
1040         if (valid & LA_MTIME && la->la_mtime == obj->oo_attr.la_mtime)
1041                 valid &= ~LA_MTIME;
1042
1043         if (valid & LA_ATIME && la->la_atime == obj->oo_attr.la_atime)
1044                 valid &= ~LA_ATIME;
1045
1046         if (valid == 0)
1047                 GOTO(out, rc = 0);
1048
1049         if (valid & LA_FLAGS) {
1050                 struct lustre_mdt_attrs *lma;
1051                 struct lu_buf buf;
1052
1053                 if (la->la_flags & LUSTRE_LMA_FL_MASKS) {
1054                         CLASSERT(sizeof(info->oti_buf) >= sizeof(*lma));
1055                         lma = (struct lustre_mdt_attrs *)&info->oti_buf;
1056                         buf.lb_buf = lma;
1057                         buf.lb_len = sizeof(info->oti_buf);
1058                         rc = osd_xattr_get(env, &obj->oo_dt, &buf,
1059                                            XATTR_NAME_LMA);
1060                         if (rc > 0) {
1061                                 lma->lma_incompat =
1062                                         le32_to_cpu(lma->lma_incompat);
1063                                 lma->lma_incompat |=
1064                                         lustre_to_lma_flags(la->la_flags);
1065                                 lma->lma_incompat =
1066                                         cpu_to_le32(lma->lma_incompat);
1067                                 buf.lb_buf = lma;
1068                                 buf.lb_len = sizeof(*lma);
1069                                 rc = osd_xattr_set_internal(env, obj, &buf,
1070                                                             XATTR_NAME_LMA,
1071                                                             LU_XATTR_REPLACE,
1072                                                             oh);
1073                         }
1074                         if (rc < 0) {
1075                                 CWARN("%s: failed to set LMA flags: rc = %d\n",
1076                                        osd->od_svname, rc);
1077                                 GOTO(out, rc);
1078                         }
1079                 }
1080         }
1081
1082         write_lock(&obj->oo_attr_lock);
1083         cnt = 0;
1084
1085         if (valid & LA_PROJID) {
1086 #ifdef ZFS_PROJINHERIT
1087                 /* osd_declare_attr_set() must be called firstly.
1088                  * If osd::od_projectused_dn is not set, then we
1089                  * can not arrive at here. */
1090                 LASSERT(osd->od_projectused_dn);
1091                 LASSERT(obj->oo_with_projid);
1092
1093                 osa->projid = obj->oo_attr.la_projid = la->la_projid;
1094                 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_PROJID(osd), NULL,
1095                                  &osa->projid, 8);
1096 #else
1097                 valid &= ~LA_PROJID;
1098 #endif
1099         }
1100
1101         if (valid & LA_ATIME) {
1102                 osa->atime[0] = obj->oo_attr.la_atime = la->la_atime;
1103                 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_ATIME(osd), NULL,
1104                                  osa->atime, 16);
1105         }
1106         if (valid & LA_MTIME) {
1107                 osa->mtime[0] = obj->oo_attr.la_mtime = la->la_mtime;
1108                 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_MTIME(osd), NULL,
1109                                  osa->mtime, 16);
1110         }
1111         if (valid & LA_CTIME) {
1112                 osa->ctime[0] = obj->oo_attr.la_ctime = la->la_ctime;
1113                 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_CTIME(osd), NULL,
1114                                  osa->ctime, 16);
1115         }
1116         if (valid & LA_MODE) {
1117                 /* mode is stored along with type, so read it first */
1118                 obj->oo_attr.la_mode = (obj->oo_attr.la_mode & S_IFMT) |
1119                         (la->la_mode & ~S_IFMT);
1120                 osa->mode = obj->oo_attr.la_mode;
1121                 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_MODE(osd), NULL,
1122                                  &osa->mode, 8);
1123         }
1124         if (valid & LA_SIZE) {
1125                 osa->size = obj->oo_attr.la_size = la->la_size;
1126                 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_SIZE(osd), NULL,
1127                                  &osa->size, 8);
1128         }
1129         if (valid & LA_NLINK) {
1130                 osa->nlink = obj->oo_attr.la_nlink = la->la_nlink;
1131                 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_LINKS(osd), NULL,
1132                                  &osa->nlink, 8);
1133         }
1134         if (valid & LA_RDEV) {
1135                 osa->rdev = obj->oo_attr.la_rdev = la->la_rdev;
1136                 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_RDEV(osd), NULL,
1137                                  &osa->rdev, 8);
1138         }
1139         if (valid & LA_FLAGS) {
1140                 osa->flags = attrs_fs2zfs(la->la_flags);
1141                 /* many flags are not supported by zfs, so ensure a good cached
1142                  * copy */
1143                 obj->oo_attr.la_flags = attrs_zfs2fs(osa->flags);
1144 #ifdef ZFS_PROJINHERIT
1145                 if (obj->oo_with_projid)
1146                         osa->flags |= ZFS_PROJID;
1147 #endif
1148                 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_FLAGS(osd), NULL,
1149                                  &osa->flags, 8);
1150         }
1151         if (valid & LA_UID) {
1152                 osa->uid = obj->oo_attr.la_uid = la->la_uid;
1153                 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_UID(osd), NULL,
1154                                  &osa->uid, 8);
1155         }
1156         if (valid & LA_GID) {
1157                 osa->gid = obj->oo_attr.la_gid = la->la_gid;
1158                 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_GID(osd), NULL,
1159                                  &osa->gid, 8);
1160         }
1161         obj->oo_attr.la_valid |= valid;
1162         write_unlock(&obj->oo_attr_lock);
1163
1164         LASSERT(cnt <= ARRAY_SIZE(osd_oti_get(env)->oti_attr_bulk));
1165         rc = osd_object_sa_bulk_update(obj, bulk, cnt, oh);
1166
1167 out:
1168         up_read(&obj->oo_guard);
1169         RETURN(rc);
1170 }
1171
1172 /*
1173  * Object creation.
1174  *
1175  * XXX temporary solution.
1176  */
1177
1178 static void osd_ah_init(const struct lu_env *env, struct dt_allocation_hint *ah,
1179                         struct dt_object *parent, struct dt_object *child,
1180                         umode_t child_mode)
1181 {
1182         LASSERT(ah);
1183
1184         ah->dah_parent = parent;
1185         ah->dah_mode = child_mode;
1186
1187         if (parent != NULL && !dt_object_remote(parent)) {
1188                 /* will help to find FID->ino at dt_insert("..") */
1189                 struct osd_object *pobj = osd_dt_obj(parent);
1190
1191                 osd_idc_find_and_init(env, osd_obj2dev(pobj), pobj);
1192         }
1193 }
1194
1195 static int osd_declare_create(const struct lu_env *env, struct dt_object *dt,
1196                               struct lu_attr *attr,
1197                               struct dt_allocation_hint *hint,
1198                               struct dt_object_format *dof,
1199                               struct thandle *handle)
1200 {
1201         const struct lu_fid     *fid = lu_object_fid(&dt->do_lu);
1202         struct osd_object       *obj = osd_dt_obj(dt);
1203         struct osd_device       *osd = osd_obj2dev(obj);
1204         struct osd_thandle      *oh;
1205         uint64_t                 zapid;
1206         dnode_t                 *dn;
1207         int                      rc, dnode_size;
1208         ENTRY;
1209
1210         LASSERT(dof);
1211
1212         switch (dof->dof_type) {
1213                 case DFT_REGULAR:
1214                 case DFT_SYM:
1215                 case DFT_NODE:
1216                         if (obj->oo_dt.do_body_ops == NULL)
1217                                 obj->oo_dt.do_body_ops = &osd_body_ops;
1218                         break;
1219                 default:
1220                         break;
1221         }
1222
1223         LASSERT(handle != NULL);
1224         oh = container_of0(handle, struct osd_thandle, ot_super);
1225         LASSERT(oh->ot_tx != NULL);
1226
1227         /* this is the minimum set of EAs on every Lustre object */
1228         obj->oo_ea_in_bonus = ZFS_SA_BASE_ATTR_SIZE +
1229                                 sizeof(__u64) + /* VBR VERSION */
1230                                 sizeof(struct lustre_mdt_attrs); /* LMA */
1231         /* reserve 32 bytes for extra stuff like ACLs */
1232         dnode_size = size_roundup_power2(obj->oo_ea_in_bonus + 32);
1233
1234         switch (dof->dof_type) {
1235                 case DFT_DIR:
1236                         dt->do_index_ops = &osd_dir_ops;
1237                 case DFT_INDEX:
1238                         /* for zap create */
1239                         dmu_tx_hold_zap(oh->ot_tx, DMU_NEW_OBJECT, FALSE, NULL);
1240                         dmu_tx_hold_sa_create(oh->ot_tx, dnode_size);
1241                         break;
1242                 case DFT_REGULAR:
1243                 case DFT_SYM:
1244                 case DFT_NODE:
1245                         /* first, we'll create new object */
1246                         dmu_tx_hold_sa_create(oh->ot_tx, dnode_size);
1247                         break;
1248
1249                 default:
1250                         LBUG();
1251                         break;
1252         }
1253
1254         /* and we'll add it to some mapping */
1255         zapid = osd_get_name_n_idx(env, osd, fid, NULL, 0, &dn);
1256         osd_tx_hold_zap(oh->ot_tx, zapid, dn, TRUE, NULL);
1257
1258         /* will help to find FID->ino mapping at dt_insert() */
1259         osd_idc_find_and_init(env, osd, obj);
1260
1261         rc = osd_declare_quota(env, osd, attr->la_uid, attr->la_gid,
1262                                attr->la_projid, 1, oh, NULL, OSD_QID_INODE);
1263
1264         RETURN(rc);
1265 }
1266
1267 int __osd_attr_init(const struct lu_env *env, struct osd_device *osd,
1268                     struct osd_object *obj, sa_handle_t *sa_hdl, dmu_tx_t *tx,
1269                     struct lu_attr *la, uint64_t parent,
1270                     nvlist_t *xattr)
1271 {
1272         sa_bulk_attr_t  *bulk = osd_oti_get(env)->oti_attr_bulk;
1273         struct osa_attr *osa = &osd_oti_get(env)->oti_osa;
1274         uint64_t         gen;
1275         uint64_t         crtime[2];
1276         timestruc_t      now;
1277         int              cnt;
1278         int              rc;
1279         char *dxattr = NULL;
1280         size_t sa_size;
1281
1282
1283         LASSERT(sa_hdl);
1284
1285         gen = dmu_tx_get_txg(tx);
1286         gethrestime(&now);
1287         ZFS_TIME_ENCODE(&now, crtime);
1288
1289         osa->atime[0] = la->la_atime;
1290         osa->ctime[0] = la->la_ctime;
1291         osa->mtime[0] = la->la_mtime;
1292         osa->mode = la->la_mode;
1293         osa->uid = la->la_uid;
1294         osa->gid = la->la_gid;
1295         osa->rdev = la->la_rdev;
1296         osa->nlink = la->la_nlink;
1297         if (la->la_valid & LA_FLAGS)
1298                 osa->flags = attrs_fs2zfs(la->la_flags);
1299         else
1300                 osa->flags = 0;
1301         osa->size  = la->la_size;
1302 #ifdef ZFS_PROJINHERIT
1303         if (osd->od_projectused_dn) {
1304                 if (la->la_valid & LA_PROJID)
1305                         osa->projid = la->la_projid;
1306                 else
1307                         osa->projid = ZFS_DEFAULT_PROJID;
1308                 osa->flags |= ZFS_PROJID;
1309                 if (obj)
1310                         obj->oo_with_projid = 1;
1311         } else {
1312                 osa->flags &= ~ZFS_PROJID;
1313         }
1314 #endif
1315
1316         /*
1317          * we need to create all SA below upon object create.
1318          *
1319          * XXX The attribute order matters since the accounting callback relies
1320          * on static offsets (i.e. SA_*_OFFSET, see zfs_space_delta_cb()) to
1321          * look up the UID/GID/PROJID attributes. Moreover, the callback does
1322          * not seem to support the spill block.
1323          * We define attributes in the same order as SA_*_OFFSET in order to
1324          * work around the problem. See ORI-610.
1325          */
1326         cnt = 0;
1327         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_MODE(osd), NULL, &osa->mode, 8);
1328         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_SIZE(osd), NULL, &osa->size, 8);
1329         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_GEN(osd), NULL, &gen, 8);
1330         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_UID(osd), NULL, &osa->uid, 8);
1331         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_GID(osd), NULL, &osa->gid, 8);
1332         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_PARENT(osd), NULL, &parent, 8);
1333         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_FLAGS(osd), NULL, &osa->flags, 8);
1334         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_ATIME(osd), NULL, osa->atime, 16);
1335         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_MTIME(osd), NULL, osa->mtime, 16);
1336         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_CTIME(osd), NULL, osa->ctime, 16);
1337         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_CRTIME(osd), NULL, crtime, 16);
1338         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_LINKS(osd), NULL, &osa->nlink, 8);
1339 #ifdef ZFS_PROJINHERIT
1340         if (osd->od_projectused_dn)
1341                 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_PROJID(osd), NULL,
1342                                  &osa->projid, 8);
1343 #endif
1344         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_RDEV(osd), NULL, &osa->rdev, 8);
1345         LASSERT(cnt <= ARRAY_SIZE(osd_oti_get(env)->oti_attr_bulk));
1346
1347         if (xattr) {
1348                 rc = -nvlist_size(xattr, &sa_size, NV_ENCODE_XDR);
1349                 LASSERT(rc == 0);
1350
1351                 dxattr = osd_zio_buf_alloc(sa_size);
1352                 LASSERT(dxattr);
1353
1354                 rc = -nvlist_pack(xattr, &dxattr, &sa_size,
1355                                 NV_ENCODE_XDR, KM_SLEEP);
1356                 LASSERT(rc == 0);
1357
1358                 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_DXATTR(osd),
1359                                 NULL, dxattr, sa_size);
1360         }
1361
1362         rc = -sa_replace_all_by_template(sa_hdl, bulk, cnt, tx);
1363         if (dxattr)
1364                 osd_zio_buf_free(dxattr, sa_size);
1365
1366         return rc;
1367 }
1368
1369 static int osd_find_new_dnode(const struct lu_env *env, dmu_tx_t *tx,
1370                               uint64_t oid, dnode_t **dnp)
1371 {
1372         dmu_tx_hold_t *txh;
1373         int rc = 0;
1374
1375         /* take dnode_t from tx to save on dnode#->dnode_t lookup */
1376         for (txh = list_tail(&tx->tx_holds); txh;
1377              txh = list_prev(&tx->tx_holds, txh)) {
1378                 dnode_t *dn = txh->txh_dnode;
1379                 dmu_buf_impl_t *db;
1380
1381                 if (dn == NULL)
1382                         continue;
1383                 if (dn->dn_object != oid)
1384                         continue;
1385                 db = dn->dn_bonus;
1386                 if (db == NULL) {
1387                         rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
1388                         if (dn->dn_bonus == NULL)
1389                                 dbuf_create_bonus(dn);
1390                         rw_exit(&dn->dn_struct_rwlock);
1391                 }
1392                 db = dn->dn_bonus;
1393                 LASSERT(db);
1394                 LASSERT(dn->dn_handle);
1395                 DB_DNODE_ENTER(db);
1396                 if (refcount_add(&db->db_holds, osd_obj_tag) == 1) {
1397                         refcount_add(&dn->dn_holds, tag);
1398                         atomic_inc_32(&dn->dn_dbufs_count);
1399                 }
1400                 *dnp = dn;
1401                 dbuf_read(db, NULL, DB_RF_MUST_SUCCEED | DB_RF_NOPREFETCH);
1402                 break;
1403         }
1404
1405         if (unlikely(*dnp == NULL))
1406                 rc = __osd_obj2dnode(tx->tx_objset, oid, dnp);
1407
1408         return rc;
1409 }
1410
1411 #ifdef HAVE_DMU_OBJECT_ALLOC_DNSIZE
1412 static int osd_find_dnsize(struct osd_object *obj)
1413 {
1414         struct osd_device *osd = osd_obj2dev(obj);
1415         int dnsize;
1416
1417         if (osd->od_dnsize == ZFS_DNSIZE_AUTO) {
1418                 dnsize = DNODE_MIN_SIZE;
1419                 do {
1420                         if (DN_BONUS_SIZE(dnsize) >= obj->oo_ea_in_bonus + 32)
1421                                 break;
1422                         dnsize <<= 1;
1423                 } while (dnsize < DNODE_MAX_SIZE);
1424                 if (dnsize > DNODE_MAX_SIZE)
1425                         dnsize = DNODE_MAX_SIZE;
1426         } else if (osd->od_dnsize == ZFS_DNSIZE_1K) {
1427                 dnsize = 1024;
1428         } else if (osd->od_dnsize == ZFS_DNSIZE_2K) {
1429                 dnsize = 2048;
1430         } else if (osd->od_dnsize == ZFS_DNSIZE_4K) {
1431                 dnsize = 4096;
1432         } else if (osd->od_dnsize == ZFS_DNSIZE_8K) {
1433                 dnsize = 8192;
1434         } else if (osd->od_dnsize == ZFS_DNSIZE_16K) {
1435                 dnsize = 16384;
1436         } else {
1437                 dnsize = DNODE_MIN_SIZE;
1438         }
1439         return dnsize;
1440 }
1441 #else
1442 static int inline osd_find_dnsize(struct osd_object *obj)
1443 {
1444         return DN_MAX_BONUSLEN;
1445 }
1446 #endif
1447
1448 /*
1449  * The transaction passed to this routine must have
1450  * dmu_tx_hold_bonus(tx, DMU_NEW_OBJECT) called and then assigned
1451  * to a transaction group.
1452  */
1453 int __osd_object_create(const struct lu_env *env, struct osd_object *obj,
1454                         dnode_t **dnp, dmu_tx_t *tx, struct lu_attr *la)
1455 {
1456         struct osd_device   *osd = osd_obj2dev(obj);
1457         const struct lu_fid *fid = lu_object_fid(&obj->oo_dt.do_lu);
1458         dmu_object_type_t    type = DMU_OT_PLAIN_FILE_CONTENTS;
1459         uint64_t oid;
1460
1461         /* Use DMU_OTN_UINT8_METADATA for local objects so their data blocks
1462          * would get an additional ditto copy */
1463         if (unlikely(S_ISREG(la->la_mode) &&
1464                      fid_seq_is_local_file(fid_seq(fid))))
1465                 type = DMU_OTN_UINT8_METADATA;
1466
1467         /* Create a new DMU object using the default dnode size. */
1468         oid = osd_dmu_object_alloc(osd->od_os, type, 0,
1469                                    osd_find_dnsize(obj), tx);
1470
1471         LASSERT(la->la_valid & LA_MODE);
1472         la->la_size = 0;
1473         la->la_nlink = 1;
1474
1475         return osd_find_new_dnode(env, tx, oid, dnp);
1476 }
1477
1478 /*
1479  * The transaction passed to this routine must have
1480  * dmu_tx_hold_zap(tx, DMU_NEW_OBJECT, ...) called and then assigned
1481  * to a transaction group.
1482  *
1483  * Using ZAP_FLAG_HASH64 will force the ZAP to always be a FAT ZAP.
1484  * This is fine for directories today, because storing the FID in the dirent
1485  * will also require a FAT ZAP.  If there is a new type of micro ZAP created
1486  * then we might need to re-evaluate the use of this flag and instead do
1487  * a conversion from the different internal ZAP hash formats being used. */
1488 int __osd_zap_create(const struct lu_env *env, struct osd_device *osd,
1489                      dnode_t **dnp, dmu_tx_t *tx, struct lu_attr *la,
1490                      unsigned dnsize, zap_flags_t flags)
1491 {
1492         uint64_t oid;
1493
1494         /* Assert that the transaction has been assigned to a
1495            transaction group. */
1496         LASSERT(tx->tx_txg != 0);
1497         *dnp = NULL;
1498
1499         oid = osd_zap_create_flags(osd->od_os, 0, flags | ZAP_FLAG_HASH64,
1500                                    DMU_OT_DIRECTORY_CONTENTS,
1501                                    14, /* == ZFS fzap_default_blockshift */
1502                                    DN_MAX_INDBLKSHIFT, /* indirect blockshift */
1503                                    dnsize, tx);
1504
1505         la->la_size = 2;
1506         la->la_nlink = 1;
1507
1508         return osd_find_new_dnode(env, tx, oid, dnp);
1509 }
1510
1511 static dnode_t *osd_mkidx(const struct lu_env *env, struct osd_object *obj,
1512                           struct lu_attr *la, struct osd_thandle *oh)
1513 {
1514         dnode_t *dn;
1515         int rc;
1516
1517         /* Index file should be created as regular file in order not to confuse
1518          * ZPL which could interpret them as directory.
1519          * We set ZAP_FLAG_UINT64_KEY to let ZFS know than we are going to use
1520          * binary keys */
1521         LASSERT(S_ISREG(la->la_mode));
1522         rc = __osd_zap_create(env, osd_obj2dev(obj), &dn, oh->ot_tx, la,
1523                               osd_find_dnsize(obj), ZAP_FLAG_UINT64_KEY);
1524         if (rc)
1525                 return ERR_PTR(rc);
1526         return dn;
1527 }
1528
1529 static dnode_t *osd_mkdir(const struct lu_env *env, struct osd_object *obj,
1530                           struct lu_attr *la, struct osd_thandle *oh)
1531 {
1532         dnode_t *dn;
1533         int rc;
1534
1535         LASSERT(S_ISDIR(la->la_mode));
1536         rc = __osd_zap_create(env, osd_obj2dev(obj), &dn, oh->ot_tx, la,
1537                               osd_find_dnsize(obj), 0);
1538         if (rc)
1539                 return ERR_PTR(rc);
1540         return dn;
1541 }
1542
1543 static dnode_t *osd_mkreg(const struct lu_env *env, struct osd_object *obj,
1544                           struct lu_attr *la, struct osd_thandle *oh)
1545 {
1546         const struct lu_fid *fid = lu_object_fid(&obj->oo_dt.do_lu);
1547         struct osd_device *osd = osd_obj2dev(obj);
1548         dnode_t *dn;
1549         int rc;
1550
1551         LASSERT(S_ISREG(la->la_mode));
1552         rc = __osd_object_create(env, obj, &dn, oh->ot_tx, la);
1553         if (rc)
1554                 return ERR_PTR(rc);
1555
1556         if ((fid_is_idif(fid) || fid_is_norm(fid) || fid_is_echo(fid)) &&
1557             osd->od_is_ost) {
1558                 /* The minimum block size must be at least page size otherwise
1559                  * it will break the assumption in tgt_thread_big_cache where
1560                  * the array size is PTLRPC_MAX_BRW_PAGES. It will also affect
1561                  * RDMA due to subpage transfer size */
1562                 rc = -dmu_object_set_blocksize(osd->od_os, dn->dn_object,
1563                                                PAGE_SIZE, 0, oh->ot_tx);
1564                 if (unlikely(rc)) {
1565                         CERROR("%s: can't change blocksize: %d\n",
1566                                osd->od_svname, rc);
1567                         return ERR_PTR(rc);
1568                 }
1569         }
1570
1571         return dn;
1572 }
1573
1574 static dnode_t *osd_mksym(const struct lu_env *env, struct osd_object *obj,
1575                           struct lu_attr *la, struct osd_thandle *oh)
1576 {
1577         dnode_t *dn;
1578         int rc;
1579
1580         LASSERT(S_ISLNK(la->la_mode));
1581         rc = __osd_object_create(env, obj, &dn, oh->ot_tx, la);
1582         if (rc)
1583                 return ERR_PTR(rc);
1584         return dn;
1585 }
1586
1587 static dnode_t *osd_mknod(const struct lu_env *env, struct osd_object *obj,
1588                           struct lu_attr *la, struct osd_thandle *oh)
1589 {
1590         dnode_t *dn;
1591         int rc;
1592
1593         if (S_ISCHR(la->la_mode) || S_ISBLK(la->la_mode))
1594                 la->la_valid |= LA_RDEV;
1595
1596         rc = __osd_object_create(env, obj, &dn, oh->ot_tx, la);
1597         if (rc)
1598                 return ERR_PTR(rc);
1599         return dn;
1600 }
1601
1602 typedef dnode_t *(*osd_obj_type_f)(const struct lu_env *env,
1603                                    struct osd_object *obj,
1604                                    struct lu_attr *la,
1605                                    struct osd_thandle *oh);
1606
1607 static osd_obj_type_f osd_create_type_f(enum dt_format_type type)
1608 {
1609         osd_obj_type_f result;
1610
1611         switch (type) {
1612         case DFT_DIR:
1613                 result = osd_mkdir;
1614                 break;
1615         case DFT_INDEX:
1616                 result = osd_mkidx;
1617                 break;
1618         case DFT_REGULAR:
1619                 result = osd_mkreg;
1620                 break;
1621         case DFT_SYM:
1622                 result = osd_mksym;
1623                 break;
1624         case DFT_NODE:
1625                 result = osd_mknod;
1626                 break;
1627         default:
1628                 LBUG();
1629                 break;
1630         }
1631         return result;
1632 }
1633
1634 /*
1635  * Concurrency: @dt is write locked.
1636  */
1637 static int osd_create(const struct lu_env *env, struct dt_object *dt,
1638                       struct lu_attr *attr, struct dt_allocation_hint *hint,
1639                       struct dt_object_format *dof, struct thandle *th)
1640 {
1641         struct osd_thread_info  *info = osd_oti_get(env);
1642         struct lustre_mdt_attrs *lma = &info->oti_mdt_attrs;
1643         struct zpl_direntry     *zde = &info->oti_zde.lzd_reg;
1644         const struct lu_fid     *fid = lu_object_fid(&dt->do_lu);
1645         struct osd_object       *obj = osd_dt_obj(dt);
1646         struct osd_device       *osd = osd_obj2dev(obj);
1647         char                    *buf = info->oti_str;
1648         struct osd_thandle      *oh;
1649         dnode_t *dn = NULL, *zdn = NULL;
1650         uint64_t                 zapid, parent = 0;
1651         int                      rc;
1652
1653         ENTRY;
1654
1655         LASSERT(!fid_is_acct(fid));
1656
1657         /* concurrent create declarations should not see
1658          * the object inconsistent (db, attr, etc).
1659          * in regular cases acquisition should be cheap */
1660         down_write(&obj->oo_guard);
1661
1662         if (unlikely(dt_object_exists(dt)))
1663                 GOTO(out, rc = -EEXIST);
1664
1665         LASSERT(osd_invariant(obj));
1666         LASSERT(dof != NULL);
1667
1668         LASSERT(th != NULL);
1669         oh = container_of0(th, struct osd_thandle, ot_super);
1670
1671         LASSERT(obj->oo_dn == NULL);
1672
1673         /* to follow ZFS on-disk format we need
1674          * to initialize parent dnode properly */
1675         if (hint != NULL && hint->dah_parent != NULL &&
1676             !dt_object_remote(hint->dah_parent))
1677                 parent = osd_dt_obj(hint->dah_parent)->oo_dn->dn_object;
1678
1679         /* we may fix some attributes, better do not change the source */
1680         obj->oo_attr = *attr;
1681         obj->oo_attr.la_valid |= LA_SIZE | LA_NLINK | LA_TYPE;
1682
1683 #ifdef ZFS_PROJINHERIT
1684         if (osd->od_projectused_dn) {
1685                 if (!(obj->oo_attr.la_valid & LA_PROJID))
1686                         obj->oo_attr.la_projid = ZFS_DEFAULT_PROJID;
1687                 obj->oo_with_projid = 1;
1688         }
1689 #endif
1690
1691         dn = osd_create_type_f(dof->dof_type)(env, obj, &obj->oo_attr, oh);
1692         if (IS_ERR(dn)) {
1693                 rc = PTR_ERR(dn);
1694                 dn = NULL;
1695                 GOTO(out, rc);
1696         }
1697
1698         zde->zde_pad = 0;
1699         zde->zde_dnode = dn->dn_object;
1700         zde->zde_type = IFTODT(attr->la_mode & S_IFMT);
1701
1702         zapid = osd_get_name_n_idx(env, osd, fid, buf,
1703                                    sizeof(info->oti_str), &zdn);
1704         rc = osd_zap_add(osd, zapid, zdn, buf, 8, 1, zde, oh->ot_tx);
1705         if (rc)
1706                 GOTO(out, rc);
1707         obj->oo_dn = dn;
1708         /* Now add in all of the "SA" attributes */
1709         rc = osd_sa_handle_get(obj);
1710         if (rc)
1711                 GOTO(out, rc);
1712
1713         rc = -nvlist_alloc(&obj->oo_sa_xattr, NV_UNIQUE_NAME, KM_SLEEP);
1714         if (rc)
1715                 GOTO(out, rc);
1716
1717         /* initialize LMA */
1718         lustre_lma_init(lma, fid, 0, 0);
1719         lustre_lma_swab(lma);
1720         rc = -nvlist_add_byte_array(obj->oo_sa_xattr, XATTR_NAME_LMA,
1721                                     (uchar_t *)lma, sizeof(*lma));
1722         if (rc)
1723                 GOTO(out, rc);
1724
1725         /* configure new osd object */
1726         obj->oo_parent = parent != 0 ? parent : zapid;
1727         obj->oo_late_attr_set = 1;
1728         rc = __osd_sa_xattr_schedule_update(env, obj, oh);
1729         if (rc)
1730                 GOTO(out, rc);
1731
1732         /* XXX: oo_lma_flags */
1733         obj->oo_dt.do_lu.lo_header->loh_attr |= obj->oo_attr.la_mode & S_IFMT;
1734         if (likely(!fid_is_acct(lu_object_fid(&obj->oo_dt.do_lu))))
1735                 /* no body operations for accounting objects */
1736                 obj->oo_dt.do_body_ops = &osd_body_ops;
1737
1738         osd_idc_find_and_init(env, osd, obj);
1739
1740 out:
1741         if (unlikely(rc && dn)) {
1742                 dmu_object_free(osd->od_os, dn->dn_object, oh->ot_tx);
1743                 osd_dnode_rele(dn);
1744                 obj->oo_dn = NULL;
1745         } else if (!rc) {
1746                 obj->oo_dt.do_lu.lo_header->loh_attr |= LOHA_EXISTS;
1747         }
1748         up_write(&obj->oo_guard);
1749         RETURN(rc);
1750 }
1751
1752 static int osd_declare_ref_add(const struct lu_env *env, struct dt_object *dt,
1753                                struct thandle *th)
1754 {
1755         return osd_declare_attr_set(env, dt, NULL, th);
1756 }
1757
1758 /*
1759  * Concurrency: @dt is write locked.
1760  */
1761 static int osd_ref_add(const struct lu_env *env, struct dt_object *dt,
1762                        struct thandle *handle)
1763 {
1764         struct osd_object       *obj = osd_dt_obj(dt);
1765         struct osd_thandle      *oh;
1766         struct osd_device       *osd = osd_obj2dev(obj);
1767         uint64_t                 nlink;
1768         int rc;
1769
1770         ENTRY;
1771
1772         down_read(&obj->oo_guard);
1773         if (unlikely(!dt_object_exists(dt) || obj->oo_destroyed))
1774                 GOTO(out, rc = -ENOENT);
1775
1776         LASSERT(osd_invariant(obj));
1777         LASSERT(obj->oo_sa_hdl != NULL);
1778
1779         oh = container_of0(handle, struct osd_thandle, ot_super);
1780
1781         write_lock(&obj->oo_attr_lock);
1782         nlink = ++obj->oo_attr.la_nlink;
1783         write_unlock(&obj->oo_attr_lock);
1784
1785         rc = osd_object_sa_update(obj, SA_ZPL_LINKS(osd), &nlink, 8, oh);
1786
1787 out:
1788         up_read(&obj->oo_guard);
1789         RETURN(rc);
1790 }
1791
1792 static int osd_declare_ref_del(const struct lu_env *env, struct dt_object *dt,
1793                                struct thandle *handle)
1794 {
1795         return osd_declare_attr_set(env, dt, NULL, handle);
1796 }
1797
1798 /*
1799  * Concurrency: @dt is write locked.
1800  */
1801 static int osd_ref_del(const struct lu_env *env, struct dt_object *dt,
1802                        struct thandle *handle)
1803 {
1804         struct osd_object       *obj = osd_dt_obj(dt);
1805         struct osd_thandle      *oh;
1806         struct osd_device       *osd = osd_obj2dev(obj);
1807         uint64_t                 nlink;
1808         int                      rc;
1809
1810         ENTRY;
1811
1812         down_read(&obj->oo_guard);
1813
1814         if (unlikely(!dt_object_exists(dt) || obj->oo_destroyed))
1815                 GOTO(out, rc = -ENOENT);
1816
1817         LASSERT(osd_invariant(obj));
1818         LASSERT(obj->oo_sa_hdl != NULL);
1819
1820         oh = container_of0(handle, struct osd_thandle, ot_super);
1821         LASSERT(!lu_object_is_dying(dt->do_lu.lo_header));
1822
1823         write_lock(&obj->oo_attr_lock);
1824         nlink = --obj->oo_attr.la_nlink;
1825         write_unlock(&obj->oo_attr_lock);
1826
1827         rc = osd_object_sa_update(obj, SA_ZPL_LINKS(osd), &nlink, 8, oh);
1828
1829 out:
1830         up_read(&obj->oo_guard);
1831         RETURN(rc);
1832 }
1833
1834 static int osd_object_sync(const struct lu_env *env, struct dt_object *dt,
1835                            __u64 start, __u64 end)
1836 {
1837         struct osd_device *osd = osd_obj2dev(osd_dt_obj(dt));
1838         ENTRY;
1839
1840         /* XXX: no other option than syncing the whole filesystem until we
1841          * support ZIL.  If the object tracked the txg that it was last
1842          * modified in, it could pass that txg here instead of "0".  Maybe
1843          * the changes are already committed, so no wait is needed at all? */
1844         if (!osd->od_dt_dev.dd_rdonly)
1845                 txg_wait_synced(dmu_objset_pool(osd->od_os), 0ULL);
1846
1847         RETURN(0);
1848 }
1849
/*
 * ->do_invalidate() method: nothing to do for this backend, so simply
 * report success.
 */
static int osd_invalidate(const struct lu_env *env, struct dt_object *dt)
{
	return 0;
}
1854
1855 static struct dt_object_operations osd_obj_ops = {
1856         .do_read_lock           = osd_read_lock,
1857         .do_write_lock          = osd_write_lock,
1858         .do_read_unlock         = osd_read_unlock,
1859         .do_write_unlock        = osd_write_unlock,
1860         .do_write_locked        = osd_write_locked,
1861         .do_attr_get            = osd_attr_get,
1862         .do_declare_attr_set    = osd_declare_attr_set,
1863         .do_attr_set            = osd_attr_set,
1864         .do_ah_init             = osd_ah_init,
1865         .do_declare_create      = osd_declare_create,
1866         .do_create              = osd_create,
1867         .do_declare_destroy     = osd_declare_destroy,
1868         .do_destroy             = osd_destroy,
1869         .do_index_try           = osd_index_try,
1870         .do_declare_ref_add     = osd_declare_ref_add,
1871         .do_ref_add             = osd_ref_add,
1872         .do_declare_ref_del     = osd_declare_ref_del,
1873         .do_ref_del             = osd_ref_del,
1874         .do_xattr_get           = osd_xattr_get,
1875         .do_declare_xattr_set   = osd_declare_xattr_set,
1876         .do_xattr_set           = osd_xattr_set,
1877         .do_declare_xattr_del   = osd_declare_xattr_del,
1878         .do_xattr_del           = osd_xattr_del,
1879         .do_xattr_list          = osd_xattr_list,
1880         .do_object_sync         = osd_object_sync,
1881         .do_invalidate          = osd_invalidate,
1882 };
1883
1884 static struct lu_object_operations osd_lu_obj_ops = {
1885         .loo_object_init        = osd_object_init,
1886         .loo_object_delete      = osd_object_delete,
1887         .loo_object_release     = osd_object_release,
1888         .loo_object_free        = osd_object_free,
1889         .loo_object_print       = osd_object_print,
1890         .loo_object_invariant   = osd_object_invariant,
1891 };
1892
1893 static int osd_otable_it_attr_get(const struct lu_env *env,
1894                                 struct dt_object *dt,
1895                                 struct lu_attr *attr)
1896 {
1897         attr->la_valid = 0;
1898         return 0;
1899 }
1900
1901 static struct dt_object_operations osd_obj_otable_it_ops = {
1902         .do_attr_get            = osd_otable_it_attr_get,
1903         .do_index_try           = osd_index_try,
1904 };