1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/osd-zfs/osd_object.c
33  *
34  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
35  * Author: Mike Pershin <tappro@whamcloud.com>
36  * Author: Johann Lombardi <johann@whamcloud.com>
37  */
38
39 #define DEBUG_SUBSYSTEM S_OSD
40
41 #include <lustre_ver.h>
42 #include <libcfs/libcfs.h>
43 #include <obd_support.h>
44 #include <lustre_net.h>
45 #include <obd.h>
46 #include <obd_class.h>
47 #include <lustre_disk.h>
48 #include <lustre_fid.h>
49
50 #include "osd_internal.h"
51
52 #include <sys/dnode.h>
53 #include <sys/dbuf.h>
54 #include <sys/spa.h>
55 #include <sys/stat.h>
56 #include <sys/zap.h>
57 #include <sys/spa_impl.h>
58 #include <sys/zfs_znode.h>
59 #include <sys/dmu_tx.h>
60 #include <sys/dmu_objset.h>
61 #include <sys/dsl_prop.h>
62 #include <sys/sa_impl.h>
63 #include <sys/txg.h>
64
65 char *osd_obj_tag = "osd_object";
66 static int osd_object_sync_delay_us = -1;
67
68 static struct dt_object_operations osd_obj_ops;
69 static struct lu_object_operations osd_lu_obj_ops;
70 extern struct dt_body_operations osd_body_ops;
71 static struct dt_object_operations osd_obj_otable_it_ops;
72
73 extern struct kmem_cache *osd_object_kmem;
74
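/* Release the SA handle cached in the osd_object, if one was created. */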
75 static void
76 osd_object_sa_fini(struct osd_object *obj)
77 {
78         if (obj->oo_sa_hdl) {
79                 sa_handle_destroy(obj->oo_sa_hdl);
80                 obj->oo_sa_hdl = NULL;
81         }
82 }
83
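/*
 * Create the SA handle for the object and cache the id of its xattr
 * directory (ZFS_NO_OBJECT when the object has no xattr directory yet).
 */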
84 static int
85 osd_object_sa_init(struct osd_object *obj, struct osd_device *o)
86 {
87         int rc;
88
89         LASSERT(obj->oo_sa_hdl == NULL);
90         LASSERT(obj->oo_dn != NULL);
91
92         rc = osd_sa_handle_get(obj);
93         if (rc)
94                 return rc;
95
96         /* Cache the xattr object id, valid for the life of the object */
97         rc = -sa_lookup(obj->oo_sa_hdl, SA_ZPL_XATTR(o), &obj->oo_xattr, 8);
98         if (rc == -ENOENT) {
99                 obj->oo_xattr = ZFS_NO_OBJECT;
100                 rc = 0;
101         } else if (rc) {
102                 osd_object_sa_fini(obj);
103         }
104
105         return rc;
106 }
107
108 /*
109  * Add object to list of dirty objects in tx handle.
110  */
111 void osd_object_sa_dirty_add(struct osd_object *obj, struct osd_thandle *oh)
112 {
113         if (!list_empty(&obj->oo_sa_linkage))
114                 return;
115
116         write_lock(&obj->oo_attr_lock);
117         if (likely(list_empty(&obj->oo_sa_linkage)))
118                 list_add(&obj->oo_sa_linkage, &oh->ot_sa_list);
119         write_unlock(&obj->oo_attr_lock);
120 }
121
122 /*
123  * Release spill block dbuf hold for all dirty SAs.
124  */
125 void osd_object_sa_dirty_rele(const struct lu_env *env, struct osd_thandle *oh)
126 {
127         struct osd_object *obj;
128
129         while (!list_empty(&oh->ot_sa_list)) {
130                 obj = list_entry(oh->ot_sa_list.next,
131                                  struct osd_object, oo_sa_linkage);
132                 write_lock(&obj->oo_attr_lock);
133                 list_del_init(&obj->oo_sa_linkage);
134                 write_unlock(&obj->oo_attr_lock);
135                 if (obj->oo_late_xattr) {
136                         /*
137                          * take oo_guard to protect oo_sa_xattr buffer
138                          * from concurrent update by osd_xattr_set()
139                          */
140                         LASSERT(oh->ot_assigned != 0);
141                         down_write(&obj->oo_guard);
142                         if (obj->oo_late_attr_set)
143                                 __osd_sa_attr_init(env, obj, oh);
144                         else if (obj->oo_late_xattr)
145                                 __osd_sa_xattr_update(env, obj, oh);
146                         up_write(&obj->oo_guard);
147                 }
148                 sa_spill_rele(obj->oo_sa_hdl);
149         }
150 }
151
152 /*
153  * Update the SA and add the object to the dirty list.
154  */
155 int osd_object_sa_update(struct osd_object *obj, sa_attr_type_t type,
156                          void *buf, uint32_t buflen, struct osd_thandle *oh)
157 {
158         int rc;
159
160         LASSERT(obj->oo_sa_hdl != NULL);
161         LASSERT(oh->ot_tx != NULL);
162
163         rc = -sa_update(obj->oo_sa_hdl, type, buf, buflen, oh->ot_tx);
164         osd_object_sa_dirty_add(obj, oh);
165
166         return rc;
167 }
168
169 /*
170  * Bulk update the SA and add the object to the dirty list.
171  */
172 static int
173 osd_object_sa_bulk_update(struct osd_object *obj, sa_bulk_attr_t *attrs,
174                           int count, struct osd_thandle *oh)
175 {
176         int rc;
177
178         LASSERT(obj->oo_sa_hdl != NULL);
179         LASSERT(oh->ot_tx != NULL);
180
181         rc = -sa_bulk_update(obj->oo_sa_hdl, attrs, count, oh->ot_tx);
182         osd_object_sa_dirty_add(obj, oh);
183
184         return rc;
185 }
186
187 /*
188  * Retrieve the attributes of a DMU object
189  */
190 static int __osd_object_attr_get(const struct lu_env *env, struct osd_device *o,
191                                  struct osd_object *obj, struct lu_attr *la)
192 {
193         struct osa_attr *osa = &osd_oti_get(env)->oti_osa;
194         sa_bulk_attr_t  *bulk = osd_oti_get(env)->oti_attr_bulk;
195         int              cnt = 0;
196         int              rc;
197         ENTRY;
198
199         LASSERT(obj->oo_dn != NULL);
200
201         la->la_valid |= LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE | LA_TYPE |
202                         LA_SIZE | LA_UID | LA_GID | LA_FLAGS | LA_NLINK;
203
204         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_ATIME(o), NULL, osa->atime, 16);
205         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_MTIME(o), NULL, osa->mtime, 16);
206         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_CTIME(o), NULL, osa->ctime, 16);
207         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_MODE(o), NULL, &osa->mode, 8);
208         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_SIZE(o), NULL, &osa->size, 8);
209         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_LINKS(o), NULL, &osa->nlink, 8);
210         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_UID(o), NULL, &osa->uid, 8);
211         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_GID(o), NULL, &osa->gid, 8);
212         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_FLAGS(o), NULL, &osa->flags, 8);
213         LASSERT(cnt <= ARRAY_SIZE(osd_oti_get(env)->oti_attr_bulk));
214
215         rc = -sa_bulk_lookup(obj->oo_sa_hdl, bulk, cnt);
216         if (rc)
217                 GOTO(out_sa, rc);
218
219 #ifdef ZFS_PROJINHERIT
220         if (o->od_projectused_dn && osa->flags & ZFS_PROJID) {
221                 rc = -sa_lookup(obj->oo_sa_hdl, SA_ZPL_PROJID(o),
222                                 &osa->projid, 8);
223                 if (rc)
224                         GOTO(out_sa, rc);
225
226                 la->la_projid = osa->projid;
227                 la->la_valid |= LA_PROJID;
228                 obj->oo_with_projid = 1;
229         } else {
230                 la->la_projid = ZFS_DEFAULT_PROJID;
231                 la->la_valid &= ~LA_PROJID;
232         }
233 #else
234         la->la_projid = 0;
235         la->la_valid &= ~LA_PROJID;
236 #endif
237
238         la->la_atime = osa->atime[0];
239         la->la_mtime = osa->mtime[0];
240         la->la_ctime = osa->ctime[0];
241         la->la_mode = osa->mode;
242         la->la_uid = osa->uid;
243         la->la_gid = osa->gid;
244         la->la_nlink = osa->nlink;
245         la->la_flags = attrs_zfs2fs(osa->flags);
246         la->la_size = osa->size;
247
248         /* Try to get extra flags from LMA. Right now, only the LMAI_ORPHAN
249          * flag is stored in LMA, and only for orphan directories. */
250         if (S_ISDIR(la->la_mode) && dt_object_exists(&obj->oo_dt)) {
251                 struct osd_thread_info *info = osd_oti_get(env);
252                 struct lustre_mdt_attrs *lma;
253                 struct lu_buf buf;
254
255                 lma = (struct lustre_mdt_attrs *)info->oti_buf;
256                 buf.lb_buf = lma;
257                 buf.lb_len = sizeof(info->oti_buf);
258                 rc = osd_xattr_get(env, &obj->oo_dt, &buf, XATTR_NAME_LMA);
259                 if (rc > 0) {
260                         rc = 0;
261                         lma->lma_incompat = le32_to_cpu(lma->lma_incompat);
262                         obj->oo_lma_flags =
263                                 lma_to_lustre_flags(lma->lma_incompat);
264
265                 } else if (rc == -ENODATA) {
266                         rc = 0;
267                 }
268         }
269
270         if (S_ISCHR(la->la_mode) || S_ISBLK(la->la_mode)) {
271                 rc = -sa_lookup(obj->oo_sa_hdl, SA_ZPL_RDEV(o), &osa->rdev, 8);
272                 if (rc)
273                         GOTO(out_sa, rc);
274                 la->la_rdev = osa->rdev;
275                 la->la_valid |= LA_RDEV;
276         }
277 out_sa:
278
279         RETURN(rc);
280 }
281
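/*
 * Resolve a DMU object id to its dnode, taking a hold on the bonus buffer
 * so the dnode stays valid for the caller.
 */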
282 int __osd_obj2dnode(objset_t *os, uint64_t oid, dnode_t **dnp)
283 {
284         dmu_buf_t *db;
285         dmu_buf_impl_t *dbi;
286         int rc;
287
288         rc = -dmu_bonus_hold(os, oid, osd_obj_tag, &db);
289         if (rc)
290                 return rc;
291
292         dbi = (dmu_buf_impl_t *)db;
293         DB_DNODE_ENTER(dbi);
294         *dnp = DB_DNODE(dbi);
295         LASSERT(*dnp != NULL);
296
297         return 0;
298 }
299
300 /*
301  * Concurrency: no concurrent access is possible that early in object
302  * life-cycle.
303  */
304 struct lu_object *osd_object_alloc(const struct lu_env *env,
305                                    const struct lu_object_header *hdr,
306                                    struct lu_device *d)
307 {
308         struct osd_object *mo;
309
310         OBD_SLAB_ALLOC_PTR_GFP(mo, osd_object_kmem, GFP_NOFS);
311         if (mo != NULL) {
312                 struct lu_object *l;
313
314                 l = &mo->oo_dt.do_lu;
315                 dt_object_init(&mo->oo_dt, NULL, d);
316                 mo->oo_dt.do_ops = &osd_obj_ops;
317                 l->lo_ops = &osd_lu_obj_ops;
318                 INIT_LIST_HEAD(&mo->oo_sa_linkage);
319                 INIT_LIST_HEAD(&mo->oo_unlinked_linkage);
320                 init_rwsem(&mo->oo_sem);
321                 init_rwsem(&mo->oo_guard);
322                 rwlock_init(&mo->oo_attr_lock);
323                 mo->oo_destroy = OSD_DESTROY_NONE;
324                 return l;
325         } else {
326                 return NULL;
327         }
328 }
329
330 /*
331  * Concurrency: shouldn't matter.
332  */
333 static int osd_object_init0(const struct lu_env *env, struct osd_object *obj)
334 {
335         struct osd_device       *osd = osd_obj2dev(obj);
336         const struct lu_fid     *fid = lu_object_fid(&obj->oo_dt.do_lu);
337         int                      rc = 0;
338         ENTRY;
339
340         if (obj->oo_dn == NULL)
341                 RETURN(0);
342
343         /* the object exists */
344
345         rc = osd_object_sa_init(obj, osd);
346         if (rc)
347                 RETURN(rc);
348
349         /* cache attrs in object */
350         rc = __osd_object_attr_get(env, osd, obj, &obj->oo_attr);
351         if (rc)
352                 RETURN(rc);
353
354         if (likely(!fid_is_acct(fid)))
355                 /* no body operations for accounting objects */
356                 obj->oo_dt.do_body_ops = &osd_body_ops;
357
358         /*
359          * initialize object before marking it existing
360          */
361         obj->oo_dt.do_lu.lo_header->loh_attr |= obj->oo_attr.la_mode & S_IFMT;
362
363         smp_mb();
364         obj->oo_dt.do_lu.lo_header->loh_attr |= LOHA_EXISTS;
365
366         RETURN(0);
367 }
368
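/*
 * Check the object's LMA xattr for unsupported incompat feature flags;
 * a missing LMA xattr is not an error.
 */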
369 static int osd_check_lma(const struct lu_env *env, struct osd_object *obj)
370 {
371         struct osd_thread_info  *info = osd_oti_get(env);
372         struct lu_buf           buf;
373         int                     rc;
374         struct lustre_mdt_attrs *lma;
375         ENTRY;
376
377         CLASSERT(sizeof(info->oti_buf) >= sizeof(*lma));
378         lma = (struct lustre_mdt_attrs *)info->oti_buf;
379         buf.lb_buf = lma;
380         buf.lb_len = sizeof(info->oti_buf);
381
382         rc = osd_xattr_get(env, &obj->oo_dt, &buf, XATTR_NAME_LMA);
383         if (rc > 0) {
384                 rc = 0;
385                 lustre_lma_swab(lma);
386                 if (unlikely((lma->lma_incompat & ~LMA_INCOMPAT_SUPP) ||
387                              CFS_FAIL_CHECK(OBD_FAIL_OSD_LMA_INCOMPAT))) {
388                         CWARN("%s: unsupported incompat LMA feature(s) %#x for "
389                               "fid = "DFID"\n", osd_obj2dev(obj)->od_svname,
390                               lma->lma_incompat & ~LMA_INCOMPAT_SUPP,
391                               PFID(lu_object_fid(&obj->oo_dt.do_lu)));
392                         rc = -EOPNOTSUPP;
393                 }
394         } else if (rc == -ENODATA) {
395                 /* the LMA xattr hasn't been initialized yet */
396                 rc = 0;
397         }
398
399         RETURN(rc);
400 }
401
402 /**
403  * Helper function to map the fid of an accounting object to its DMU dnode
404  */
405 static dnode_t *osd_quota_fid2dmu(const struct osd_device *osd,
406                                   const struct lu_fid *fid)
407 {
408         dnode_t *dn = NULL;
409
410         LASSERT(fid_is_acct(fid));
411
412         switch (fid_oid(fid)) {
413         case ACCT_USER_OID:
414                 dn = osd->od_userused_dn;
415                 break;
416         case ACCT_GROUP_OID:
417                 dn = osd->od_groupused_dn;
418                 break;
419 #ifdef ZFS_PROJINHERIT
420         case ACCT_PROJECT_OID:
421                 dn = osd->od_projectused_dn;
422                 break;
423 #endif
424         default:
425                 break;
426         }
427
428         return dn;
429 }
430
431 /*
432  * Concurrency: no concurrent access is possible that early in object
433  * life-cycle.
434  */
435 static int osd_object_init(const struct lu_env *env, struct lu_object *l,
436                            const struct lu_object_conf *conf)
437 {
438         struct osd_object *obj = osd_obj(l);
439         struct osd_device *osd = osd_obj2dev(obj);
440         const struct lu_fid *fid = lu_object_fid(l);
441         uint64_t oid;
442         int rc = 0;
443         ENTRY;
444
445         LASSERT(osd_invariant(obj));
446
447         if (fid_is_otable_it(&l->lo_header->loh_fid)) {
448                 obj->oo_dt.do_ops = &osd_obj_otable_it_ops;
449                 l->lo_header->loh_attr |= LOHA_EXISTS;
450                 RETURN(0);
451         }
452
453         if (conf != NULL && conf->loc_flags & LOC_F_NEW)
454                 GOTO(out, rc = 0);
455
456         if (unlikely(fid_is_acct(fid))) {
457                 obj->oo_dn = osd_quota_fid2dmu(osd, fid);
458                 if (obj->oo_dn) {
459                         obj->oo_dt.do_index_ops = &osd_acct_index_ops;
460                         l->lo_header->loh_attr |= LOHA_EXISTS;
461                 }
462
463                 GOTO(out, rc = 0);
464         }
465
466         rc = osd_fid_lookup(env, osd, fid, &oid);
467         if (rc == 0) {
468                 LASSERT(obj->oo_dn == NULL);
469                 rc = __osd_obj2dnode(osd->od_os, oid, &obj->oo_dn);
470                 /* EEXIST will be returned if object is being deleted in ZFS */
471                 if (rc == -EEXIST) {
472                         rc = 0;
473                         GOTO(out, rc);
474                 }
475                 if (rc != 0) {
476                         CERROR("%s: lookup "DFID"/%#llx failed: rc = %d\n",
477                                osd->od_svname, PFID(lu_object_fid(l)), oid, rc);
478                         GOTO(out, rc);
479                 }
480                 LASSERT(obj->oo_dn);
481                 rc = osd_object_init0(env, obj);
482                 if (rc != 0)
483                         GOTO(out, rc);
484
485                 rc = osd_check_lma(env, obj);
486                 if (rc != 0)
487                         GOTO(out, rc);
488         } else if (rc == -ENOENT) {
489                 rc = 0;
490         }
491         LASSERT(osd_invariant(obj));
492 out:
493         RETURN(rc);
494 }
495
496 /*
497  * Concurrency: no concurrent access is possible that late in object
498  * life-cycle.
499  */
500 static void osd_object_free(const struct lu_env *env, struct lu_object *l)
501 {
502         struct osd_object *obj = osd_obj(l);
503
504         LASSERT(osd_invariant(obj));
505
506         dt_object_fini(&obj->oo_dt);
507         OBD_SLAB_FREE_PTR(obj, osd_object_kmem);
508 }
509
510 static int
511 osd_object_unlinked_add(struct osd_object *obj, struct osd_thandle *oh)
512 {
513         int rc = -EBUSY;
514
515         LASSERT(obj->oo_destroy == OSD_DESTROY_ASYNC);
516
517         /* the object is supposed to be exclusively locked by
518          * the caller (osd_destroy()), while the transaction
519          * (oh) is per-thread and not shared */
520         if (likely(list_empty(&obj->oo_unlinked_linkage))) {
521                 list_add(&obj->oo_unlinked_linkage, &oh->ot_unlinked_list);
522                 rc = 0;
523         }
524
525         return rc;
526 }
527
528 /* Default to max data size covered by a level-1 indirect block */
529 static unsigned long osd_sync_destroy_max_size =
530         1UL << (DN_MAX_INDBLKSHIFT - SPA_BLKPTRSHIFT + SPA_MAXBLOCKSHIFT);
531 module_param(osd_sync_destroy_max_size, ulong, 0444);
532 MODULE_PARM_DESC(osd_sync_destroy_max_size, "Maximum object size to use synchronous destroy.");
533
534 static inline void
535 osd_object_set_destroy_type(struct osd_object *obj)
536 {
537         /*
538          * Lock-less OST_WRITE can race with OST_DESTROY, so set destroy type
539          * only once and use it consistently thereafter.
540          */
541         down_write(&obj->oo_guard);
542         if (obj->oo_destroy == OSD_DESTROY_NONE) {
543                 if (obj->oo_attr.la_size <= osd_sync_destroy_max_size)
544                         obj->oo_destroy = OSD_DESTROY_SYNC;
545                 else /* Larger objects are destroyed asynchronously */
546                         obj->oo_destroy = OSD_DESTROY_ASYNC;
547         }
548         up_write(&obj->oo_guard);
549 }
550
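/*
 * Declare the updates needed to destroy an object: removal from the
 * FID->dnode ZAP, xattr destruction, quota changes, and either a synchronous
 * dnode free or an insertion into the unlinked-set ZAP.
 */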
551 static int osd_declare_destroy(const struct lu_env *env, struct dt_object *dt,
552                                struct thandle *th)
553 {
554         const struct lu_fid     *fid = lu_object_fid(&dt->do_lu);
555         struct osd_object       *obj = osd_dt_obj(dt);
556         struct osd_device       *osd = osd_obj2dev(obj);
557         struct osd_thandle      *oh;
558         dnode_t *dn;
559         int                      rc;
560         uint64_t                 zapid;
561         ENTRY;
562
563         LASSERT(th != NULL);
564         LASSERT(dt_object_exists(dt));
565
566         oh = container_of0(th, struct osd_thandle, ot_super);
567         LASSERT(oh->ot_tx != NULL);
568
569         dmu_tx_mark_netfree(oh->ot_tx);
570
571         /* declare that we'll remove object from fid-dnode mapping */
572         zapid = osd_get_name_n_idx(env, osd, fid, NULL, 0, &dn);
573         osd_tx_hold_zap(oh->ot_tx, zapid, dn, FALSE, NULL);
574
575         osd_declare_xattrs_destroy(env, obj, oh);
576
577         /* one less inode */
578         rc = osd_declare_quota(env, osd, obj->oo_attr.la_uid,
579                                obj->oo_attr.la_gid, obj->oo_attr.la_projid,
580                                -1, oh, NULL, OSD_QID_INODE);
581         if (rc)
582                 RETURN(rc);
583
584         /* data to be truncated */
585         rc = osd_declare_quota(env, osd, obj->oo_attr.la_uid,
586                                obj->oo_attr.la_gid, obj->oo_attr.la_projid,
587                                0, oh, NULL, OSD_QID_BLK);
588         if (rc)
589                 RETURN(rc);
590
591         osd_object_set_destroy_type(obj);
592         if (obj->oo_destroy == OSD_DESTROY_SYNC)
593                 dmu_tx_hold_free(oh->ot_tx, obj->oo_dn->dn_object,
594                                  0, DMU_OBJECT_END);
595         else
596                 osd_tx_hold_zap(oh->ot_tx, osd->od_unlinked->dn_object,
597                                 osd->od_unlinked, TRUE, NULL);
598
599         /* will help to find FID->ino when this object is being
600          * added to PENDING/ */
601         osd_idc_find_and_init(env, osd, obj);
602
603         RETURN(0);
604 }
605
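/*
 * Destroy an object: remove it from the FID->dnode ZAP, destroy its xattrs,
 * then either free the dnode in this transaction (small objects) or put it
 * on the unlinked set for asynchronous destruction.
 */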
606 static int osd_destroy(const struct lu_env *env, struct dt_object *dt,
607                        struct thandle *th)
608 {
609         struct osd_thread_info  *info = osd_oti_get(env);
610         char                    *buf = info->oti_str;
611         struct osd_object       *obj = osd_dt_obj(dt);
612         struct osd_device       *osd = osd_obj2dev(obj);
613         const struct lu_fid     *fid = lu_object_fid(&dt->do_lu);
614         struct osd_thandle      *oh;
615         int                      rc;
616         uint64_t                 oid, zapid;
617         dnode_t *zdn;
618         ENTRY;
619
620         down_write(&obj->oo_guard);
621
622         if (unlikely(!dt_object_exists(dt) || obj->oo_destroyed))
623                 GOTO(out, rc = -ENOENT);
624
625         LASSERT(obj->oo_dn != NULL);
626
627         oh = container_of0(th, struct osd_thandle, ot_super);
628         LASSERT(oh != NULL);
629         LASSERT(oh->ot_tx != NULL);
630
631         /* remove the object's name from its index dir (which one depends on the fid) */
632         zapid = osd_get_name_n_idx(env, osd, fid, buf,
633                                    sizeof(info->oti_str), &zdn);
634         rc = osd_zap_remove(osd, zapid, zdn, buf, oh->ot_tx);
635         if (rc) {
636                 CERROR("%s: zap_remove(%s) failed: rc = %d\n",
637                        osd->od_svname, buf, rc);
638                 GOTO(out, rc);
639         }
640
641         rc = osd_xattrs_destroy(env, obj, oh);
642         if (rc) {
643                 CERROR("%s: cannot destroy xattrs for %s: rc = %d\n",
644                        osd->od_svname, buf, rc);
645                 GOTO(out, rc);
646         }
647
648         oid = obj->oo_dn->dn_object;
649         if (unlikely(obj->oo_destroy == OSD_DESTROY_NONE)) {
650                 /* this may happen if the destroy wasn't declared
651                  * e.g. when the object is created and then destroyed
652                  * in the same transaction - we don't need additional
653                  * space for destroy specifically */
654                 LASSERT(obj->oo_attr.la_size <= osd_sync_destroy_max_size);
655                 rc = -dmu_object_free(osd->od_os, oid, oh->ot_tx);
656                 if (rc)
657                         CERROR("%s: failed to free %s %llu: rc = %d\n",
658                                osd->od_svname, buf, oid, rc);
659         } else if (obj->oo_destroy == OSD_DESTROY_SYNC) {
660                 rc = -dmu_object_free(osd->od_os, oid, oh->ot_tx);
661                 if (rc)
662                         CERROR("%s: failed to free %s %llu: rc = %d\n",
663                                osd->od_svname, buf, oid, rc);
664         } else { /* asynchronous destroy */
665                 char *key = info->oti_key;
666
667                 rc = osd_object_unlinked_add(obj, oh);
668                 if (rc)
669                         GOTO(out, rc);
670
671                 snprintf(key, sizeof(info->oti_key), "%llx", oid);
672                 rc = osd_zap_add(osd, osd->od_unlinked->dn_object,
673                                  osd->od_unlinked, key, 8, 1, &oid, oh->ot_tx);
674                 if (rc)
675                         CERROR("%s: zap_add_int() failed %s %llu: rc = %d\n",
676                                osd->od_svname, buf, oid, rc);
677         }
678
679 out:
680         /* not needed in the cache anymore */
681         set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags);
682         if (rc == 0)
683                 obj->oo_destroyed = 1;
684         up_write(&obj->oo_guard);
685         RETURN (0);
686 }
687
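/*
 * Called when the object is removed from the cache: release the SA handle,
 * the cached xattr nvlist and the dnode hold.
 */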
688 static void osd_object_delete(const struct lu_env *env, struct lu_object *l)
689 {
690         struct osd_object *obj = osd_obj(l);
691         const struct lu_fid *fid = lu_object_fid(l);
692
693         if (obj->oo_dn) {
694                 if (likely(!fid_is_acct(fid))) {
695                         osd_object_sa_fini(obj);
696                         if (obj->oo_sa_xattr) {
697                                 nvlist_free(obj->oo_sa_xattr);
698                                 obj->oo_sa_xattr = NULL;
699                         }
700                         osd_dnode_rele(obj->oo_dn);
701                         list_del(&obj->oo_sa_linkage);
702                 }
703                 obj->oo_dn = NULL;
704         }
705 }
706
707 /*
708  * Concurrency: ->loo_object_release() is called under site spin-lock.
709  */
710 static void osd_object_release(const struct lu_env *env,
711                                struct lu_object *l)
712 {
713 }
714
715 /*
716  * Concurrency: shouldn't matter.
717  */
718 static int osd_object_print(const struct lu_env *env, void *cookie,
719                             lu_printer_t p, const struct lu_object *l)
720 {
721         struct osd_object *o = osd_obj(l);
722
723         return (*p)(env, cookie, LUSTRE_OSD_ZFS_NAME"-object@%p", o);
724 }
725
726 static void osd_read_lock(const struct lu_env *env, struct dt_object *dt,
727                           unsigned role)
728 {
729         struct osd_object *obj = osd_dt_obj(dt);
730
731         LASSERT(osd_invariant(obj));
732
733         down_read_nested(&obj->oo_sem, role);
734 }
735
736 static void osd_write_lock(const struct lu_env *env, struct dt_object *dt,
737                            unsigned role)
738 {
739         struct osd_object *obj = osd_dt_obj(dt);
740
741         LASSERT(osd_invariant(obj));
742
743         down_write_nested(&obj->oo_sem, role);
744 }
745
746 static void osd_read_unlock(const struct lu_env *env, struct dt_object *dt)
747 {
748         struct osd_object *obj = osd_dt_obj(dt);
749
750         LASSERT(osd_invariant(obj));
751         up_read(&obj->oo_sem);
752 }
753
754 static void osd_write_unlock(const struct lu_env *env, struct dt_object *dt)
755 {
756         struct osd_object *obj = osd_dt_obj(dt);
757
758         LASSERT(osd_invariant(obj));
759         up_write(&obj->oo_sem);
760 }
761
762 static int osd_write_locked(const struct lu_env *env, struct dt_object *dt)
763 {
764         struct osd_object *obj = osd_dt_obj(dt);
765         int rc = 1;
766
767         LASSERT(osd_invariant(obj));
768
769         if (down_write_trylock(&obj->oo_sem)) {
770                 rc = 0;
771                 up_write(&obj->oo_sem);
772         }
773         return rc;
774 }
775
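/*
 * Return the attributes cached in the object; block count and block size
 * are refreshed from the DMU, and directory sizes are derived from the
 * block count.
 */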
776 static int osd_attr_get(const struct lu_env *env,
777                         struct dt_object *dt,
778                         struct lu_attr *attr)
779 {
780         struct osd_object       *obj = osd_dt_obj(dt);
781         uint64_t                 blocks;
782         uint32_t                 blksize;
783         int                      rc = 0;
784
785         down_read(&obj->oo_guard);
786
787         if (unlikely(!dt_object_exists(dt) || obj->oo_destroyed))
788                 GOTO(out, rc = -ENOENT);
789
790         if (unlikely(fid_is_acct(lu_object_fid(&dt->do_lu))))
791                 GOTO(out, rc = 0);
792
793         LASSERT(osd_invariant(obj));
794         LASSERT(obj->oo_dn);
795
796         read_lock(&obj->oo_attr_lock);
797         *attr = obj->oo_attr;
798         if (obj->oo_lma_flags & LUSTRE_ORPHAN_FL)
799                 attr->la_flags |= LUSTRE_ORPHAN_FL;
800         read_unlock(&obj->oo_attr_lock);
801
802         /* with ZFS_DEBUG, zrl_add_debug() called by DB_DNODE_ENTER()
803          * from within sa_object_size() can block on a mutex, so
804          * we can't call sa_object_size() while holding the rwlock */
805         sa_object_size(obj->oo_sa_hdl, &blksize, &blocks);
806         /* we do not control the size of indices, so always calculate
807          * it from the number of blocks reported by the DMU */
808         if (S_ISDIR(attr->la_mode))
809                 attr->la_size = 512 * blocks;
810         /* Block size may not be set; suggest maximal I/O transfers. */
811         if (blksize == 0)
812                 blksize = osd_spa_maxblocksize(
813                         dmu_objset_spa(osd_obj2dev(obj)->od_os));
814
815         attr->la_blksize = blksize;
816         attr->la_blocks = blocks;
817         attr->la_valid |= LA_BLOCKS | LA_BLKSIZE;
818
819 out:
820         up_read(&obj->oo_guard);
821         return rc;
822 }
823
824 /* Simple wrapper on top of the qsd API which implements quota transfer for
825  * osd setattr needs. As a reminder, only the root user can change the
826  * ownership of a file, which is why EDQUOT & EINPROGRESS errors are discarded */
827 static inline int qsd_transfer(const struct lu_env *env,
828                                struct qsd_instance *qsd,
829                                struct lquota_trans *trans, int qtype,
830                                __u64 orig_id, __u64 new_id, __u64 bspace,
831                                struct lquota_id_info *qi)
832 {
833         int     rc;
834
835         if (unlikely(qsd == NULL))
836                 return 0;
837
838         LASSERT(qtype >= 0 && qtype < LL_MAXQUOTAS);
839         qi->lqi_type = qtype;
840
841         /* inode accounting */
842         qi->lqi_is_blk = false;
843
844         /* one more inode for the new owner ... */
845         qi->lqi_id.qid_uid = new_id;
846         qi->lqi_space      = 1;
847         rc = qsd_op_begin(env, qsd, trans, qi, NULL);
848         if (rc == -EDQUOT || rc == -EINPROGRESS)
849                 rc = 0;
850         if (rc)
851                 return rc;
852
853         /* and one less inode for the current id */
854         qi->lqi_id.qid_uid = orig_id;
855         qi->lqi_space      = -1;
856         /* can't get EDQUOT when reducing usage */
857         rc = qsd_op_begin(env, qsd, trans, qi, NULL);
858         if (rc == -EINPROGRESS)
859                 rc = 0;
860         if (rc)
861                 return rc;
862
863         /* block accounting */
864         qi->lqi_is_blk = true;
865
866         /* more blocks for the new owner ... */
867         qi->lqi_id.qid_uid = new_id;
868         qi->lqi_space      = bspace;
869         rc = qsd_op_begin(env, qsd, trans, qi, NULL);
870         if (rc == -EDQUOT || rc == -EINPROGRESS)
871                 rc = 0;
872         if (rc)
873                 return rc;
874
875         /* and finally less blocks for the current owner */
876         qi->lqi_id.qid_uid = orig_id;
877         qi->lqi_space      = -bspace;
878         rc = qsd_op_begin(env, qsd, trans, qi, NULL);
879         /* can't get EDQUOT when reducing usage */
880         if (rc == -EINPROGRESS)
881                 rc = 0;
882         return rc;
883 }
884
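/*
 * Declare an attribute update: make sure the bonus buffer is part of the
 * transaction and set up quota transfers for UID/GID/PROJID changes.
 */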
885 static int osd_declare_attr_set(const struct lu_env *env,
886                                 struct dt_object *dt,
887                                 const struct lu_attr *attr,
888                                 struct thandle *handle)
889 {
890         struct osd_thread_info  *info = osd_oti_get(env);
891         struct osd_object       *obj = osd_dt_obj(dt);
892         struct osd_device       *osd = osd_obj2dev(obj);
893         dmu_tx_hold_t           *txh;
894         struct osd_thandle      *oh;
895         uint64_t                 bspace;
896         uint32_t                 blksize;
897         int                      rc = 0;
898         bool                     found;
899         ENTRY;
900
901
902         LASSERT(handle != NULL);
903         LASSERT(osd_invariant(obj));
904
905         oh = container_of0(handle, struct osd_thandle, ot_super);
906
907         down_read(&obj->oo_guard);
908         if (unlikely(!dt_object_exists(dt) || obj->oo_destroyed))
909                 GOTO(out, rc = 0);
910
911         LASSERT(obj->oo_sa_hdl != NULL);
912         LASSERT(oh->ot_tx != NULL);
913         /* regular attributes are part of the bonus buffer */
914         /* let's check whether this object is already part of
915          * the transaction */
916         found = false;
917         for (txh = list_head(&oh->ot_tx->tx_holds); txh;
918              txh = list_next(&oh->ot_tx->tx_holds, txh)) {
919                 if (txh->txh_dnode == NULL)
920                         continue;
921                 if (txh->txh_dnode->dn_object != obj->oo_dn->dn_object)
922                         continue;
923                 /* this object is already part of the transaction;
924                  * we don't need to declare the bonus again */
925                 found = true;
926                 break;
927         }
928         if (!found)
929                 dmu_tx_hold_bonus(oh->ot_tx, obj->oo_dn->dn_object);
930         if (oh->ot_tx->tx_err != 0)
931                 GOTO(out, rc = -oh->ot_tx->tx_err);
932
933         if (attr && attr->la_valid & LA_FLAGS) {
934                 /* LMA is usually a part of bonus, no need to declare
935                  * anything else */
936         }
937
938         if (attr && (attr->la_valid & (LA_UID | LA_GID | LA_PROJID))) {
939                 sa_object_size(obj->oo_sa_hdl, &blksize, &bspace);
940                 bspace = toqb(bspace * blksize);
941         }
942
943         if (attr && attr->la_valid & LA_UID) {
944                 /* quota enforcement for user */
945                 if (attr->la_uid != obj->oo_attr.la_uid) {
946                         rc = qsd_transfer(env, osd->od_quota_slave,
947                                           &oh->ot_quota_trans, USRQUOTA,
948                                           obj->oo_attr.la_uid, attr->la_uid,
949                                           bspace, &info->oti_qi);
950                         if (rc)
951                                 GOTO(out, rc);
952                 }
953         }
954         if (attr && attr->la_valid & LA_GID) {
955                 /* quota enforcement for group */
956                 if (attr->la_gid != obj->oo_attr.la_gid) {
957                         rc = qsd_transfer(env, osd->od_quota_slave,
958                                           &oh->ot_quota_trans, GRPQUOTA,
959                                           obj->oo_attr.la_gid, attr->la_gid,
960                                           bspace, &info->oti_qi);
961                         if (rc)
962                                 GOTO(out, rc);
963                 }
964         }
965 #ifdef ZFS_PROJINHERIT
966         if (attr && attr->la_valid & LA_PROJID) {
967                 if (!osd->od_projectused_dn)
968                         GOTO(out, rc = -EOPNOTSUPP);
969
970                 /* Usually, if project quota is upgradable for the device,
971                  * then the upgrade will be done before or when mounting the
972                  * device. So when we get here, this object should already
973                  * have a project ID attribute (zero by default). Otherwise
974                  * something went wrong during the former upgrade; return
975                  * failure to report that.
976                  *
977                  * Please note that, unlike other attributes, you can NOT
978                  * simply set the project ID attribute in such a case,
979                  * because adding (NOT changing) the project ID attribute
980                  * requires changing the object's attribute layout to match
981                  * the ZFS backend quota accounting requirements. */
982                 if (unlikely(!obj->oo_with_projid))
983                         GOTO(out, rc = -ENXIO);
984
985                 /* quota enforcement for project */
986                 if (attr->la_projid != obj->oo_attr.la_projid) {
987                         rc = qsd_transfer(env, osd->od_quota_slave,
988                                           &oh->ot_quota_trans, PRJQUOTA,
989                                           obj->oo_attr.la_projid,
990                                           attr->la_projid, bspace,
991                                           &info->oti_qi);
992                         if (rc)
993                                 GOTO(out, rc);
994                 }
995         }
996 #endif
997 out:
998         up_read(&obj->oo_guard);
999         RETURN(rc);
1000 }
1001
1002 /*
1003  * Set the attributes of an object
1004  *
1005  * The transaction passed to this routine must have
1006  * dmu_tx_hold_bonus(tx, oid) called and then assigned
1007  * to a transaction group.
1008  */
1009 static int osd_attr_set(const struct lu_env *env, struct dt_object *dt,
1010                         const struct lu_attr *la, struct thandle *handle)
1011 {
1012         struct osd_thread_info  *info = osd_oti_get(env);
1013         sa_bulk_attr_t          *bulk = osd_oti_get(env)->oti_attr_bulk;
1014         struct osd_object       *obj = osd_dt_obj(dt);
1015         struct osd_device       *osd = osd_obj2dev(obj);
1016         struct osd_thandle      *oh;
1017         struct osa_attr         *osa = &info->oti_osa;
1018         __u64                    valid = la->la_valid;
1019         int                      cnt;
1020         int                      rc = 0;
1021
1022         ENTRY;
1023
1024         down_read(&obj->oo_guard);
1025         if (unlikely(!dt_object_exists(dt) || obj->oo_destroyed))
1026                 GOTO(out, rc = -ENOENT);
1027
1028         LASSERT(handle != NULL);
1029         LASSERT(osd_invariant(obj));
1030         LASSERT(obj->oo_sa_hdl);
1031
1032         oh = container_of0(handle, struct osd_thandle, ot_super);
1033         /* Assert that the transaction has been assigned to a
1034            transaction group. */
1035         LASSERT(oh->ot_tx->tx_txg != 0);
1036
1037         /* Only allow setting the size for regular files */
1038         if (!S_ISREG(dt->do_lu.lo_header->loh_attr))
1039                 valid &= ~(LA_SIZE | LA_BLOCKS);
1040
1041         if (valid & LA_CTIME && la->la_ctime == obj->oo_attr.la_ctime)
1042                 valid &= ~LA_CTIME;
1043
1044         if (valid & LA_MTIME && la->la_mtime == obj->oo_attr.la_mtime)
1045                 valid &= ~LA_MTIME;
1046
1047         if (valid & LA_ATIME && la->la_atime == obj->oo_attr.la_atime)
1048                 valid &= ~LA_ATIME;
1049
1050         if (valid == 0)
1051                 GOTO(out, rc = 0);
1052
1053         if (valid & LA_FLAGS) {
1054                 struct lustre_mdt_attrs *lma;
1055                 struct lu_buf buf;
1056                 int size = 0;
1057
1058                 if (la->la_flags & LUSTRE_LMA_FL_MASKS) {
1059                         CLASSERT(sizeof(info->oti_buf) >= sizeof(*lma));
1060                         lma = (struct lustre_mdt_attrs *)&info->oti_buf;
1061                         buf.lb_buf = lma;
1062                         buf.lb_len = sizeof(info->oti_buf);
1063
1064                         /* Do NOT call osd_xattr_get() directly; that
1065                          * would cause a recursive down_read() on oo_guard. */
1066                         rc = osd_xattr_get_internal(env, obj, &buf,
1067                                                     XATTR_NAME_LMA, &size);
1068                         if (!rc && unlikely(size < sizeof(*lma)))
1069                                 rc = -EINVAL;
1070                         if (!rc) {
1071                                 lma->lma_incompat =
1072                                         le32_to_cpu(lma->lma_incompat);
1073                                 lma->lma_incompat |=
1074                                         lustre_to_lma_flags(la->la_flags);
1075                                 lma->lma_incompat =
1076                                         cpu_to_le32(lma->lma_incompat);
1077                                 buf.lb_buf = lma;
1078                                 buf.lb_len = sizeof(*lma);
1079                                 rc = osd_xattr_set_internal(env, obj, &buf,
1080                                                             XATTR_NAME_LMA,
1081                                                             LU_XATTR_REPLACE,
1082                                                             oh);
1083                         }
1084                         if (rc < 0) {
1085                                 CWARN("%s: failed to set LMA flags: rc = %d\n",
1086                                        osd->od_svname, rc);
1087                                 GOTO(out, rc);
1088                         }
1089                 }
1090         }
1091
1092         write_lock(&obj->oo_attr_lock);
1093         cnt = 0;
1094
1095         if (valid & LA_PROJID) {
1096 #ifdef ZFS_PROJINHERIT
1097                 /* osd_declare_attr_set() must be called first.
1098                  * If osd::od_projectused_dn is not set, then we
1099                  * cannot get here. */
1100                 LASSERT(osd->od_projectused_dn);
1101                 LASSERT(obj->oo_with_projid);
1102
1103                 osa->projid = obj->oo_attr.la_projid = la->la_projid;
1104                 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_PROJID(osd), NULL,
1105                                  &osa->projid, 8);
1106 #else
1107                 valid &= ~LA_PROJID;
1108 #endif
1109         }
1110
1111         if (valid & LA_ATIME) {
1112                 osa->atime[0] = obj->oo_attr.la_atime = la->la_atime;
1113                 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_ATIME(osd), NULL,
1114                                  osa->atime, 16);
1115         }
1116         if (valid & LA_MTIME) {
1117                 osa->mtime[0] = obj->oo_attr.la_mtime = la->la_mtime;
1118                 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_MTIME(osd), NULL,
1119                                  osa->mtime, 16);
1120         }
1121         if (valid & LA_CTIME) {
1122                 osa->ctime[0] = obj->oo_attr.la_ctime = la->la_ctime;
1123                 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_CTIME(osd), NULL,
1124                                  osa->ctime, 16);
1125         }
1126         if (valid & LA_MODE) {
1127                 /* mode is stored along with type, so read it first */
1128                 obj->oo_attr.la_mode = (obj->oo_attr.la_mode & S_IFMT) |
1129                         (la->la_mode & ~S_IFMT);
1130                 osa->mode = obj->oo_attr.la_mode;
1131                 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_MODE(osd), NULL,
1132                                  &osa->mode, 8);
1133         }
1134         if (valid & LA_SIZE) {
1135                 osa->size = obj->oo_attr.la_size = la->la_size;
1136                 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_SIZE(osd), NULL,
1137                                  &osa->size, 8);
1138         }
1139         if (valid & LA_NLINK) {
1140                 osa->nlink = obj->oo_attr.la_nlink = la->la_nlink;
1141                 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_LINKS(osd), NULL,
1142                                  &osa->nlink, 8);
1143         }
1144         if (valid & LA_RDEV) {
1145                 osa->rdev = obj->oo_attr.la_rdev = la->la_rdev;
1146                 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_RDEV(osd), NULL,
1147                                  &osa->rdev, 8);
1148         }
1149         if (valid & LA_FLAGS) {
1150                 osa->flags = attrs_fs2zfs(la->la_flags);
1151                 /* many flags are not supported by ZFS, so cache only the
1152                  * flags that ZFS actually stores */
1153                 obj->oo_attr.la_flags = attrs_zfs2fs(osa->flags);
1154 #ifdef ZFS_PROJINHERIT
1155                 if (obj->oo_with_projid)
1156                         osa->flags |= ZFS_PROJID;
1157 #endif
1158                 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_FLAGS(osd), NULL,
1159                                  &osa->flags, 8);
1160         }
1161         if (valid & LA_UID) {
1162                 osa->uid = obj->oo_attr.la_uid = la->la_uid;
1163                 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_UID(osd), NULL,
1164                                  &osa->uid, 8);
1165         }
1166         if (valid & LA_GID) {
1167                 osa->gid = obj->oo_attr.la_gid = la->la_gid;
1168                 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_GID(osd), NULL,
1169                                  &osa->gid, 8);
1170         }
1171         obj->oo_attr.la_valid |= valid;
1172         write_unlock(&obj->oo_attr_lock);
1173
1174         LASSERT(cnt <= ARRAY_SIZE(osd_oti_get(env)->oti_attr_bulk));
1175         rc = osd_object_sa_bulk_update(obj, bulk, cnt, oh);
1176
1177 out:
1178         up_read(&obj->oo_guard);
1179         RETURN(rc);
1180 }
1181
1182 /*
1183  * Object creation.
1184  *
1185  * XXX temporary solution.
1186  */
1187
1188 static void osd_ah_init(const struct lu_env *env, struct dt_allocation_hint *ah,
1189                         struct dt_object *parent, struct dt_object *child,
1190                         umode_t child_mode)
1191 {
1192         LASSERT(ah);
1193
1194         ah->dah_parent = parent;
1195         ah->dah_mode = child_mode;
1196
1197         if (parent != NULL && !dt_object_remote(parent)) {
1198                 /* will help to find FID->ino at dt_insert("..") */
1199                 struct osd_object *pobj = osd_dt_obj(parent);
1200
1201                 osd_idc_find_and_init(env, osd_obj2dev(pobj), pobj);
1202         }
1203 }
1204
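/*
 * Declare object creation: reserve space for the new dnode (plus a ZAP for
 * directories and indices), for the FID->dnode mapping update and for the
 * inode quota change.
 */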
1205 static int osd_declare_create(const struct lu_env *env, struct dt_object *dt,
1206                               struct lu_attr *attr,
1207                               struct dt_allocation_hint *hint,
1208                               struct dt_object_format *dof,
1209                               struct thandle *handle)
1210 {
1211         const struct lu_fid     *fid = lu_object_fid(&dt->do_lu);
1212         struct osd_object       *obj = osd_dt_obj(dt);
1213         struct osd_device       *osd = osd_obj2dev(obj);
1214         struct osd_thandle      *oh;
1215         uint64_t                 zapid;
1216         dnode_t                 *dn;
1217         int                      rc, dnode_size;
1218         ENTRY;
1219
1220         LASSERT(dof);
1221
1222         switch (dof->dof_type) {
1223                 case DFT_REGULAR:
1224                 case DFT_SYM:
1225                 case DFT_NODE:
1226                         if (obj->oo_dt.do_body_ops == NULL)
1227                                 obj->oo_dt.do_body_ops = &osd_body_ops;
1228                         break;
1229                 default:
1230                         break;
1231         }
1232
1233         LASSERT(handle != NULL);
1234         oh = container_of0(handle, struct osd_thandle, ot_super);
1235         LASSERT(oh->ot_tx != NULL);
1236
1237         /* this is the minimum set of EAs on every Lustre object */
1238         obj->oo_ea_in_bonus = ZFS_SA_BASE_ATTR_SIZE +
1239                                 sizeof(__u64) + /* VBR VERSION */
1240                                 sizeof(struct lustre_mdt_attrs); /* LMA */
1241         /* reserve 32 bytes for extra stuff like ACLs */
1242         dnode_size = size_roundup_power2(obj->oo_ea_in_bonus + 32);
1243
1244         switch (dof->dof_type) {
1245                 case DFT_DIR:
1246                         dt->do_index_ops = &osd_dir_ops;
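                        /* falls through: directories also need the ZAP and
                         * SA declarations below */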
1247                 case DFT_INDEX:
1248                         /* for zap create */
1249                         dmu_tx_hold_zap(oh->ot_tx, DMU_NEW_OBJECT, FALSE, NULL);
1250                         dmu_tx_hold_sa_create(oh->ot_tx, dnode_size);
1251                         break;
1252                 case DFT_REGULAR:
1253                 case DFT_SYM:
1254                 case DFT_NODE:
1255                         /* first, we'll create a new object */
1256                         dmu_tx_hold_sa_create(oh->ot_tx, dnode_size);
1257                         break;
1258
1259                 default:
1260                         LBUG();
1261                         break;
1262         }
1263
1264         /* and we'll add it to the appropriate FID->dnode mapping ZAP */
1265         zapid = osd_get_name_n_idx(env, osd, fid, NULL, 0, &dn);
1266         osd_tx_hold_zap(oh->ot_tx, zapid, dn, TRUE, NULL);
1267
1268         /* will help to find FID->ino mapping at dt_insert() */
1269         osd_idc_find_and_init(env, osd, obj);
1270
1271         rc = osd_declare_quota(env, osd, attr->la_uid, attr->la_gid,
1272                                attr->la_projid, 1, oh, NULL, OSD_QID_INODE);
1273
1274         RETURN(rc);
1275 }
1276
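/*
 * Fill in the initial SA attributes (and, optionally, the packed xattr SA)
 * of a newly created object with a single sa_replace_all_by_template() call.
 */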
1277 int __osd_attr_init(const struct lu_env *env, struct osd_device *osd,
1278                     struct osd_object *obj, sa_handle_t *sa_hdl, dmu_tx_t *tx,
1279                     struct lu_attr *la, uint64_t parent,
1280                     nvlist_t *xattr)
1281 {
1282         sa_bulk_attr_t  *bulk = osd_oti_get(env)->oti_attr_bulk;
1283         struct osa_attr *osa = &osd_oti_get(env)->oti_osa;
1284         uint64_t         gen;
1285         uint64_t         crtime[2];
1286         timestruc_t      now;
1287         int              cnt;
1288         int              rc;
1289         char *dxattr = NULL;
1290         size_t sa_size;
1291
1292
1293         LASSERT(sa_hdl);
1294
1295         gen = dmu_tx_get_txg(tx);
1296         gethrestime(&now);
1297         ZFS_TIME_ENCODE(&now, crtime);
1298
1299         osa->atime[0] = la->la_atime;
1300         osa->ctime[0] = la->la_ctime;
1301         osa->mtime[0] = la->la_mtime;
1302         osa->mode = la->la_mode;
1303         osa->uid = la->la_uid;
1304         osa->gid = la->la_gid;
1305         osa->rdev = la->la_rdev;
1306         osa->nlink = la->la_nlink;
1307         if (la->la_valid & LA_FLAGS)
1308                 osa->flags = attrs_fs2zfs(la->la_flags);
1309         else
1310                 osa->flags = 0;
1311         osa->size  = la->la_size;
1312 #ifdef ZFS_PROJINHERIT
1313         if (osd->od_projectused_dn) {
1314                 if (la->la_valid & LA_PROJID)
1315                         osa->projid = la->la_projid;
1316                 else
1317                         osa->projid = ZFS_DEFAULT_PROJID;
1318                 osa->flags |= ZFS_PROJID;
1319                 if (obj)
1320                         obj->oo_with_projid = 1;
1321         } else {
1322                 osa->flags &= ~ZFS_PROJID;
1323         }
1324 #endif
1325
1326         /*
1327          * we need to create all SA below upon object create.
1328          *
1329          * XXX The attribute order matters since the accounting callback relies
1330          * on static offsets (i.e. SA_*_OFFSET, see zfs_space_delta_cb()) to
1331          * look up the UID/GID/PROJID attributes. Moreover, the callback does
1332          * not seem to support the spill block.
1333          * We define attributes in the same order as SA_*_OFFSET in order to
1334          * work around the problem. See ORI-610.
1335          */
1336         cnt = 0;
1337         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_MODE(osd), NULL, &osa->mode, 8);
1338         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_SIZE(osd), NULL, &osa->size, 8);
1339         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_GEN(osd), NULL, &gen, 8);
1340         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_UID(osd), NULL, &osa->uid, 8);
1341         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_GID(osd), NULL, &osa->gid, 8);
1342         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_PARENT(osd), NULL, &parent, 8);
1343         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_FLAGS(osd), NULL, &osa->flags, 8);
1344         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_ATIME(osd), NULL, osa->atime, 16);
1345         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_MTIME(osd), NULL, osa->mtime, 16);
1346         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_CTIME(osd), NULL, osa->ctime, 16);
1347         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_CRTIME(osd), NULL, crtime, 16);
1348         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_LINKS(osd), NULL, &osa->nlink, 8);
1349 #ifdef ZFS_PROJINHERIT
1350         if (osd->od_projectused_dn)
1351                 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_PROJID(osd), NULL,
1352                                  &osa->projid, 8);
1353 #endif
1354         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_RDEV(osd), NULL, &osa->rdev, 8);
1355         LASSERT(cnt <= ARRAY_SIZE(osd_oti_get(env)->oti_attr_bulk));
1356
1357         if (xattr) {
1358                 rc = -nvlist_size(xattr, &sa_size, NV_ENCODE_XDR);
1359                 LASSERT(rc == 0);
1360
1361                 dxattr = osd_zio_buf_alloc(sa_size);
1362                 LASSERT(dxattr);
1363
1364                 rc = -nvlist_pack(xattr, &dxattr, &sa_size,
1365                                 NV_ENCODE_XDR, KM_SLEEP);
1366                 LASSERT(rc == 0);
1367
1368                 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_DXATTR(osd),
1369                                 NULL, dxattr, sa_size);
1370         }
1371
1372         rc = -sa_replace_all_by_template(sa_hdl, bulk, cnt, tx);
1373         if (dxattr)
1374                 osd_zio_buf_free(dxattr, sa_size);
1375
1376         return rc;
1377 }
1378
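/*
 * Look up the dnode of a freshly allocated object, preferably reusing the
 * dnode already referenced by one of the transaction holds.
 */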
1379 static int osd_find_new_dnode(const struct lu_env *env, dmu_tx_t *tx,
1380                               uint64_t oid, dnode_t **dnp)
1381 {
1382         dmu_tx_hold_t *txh;
1383         int rc = 0;
1384
1385         /* take dnode_t from tx to save on dnode#->dnode_t lookup */
1386         for (txh = list_tail(&tx->tx_holds); txh;
1387              txh = list_prev(&tx->tx_holds, txh)) {
1388                 dnode_t *dn = txh->txh_dnode;
1389                 dmu_buf_impl_t *db;
1390
1391                 if (dn == NULL)
1392                         continue;
1393                 if (dn->dn_object != oid)
1394                         continue;
1395                 db = dn->dn_bonus;
1396                 if (db == NULL) {
1397                         rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
1398                         if (dn->dn_bonus == NULL)
1399                                 dbuf_create_bonus(dn);
1400                         rw_exit(&dn->dn_struct_rwlock);
1401                 }
1402                 db = dn->dn_bonus;
1403                 LASSERT(db);
1404                 LASSERT(dn->dn_handle);
1405                 DB_DNODE_ENTER(db);
1406                 if (refcount_add(&db->db_holds, osd_obj_tag) == 1) {
1407                         refcount_add(&dn->dn_holds, osd_obj_tag);
1408                         atomic_inc_32(&dn->dn_dbufs_count);
1409                 }
1410                 *dnp = dn;
1411                 dbuf_read(db, NULL, DB_RF_MUST_SUCCEED | DB_RF_NOPREFETCH);
1412                 break;
1413         }
1414
1415         if (unlikely(*dnp == NULL))
1416                 rc = __osd_obj2dnode(tx->tx_objset, oid, dnp);
1417
1418         return rc;
1419 }
1420
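/*
 * Choose the dnode size for a new object based on the od_dnsize tunable and
 * the amount of EA data expected in the bonus buffer (a fixed bonus length
 * is used on ZFS versions without large dnode support).
 */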
1421 #ifdef HAVE_DMU_OBJECT_ALLOC_DNSIZE
1422 static int osd_find_dnsize(struct osd_object *obj)
1423 {
1424         struct osd_device *osd = osd_obj2dev(obj);
1425         int dnsize;
1426
1427         if (osd->od_dnsize == ZFS_DNSIZE_AUTO) {
1428                 dnsize = DNODE_MIN_SIZE;
1429                 do {
1430                         if (DN_BONUS_SIZE(dnsize) >= obj->oo_ea_in_bonus + 32)
1431                                 break;
1432                         dnsize <<= 1;
1433                 } while (dnsize < DNODE_MAX_SIZE);
1434                 if (dnsize > DNODE_MAX_SIZE)
1435                         dnsize = DNODE_MAX_SIZE;
1436         } else if (osd->od_dnsize == ZFS_DNSIZE_1K) {
1437                 dnsize = 1024;
1438         } else if (osd->od_dnsize == ZFS_DNSIZE_2K) {
1439                 dnsize = 2048;
1440         } else if (osd->od_dnsize == ZFS_DNSIZE_4K) {
1441                 dnsize = 4096;
1442         } else if (osd->od_dnsize == ZFS_DNSIZE_8K) {
1443                 dnsize = 8192;
1444         } else if (osd->od_dnsize == ZFS_DNSIZE_16K) {
1445                 dnsize = 16384;
1446         } else {
1447                 dnsize = DNODE_MIN_SIZE;
1448         }
1449         return dnsize;
1450 }
1451 #else
1452 static inline int osd_find_dnsize(struct osd_object *obj)
1453 {
1454         return DN_MAX_BONUSLEN;
1455 }
1456 #endif
1457
1458 /*
1459  * The transaction passed to this routine must have
1460  * dmu_tx_hold_bonus(tx, DMU_NEW_OBJECT) called and then assigned
1461  * to a transaction group.
1462  */
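/*
 * Illustrative caller sketch only (the real declare and create paths in
 * this driver are split across osd_declare_create() and osd_create()):
 *
 *      dmu_tx_hold_bonus(tx, DMU_NEW_OBJECT);
 *      rc = -dmu_tx_assign(tx, TXG_WAIT);
 *      if (rc == 0)
 *              rc = __osd_object_create(env, obj, &dn, tx, la);
 */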
1463 int __osd_object_create(const struct lu_env *env, struct osd_object *obj,
1464                         dnode_t **dnp, dmu_tx_t *tx, struct lu_attr *la)
1465 {
1466         struct osd_device   *osd = osd_obj2dev(obj);
1467         const struct lu_fid *fid = lu_object_fid(&obj->oo_dt.do_lu);
1468         dmu_object_type_t    type = DMU_OT_PLAIN_FILE_CONTENTS;
1469         uint64_t oid;
1470
1471         /* Use DMU_OTN_UINT8_METADATA for local objects so that their data
1472          * blocks get an additional ditto copy */
1473         if (unlikely(S_ISREG(la->la_mode) &&
1474                      fid_seq_is_local_file(fid_seq(fid))))
1475                 type = DMU_OTN_UINT8_METADATA;
1476 
        *dnp = NULL;
1477         /* Create a new DMU object, sized via osd_find_dnsize(). */
1478         oid = osd_dmu_object_alloc(osd->od_os, type, 0,
1479                                    osd_find_dnsize(obj), tx);
1480
1481         LASSERT(la->la_valid & LA_MODE);
1482         la->la_size = 0;
1483         la->la_nlink = 1;
1484
1485         return osd_find_new_dnode(env, tx, oid, dnp);
1486 }
1487
1488 /*
1489  * The transaction passed to this routine must have
1490  * dmu_tx_hold_zap(tx, DMU_NEW_OBJECT, ...) called and then assigned
1491  * to a transaction group.
1492  *
1493  * Using ZAP_FLAG_HASH64 will force the ZAP to always be a FAT ZAP.
1494  * This is fine for directories today, because storing the FID in the dirent
1495  * will also require a FAT ZAP.  If there is a new type of micro ZAP created
1496  * then we might need to re-evaluate the use of this flag and instead do
1497  * a conversion from the different internal ZAP hash formats being used. */
1498 int __osd_zap_create(const struct lu_env *env, struct osd_device *osd,
1499                      dnode_t **dnp, dmu_tx_t *tx, struct lu_attr *la,
1500                      unsigned dnsize, zap_flags_t flags)
1501 {
1502         uint64_t oid;
1503
1504         /* Assert that the transaction has been assigned to a
1505            transaction group. */
1506         LASSERT(tx->tx_txg != 0);
1507         *dnp = NULL;
1508
1509         oid = osd_zap_create_flags(osd->od_os, 0, flags | ZAP_FLAG_HASH64,
1510                                    DMU_OT_DIRECTORY_CONTENTS,
1511                                    14, /* == ZFS fzap_default_blockshift */
1512                                    DN_MAX_INDBLKSHIFT, /* indirect blockshift */
1513                                    dnsize, tx);
1514
1515         la->la_size = 2;
1516         la->la_nlink = 1;
1517
1518         return osd_find_new_dnode(env, tx, oid, dnp);
1519 }
1520
1521 static dnode_t *osd_mkidx(const struct lu_env *env, struct osd_object *obj,
1522                           struct lu_attr *la, struct osd_thandle *oh)
1523 {
1524         dnode_t *dn;
1525         int rc;
1526
1527         /* An index file should be created as a regular file so as not to
1528          * confuse ZPL, which could otherwise interpret it as a directory.
1529          * ZAP_FLAG_UINT64_KEY tells ZFS that we are going to use binary
1530          * keys */
1531         LASSERT(S_ISREG(la->la_mode));
1532         rc = __osd_zap_create(env, osd_obj2dev(obj), &dn, oh->ot_tx, la,
1533                               osd_find_dnsize(obj), ZAP_FLAG_UINT64_KEY);
1534         if (rc)
1535                 return ERR_PTR(rc);
1536         return dn;
1537 }
1538
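/*
 * Directories are created as ZAP objects; __osd_zap_create() always
 * requests a FAT ZAP via ZAP_FLAG_HASH64 (see the comment above it).
 */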
1539 static dnode_t *osd_mkdir(const struct lu_env *env, struct osd_object *obj,
1540                           struct lu_attr *la, struct osd_thandle *oh)
1541 {
1542         dnode_t *dn;
1543         int rc;
1544
1545         LASSERT(S_ISDIR(la->la_mode));
1546         rc = __osd_zap_create(env, osd_obj2dev(obj), &dn, oh->ot_tx, la,
1547                               osd_find_dnsize(obj), 0);
1548         if (rc)
1549                 return ERR_PTR(rc);
1550         return dn;
1551 }
1552
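/*
 * Regular files are plain DMU objects.  For OST objects (IDIF, normal or
 * echo FIDs) the block size is set to PAGE_SIZE right after creation, for
 * the reasons given in the comment inside the function.
 */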
1553 static dnode_t *osd_mkreg(const struct lu_env *env, struct osd_object *obj,
1554                           struct lu_attr *la, struct osd_thandle *oh)
1555 {
1556         const struct lu_fid *fid = lu_object_fid(&obj->oo_dt.do_lu);
1557         struct osd_device *osd = osd_obj2dev(obj);
1558         dnode_t *dn;
1559         int rc;
1560
1561         LASSERT(S_ISREG(la->la_mode));
1562         rc = __osd_object_create(env, obj, &dn, oh->ot_tx, la);
1563         if (rc)
1564                 return ERR_PTR(rc);
1565
1566         if ((fid_is_idif(fid) || fid_is_norm(fid) || fid_is_echo(fid)) &&
1567             osd->od_is_ost) {
1568                 /* The minimum block size must be at least PAGE_SIZE, else
1569                  * it would break the assumption in tgt_thread_big_cache
1570                  * where the array size is PTLRPC_MAX_BRW_PAGES.  It would
1571                  * also affect RDMA because of sub-page transfer sizes */
1572                 rc = -dmu_object_set_blocksize(osd->od_os, dn->dn_object,
1573                                                PAGE_SIZE, 0, oh->ot_tx);
1574                 if (unlikely(rc)) {
1575                         CERROR("%s: can't change blocksize: %d\n",
1576                                osd->od_svname, rc);
1577                         return ERR_PTR(rc);
1578                 }
1579         }
1580
1581         return dn;
1582 }
1583
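/* Symbolic links are created as plain DMU objects, like regular files. */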
1584 static dnode_t *osd_mksym(const struct lu_env *env, struct osd_object *obj,
1585                           struct lu_attr *la, struct osd_thandle *oh)
1586 {
1587         dnode_t *dn;
1588         int rc;
1589
1590         LASSERT(S_ISLNK(la->la_mode));
1591         rc = __osd_object_create(env, obj, &dn, oh->ot_tx, la);
1592         if (rc)
1593                 return ERR_PTR(rc);
1594         return dn;
1595 }
1596
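/*
 * Special files: for block and character devices LA_RDEV is marked valid
 * so that the device number is stored with the other SA attributes.
 */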
1597 static dnode_t *osd_mknod(const struct lu_env *env, struct osd_object *obj,
1598                           struct lu_attr *la, struct osd_thandle *oh)
1599 {
1600         dnode_t *dn;
1601         int rc;
1602
1603         if (S_ISCHR(la->la_mode) || S_ISBLK(la->la_mode))
1604                 la->la_valid |= LA_RDEV;
1605
1606         rc = __osd_object_create(env, obj, &dn, oh->ot_tx, la);
1607         if (rc)
1608                 return ERR_PTR(rc);
1609         return dn;
1610 }
1611
1612 typedef dnode_t *(*osd_obj_type_f)(const struct lu_env *env,
1613                                    struct osd_object *obj,
1614                                    struct lu_attr *la,
1615                                    struct osd_thandle *oh);
1616
1617 static osd_obj_type_f osd_create_type_f(enum dt_format_type type)
1618 {
1619         osd_obj_type_f result;
1620
1621         switch (type) {
1622         case DFT_DIR:
1623                 result = osd_mkdir;
1624                 break;
1625         case DFT_INDEX:
1626                 result = osd_mkidx;
1627                 break;
1628         case DFT_REGULAR:
1629                 result = osd_mkreg;
1630                 break;
1631         case DFT_SYM:
1632                 result = osd_mksym;
1633                 break;
1634         case DFT_NODE:
1635                 result = osd_mknod;
1636                 break;
1637         default:
1638                 LBUG();
1639                 break;
1640         }
1641         return result;
1642 }
1643
1644 /*
1645  * Concurrency: @dt is write locked.
1646  */
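/*
 * Creation consists of: allocating the dnode with the type-specific helper
 * chosen by osd_create_type_f(), inserting the FID -> dnode# entry into the
 * object index ZAP, initializing the SA handle, storing the LMA xattr with
 * the object's FID, and scheduling the SA xattr update for the end of the
 * transaction.
 */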
1647 static int osd_create(const struct lu_env *env, struct dt_object *dt,
1648                       struct lu_attr *attr, struct dt_allocation_hint *hint,
1649                       struct dt_object_format *dof, struct thandle *th)
1650 {
1651         struct osd_thread_info  *info = osd_oti_get(env);
1652         struct lustre_mdt_attrs *lma = &info->oti_mdt_attrs;
1653         struct zpl_direntry     *zde = &info->oti_zde.lzd_reg;
1654         const struct lu_fid     *fid = lu_object_fid(&dt->do_lu);
1655         struct osd_object       *obj = osd_dt_obj(dt);
1656         struct osd_device       *osd = osd_obj2dev(obj);
1657         char                    *buf = info->oti_str;
1658         struct osd_thandle      *oh;
1659         dnode_t *dn = NULL, *zdn = NULL;
1660         uint64_t                 zapid, parent = 0;
1661         int                      rc;
1662
1663         ENTRY;
1664
1665         LASSERT(!fid_is_acct(fid));
1666
1667         /* Concurrent create declarations should not see the object in an
1668          * inconsistent state (db, attr, etc.).  In regular cases acquiring
1669          * the lock should be cheap */
1670         down_write(&obj->oo_guard);
1671
1672         if (unlikely(dt_object_exists(dt)))
1673                 GOTO(out, rc = -EEXIST);
1674
1675         LASSERT(osd_invariant(obj));
1676         LASSERT(dof != NULL);
1677
1678         LASSERT(th != NULL);
1679         oh = container_of0(th, struct osd_thandle, ot_super);
1680
1681         LASSERT(obj->oo_dn == NULL);
1682
1683         /* To follow the ZFS on-disk format we need to initialize the
1684          * parent dnode properly */
1685         if (hint != NULL && hint->dah_parent != NULL &&
1686             !dt_object_remote(hint->dah_parent))
1687                 parent = osd_dt_obj(hint->dah_parent)->oo_dn->dn_object;
1688
1689         /* we may fix up some attributes; do not modify the caller's copy */
1690         obj->oo_attr = *attr;
1691         obj->oo_attr.la_valid |= LA_SIZE | LA_NLINK | LA_TYPE;
1692
1693 #ifdef ZFS_PROJINHERIT
1694         if (osd->od_projectused_dn) {
1695                 if (!(obj->oo_attr.la_valid & LA_PROJID))
1696                         obj->oo_attr.la_projid = ZFS_DEFAULT_PROJID;
1697                 obj->oo_with_projid = 1;
1698         }
1699 #endif
1700
1701         dn = osd_create_type_f(dof->dof_type)(env, obj, &obj->oo_attr, oh);
1702         if (IS_ERR(dn)) {
1703                 rc = PTR_ERR(dn);
1704                 dn = NULL;
1705                 GOTO(out, rc);
1706         }
1707
1708         zde->zde_pad = 0;
1709         zde->zde_dnode = dn->dn_object;
1710         zde->zde_type = IFTODT(attr->la_mode & S_IFMT);
1711
1712         zapid = osd_get_name_n_idx(env, osd, fid, buf,
1713                                    sizeof(info->oti_str), &zdn);
1714         rc = osd_zap_add(osd, zapid, zdn, buf, 8, 1, zde, oh->ot_tx);
1715         if (rc)
1716                 GOTO(out, rc);
1717         obj->oo_dn = dn;
1718         /* Now add in all of the "SA" attributes */
1719         rc = osd_sa_handle_get(obj);
1720         if (rc)
1721                 GOTO(out, rc);
1722
1723         rc = -nvlist_alloc(&obj->oo_sa_xattr, NV_UNIQUE_NAME, KM_SLEEP);
1724         if (rc)
1725                 GOTO(out, rc);
1726
1727         /* initialize LMA */
1728         lustre_lma_init(lma, fid, 0, 0);
1729         lustre_lma_swab(lma);
1730         rc = -nvlist_add_byte_array(obj->oo_sa_xattr, XATTR_NAME_LMA,
1731                                     (uchar_t *)lma, sizeof(*lma));
1732         if (rc)
1733                 GOTO(out, rc);
1734
1735         /* configure new osd object */
1736         obj->oo_parent = parent != 0 ? parent : zapid;
1737         obj->oo_late_attr_set = 1;
1738         rc = __osd_sa_xattr_schedule_update(env, obj, oh);
1739         if (rc)
1740                 GOTO(out, rc);
1741
1742         /* XXX: oo_lma_flags */
1743         obj->oo_dt.do_lu.lo_header->loh_attr |= obj->oo_attr.la_mode & S_IFMT;
1744         if (likely(!fid_is_acct(lu_object_fid(&obj->oo_dt.do_lu))))
1745                 /* no body operations for accounting objects */
1746                 obj->oo_dt.do_body_ops = &osd_body_ops;
1747
1748         osd_idc_find_and_init(env, osd, obj);
1749
1750 out:
1751         if (unlikely(rc && dn)) {
1752                 dmu_object_free(osd->od_os, dn->dn_object, oh->ot_tx);
1753                 osd_dnode_rele(dn);
1754                 obj->oo_dn = NULL;
1755         } else if (!rc) {
1756                 obj->oo_dt.do_lu.lo_header->loh_attr |= LOHA_EXISTS;
1757         }
1758         up_write(&obj->oo_guard);
1759         RETURN(rc);
1760 }
1761
1762 static int osd_declare_ref_add(const struct lu_env *env, struct dt_object *dt,
1763                                struct thandle *th)
1764 {
1765         osd_idc_find_and_init(env, osd_dev(dt->do_lu.lo_dev), osd_dt_obj(dt));
1766         return osd_declare_attr_set(env, dt, NULL, th);
1767 }
1768
1769 /*
1770  * Concurrency: @dt is write locked.
1771  */
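/*
 * Increment the link count: the value is cached in oo_attr.la_nlink under
 * oo_attr_lock and persisted through the SA_ZPL_LINKS attribute in the same
 * transaction.  osd_ref_del() below is the symmetric operation.
 */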
1772 static int osd_ref_add(const struct lu_env *env, struct dt_object *dt,
1773                        struct thandle *handle)
1774 {
1775         struct osd_object       *obj = osd_dt_obj(dt);
1776         struct osd_thandle      *oh;
1777         struct osd_device       *osd = osd_obj2dev(obj);
1778         uint64_t                 nlink;
1779         int rc;
1780
1781         ENTRY;
1782
1783         down_read(&obj->oo_guard);
1784         if (unlikely(!dt_object_exists(dt) || obj->oo_destroyed))
1785                 GOTO(out, rc = -ENOENT);
1786
1787         LASSERT(osd_invariant(obj));
1788         LASSERT(obj->oo_sa_hdl != NULL);
1789
1790         oh = container_of0(handle, struct osd_thandle, ot_super);
1791
1792         write_lock(&obj->oo_attr_lock);
1793         nlink = ++obj->oo_attr.la_nlink;
1794         write_unlock(&obj->oo_attr_lock);
1795
1796         rc = osd_object_sa_update(obj, SA_ZPL_LINKS(osd), &nlink, 8, oh);
1797
1798 out:
1799         up_read(&obj->oo_guard);
1800         RETURN(rc);
1801 }
1802
1803 static int osd_declare_ref_del(const struct lu_env *env, struct dt_object *dt,
1804                                struct thandle *handle)
1805 {
1806         osd_idc_find_and_init(env, osd_dev(dt->do_lu.lo_dev), osd_dt_obj(dt));
1807         return osd_declare_attr_set(env, dt, NULL, handle);
1808 }
1809
1810 /*
1811  * Concurrency: @dt is write locked.
1812  */
1813 static int osd_ref_del(const struct lu_env *env, struct dt_object *dt,
1814                        struct thandle *handle)
1815 {
1816         struct osd_object       *obj = osd_dt_obj(dt);
1817         struct osd_thandle      *oh;
1818         struct osd_device       *osd = osd_obj2dev(obj);
1819         uint64_t                 nlink;
1820         int                      rc;
1821
1822         ENTRY;
1823
1824         down_read(&obj->oo_guard);
1825
1826         if (unlikely(!dt_object_exists(dt) || obj->oo_destroyed))
1827                 GOTO(out, rc = -ENOENT);
1828
1829         LASSERT(osd_invariant(obj));
1830         LASSERT(obj->oo_sa_hdl != NULL);
1831
1832         oh = container_of0(handle, struct osd_thandle, ot_super);
1833         LASSERT(!lu_object_is_dying(dt->do_lu.lo_header));
1834
1835         write_lock(&obj->oo_attr_lock);
1836         nlink = --obj->oo_attr.la_nlink;
1837         write_unlock(&obj->oo_attr_lock);
1838
1839         rc = osd_object_sa_update(obj, SA_ZPL_LINKS(osd), &nlink, 8, oh);
1840
1841 out:
1842         up_read(&obj->oo_guard);
1843         RETURN(rc);
1844 }
1845
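/*
 * dt_object_operations::do_object_sync.  Since ZIL is not supported, the
 * whole pool is synced via txg_wait_synced().  When the module parameter
 * osd_object_sync_delay_us is zero or larger, the sync is replaced by a
 * fixed udelay() of that many microseconds (see the parameter description
 * at the end of this file).
 */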
1846 static int osd_object_sync(const struct lu_env *env, struct dt_object *dt,
1847                            __u64 start, __u64 end)
1848 {
1849         struct osd_device *osd = osd_obj2dev(osd_dt_obj(dt));
1850         ENTRY;
1851
1852         /* XXX: no other option than syncing the whole filesystem until we
1853          * support ZIL.  If the object tracked the txg that it was last
1854          * modified in, it could pass that txg here instead of "0".  Maybe
1855          * the changes are already committed, so no wait is needed at all? */
1856         if (!osd->od_dt_dev.dd_rdonly) {
1857                 if (osd_object_sync_delay_us < 0)
1858                         txg_wait_synced(dmu_objset_pool(osd->od_os), 0ULL);
1859                 else
1860                         udelay(osd_object_sync_delay_us);
1861         }
1862
1863         RETURN(0);
1864 }
1865
1866 static int osd_invalidate(const struct lu_env *env, struct dt_object *dt)
1867 {
1868         return 0;
1869 }
1870
1871 static struct dt_object_operations osd_obj_ops = {
1872         .do_read_lock           = osd_read_lock,
1873         .do_write_lock          = osd_write_lock,
1874         .do_read_unlock         = osd_read_unlock,
1875         .do_write_unlock        = osd_write_unlock,
1876         .do_write_locked        = osd_write_locked,
1877         .do_attr_get            = osd_attr_get,
1878         .do_declare_attr_set    = osd_declare_attr_set,
1879         .do_attr_set            = osd_attr_set,
1880         .do_ah_init             = osd_ah_init,
1881         .do_declare_create      = osd_declare_create,
1882         .do_create              = osd_create,
1883         .do_declare_destroy     = osd_declare_destroy,
1884         .do_destroy             = osd_destroy,
1885         .do_index_try           = osd_index_try,
1886         .do_declare_ref_add     = osd_declare_ref_add,
1887         .do_ref_add             = osd_ref_add,
1888         .do_declare_ref_del     = osd_declare_ref_del,
1889         .do_ref_del             = osd_ref_del,
1890         .do_xattr_get           = osd_xattr_get,
1891         .do_declare_xattr_set   = osd_declare_xattr_set,
1892         .do_xattr_set           = osd_xattr_set,
1893         .do_declare_xattr_del   = osd_declare_xattr_del,
1894         .do_xattr_del           = osd_xattr_del,
1895         .do_xattr_list          = osd_xattr_list,
1896         .do_object_sync         = osd_object_sync,
1897         .do_invalidate          = osd_invalidate,
1898 };
1899
1900 static struct lu_object_operations osd_lu_obj_ops = {
1901         .loo_object_init        = osd_object_init,
1902         .loo_object_delete      = osd_object_delete,
1903         .loo_object_release     = osd_object_release,
1904         .loo_object_free        = osd_object_free,
1905         .loo_object_print       = osd_object_print,
1906         .loo_object_invariant   = osd_object_invariant,
1907 };
1908
1909 static int osd_otable_it_attr_get(const struct lu_env *env,
1910                                 struct dt_object *dt,
1911                                 struct lu_attr *attr)
1912 {
1913         attr->la_valid = 0;
1914         return 0;
1915 }
1916
1917 static struct dt_object_operations osd_obj_otable_it_ops = {
1918         .do_attr_get            = osd_otable_it_attr_get,
1919         .do_index_try           = osd_index_try,
1920 };
1921
1922 module_param(osd_object_sync_delay_us, int, 0644);
1923 MODULE_PARM_DESC(osd_object_sync_delay_us,
1924                  "If zero or larger, delay N usec instead of doing object sync");