Whamcloud - gitweb
LU-1305 osd: dmu helpers
[fs/lustre-release.git] / lustre / osd-zfs / osd_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  */
30 /*
31  * Copyright (c) 2011, 2012 Whamcloud, Inc.
32  * Use is subject to license terms.
33  */
34 /*
35  * This file is part of Lustre, http://www.lustre.org/
36  * Lustre is a trademark of Sun Microsystems, Inc.
37  *
38  * lustre/osd-zfs/osd_object.c
39  *
40  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
41  * Author: Mike Pershin <tappro@whamcloud.com>
42  * Author: Johann Lombardi <johann@whamcloud.com>
43  */
44
45 #ifndef EXPORT_SYMTAB
46 # define EXPORT_SYMTAB
47 #endif
48 #define DEBUG_SUBSYSTEM S_OSD
49
50 #include <lustre_ver.h>
51 #include <libcfs/libcfs.h>
52 #include <lustre_fsfilt.h>
53 #include <obd_support.h>
54 #include <lustre_net.h>
55 #include <obd.h>
56 #include <obd_class.h>
57 #include <lustre_disk.h>
58 #include <lustre_fid.h>
59
60 #include "osd_internal.h"
61
62 #include <sys/dnode.h>
63 #include <sys/dbuf.h>
64 #include <sys/spa.h>
65 #include <sys/stat.h>
66 #include <sys/zap.h>
67 #include <sys/spa_impl.h>
68 #include <sys/zfs_znode.h>
69 #include <sys/dmu_tx.h>
70 #include <sys/dmu_objset.h>
71 #include <sys/dsl_prop.h>
72 #include <sys/sa_impl.h>
73 #include <sys/txg.h>
74
75 static char *osd_obj_tag = "osd_object";
76
77 static struct dt_object_operations osd_obj_ops;
78 static struct lu_object_operations osd_lu_obj_ops;
79 extern struct dt_body_operations osd_body_ops;
80
81 extern cfs_mem_cache_t *osd_object_kmem;
82
83 static void
84 osd_object_sa_fini(struct osd_object *obj)
85 {
86         if (obj->oo_sa_hdl) {
87                 sa_handle_destroy(obj->oo_sa_hdl);
88                 obj->oo_sa_hdl = NULL;
89         }
90 }
91
92 static int
93 osd_object_sa_init(struct osd_object *obj, udmu_objset_t *uos)
94 {
95         int rc;
96
97         LASSERT(obj->oo_sa_hdl == NULL);
98         LASSERT(obj->oo_db != NULL);
99
100         rc = -sa_handle_get(uos->os, obj->oo_db->db_object, obj,
101                             SA_HDL_PRIVATE, &obj->oo_sa_hdl);
102         if (rc)
103                 return rc;
104
105         /* Cache the xattr object id, valid for the life of the object */
106         rc = -sa_lookup(obj->oo_sa_hdl, SA_ZPL_XATTR(uos), &obj->oo_xattr, 8);
107         if (rc == -ENOENT) {
108                 obj->oo_xattr = ZFS_NO_OBJECT;
109                 rc = 0;
110         } else if (rc) {
111                 osd_object_sa_fini(obj);
112         }
113
114         return rc;
115 }
116
117 /*
118  * Add object to list of dirty objects in tx handle.
119  */
120 static void
121 osd_object_sa_dirty_add(struct osd_object *obj, struct osd_thandle *oh)
122 {
123         if (!cfs_list_empty(&obj->oo_sa_linkage))
124                 return;
125
126         cfs_down(&oh->ot_sa_lock);
127         if (likely(cfs_list_empty(&obj->oo_sa_linkage)))
128                 cfs_list_add(&obj->oo_sa_linkage, &oh->ot_sa_list);
129         cfs_up(&oh->ot_sa_lock);
130 }
131
132 /*
133  * Release spill block dbuf hold for all dirty SAs.
134  */
135 void osd_object_sa_dirty_rele(struct osd_thandle *oh)
136 {
137         struct osd_object *obj;
138
139         cfs_down(&oh->ot_sa_lock);
140         while (!cfs_list_empty(&oh->ot_sa_list)) {
141                 obj = cfs_list_entry(oh->ot_sa_list.next,
142                                      struct osd_object, oo_sa_linkage);
143                 sa_spill_rele(obj->oo_sa_hdl);
144                 cfs_list_del_init(&obj->oo_sa_linkage);
145         }
146         cfs_up(&oh->ot_sa_lock);
147 }
148
149 /*
150  * Update the SA and add the object to the dirty list.
151  */
152 int osd_object_sa_update(struct osd_object *obj, sa_attr_type_t type,
153                          void *buf, uint32_t buflen, struct osd_thandle *oh)
154 {
155         int rc;
156
157         LASSERT(obj->oo_sa_hdl != NULL);
158         LASSERT(oh->ot_tx != NULL);
159
160         rc = -sa_update(obj->oo_sa_hdl, type, buf, buflen, oh->ot_tx);
161         osd_object_sa_dirty_add(obj, oh);
162
163         return rc;
164 }
165
166 /*
167  * Bulk update the SA and add the object to the dirty list.
168  */
169 static int
170 osd_object_sa_bulk_update(struct osd_object *obj, sa_bulk_attr_t *attrs,
171                           int count, struct osd_thandle *oh)
172 {
173         int rc;
174
175         LASSERT(obj->oo_sa_hdl != NULL);
176         LASSERT(oh->ot_tx != NULL);
177
178         rc = -sa_bulk_update(obj->oo_sa_hdl, attrs, count, oh->ot_tx);
179         osd_object_sa_dirty_add(obj, oh);
180
181         return rc;
182 }
183
184 /*
185  * Retrieve the attributes of a DMU object
186  */
187 int __osd_object_attr_get(const struct lu_env *env, udmu_objset_t *uos,
188                           struct osd_object *obj, struct lu_attr *la)
189 {
190         struct osa_attr *osa = &osd_oti_get(env)->oti_osa;
191         sa_handle_t     *sa_hdl;
192         sa_bulk_attr_t  *bulk;
193         int              cnt = 0;
194         int              rc;
195         ENTRY;
196
197         LASSERT(obj->oo_db != NULL);
198
199         rc = -sa_handle_get(uos->os, obj->oo_db->db_object, NULL,
200                             SA_HDL_PRIVATE, &sa_hdl);
201         if (rc)
202                 RETURN(rc);
203
204         OBD_ALLOC(bulk, sizeof(sa_bulk_attr_t) * 9);
205         if (bulk == NULL)
206                 GOTO(out_sa, rc = -ENOMEM);
207
208         la->la_valid |= LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE | LA_TYPE |
209                         LA_SIZE | LA_UID | LA_GID | LA_FLAGS | LA_NLINK;
210
211         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_ATIME(uos), NULL, osa->atime, 16);
212         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_MTIME(uos), NULL, osa->mtime, 16);
213         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_CTIME(uos), NULL, osa->ctime, 16);
214         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_MODE(uos), NULL, &osa->mode, 8);
215         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_SIZE(uos), NULL, &osa->size, 8);
216         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_LINKS(uos), NULL, &osa->nlink, 8);
217         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_UID(uos), NULL, &osa->uid, 8);
218         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_GID(uos), NULL, &osa->gid, 8);
219         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_FLAGS(uos), NULL, &osa->flags, 8);
220
221         rc = -sa_bulk_lookup(sa_hdl, bulk, cnt);
222         if (rc)
223                 GOTO(out_bulk, rc);
224
225         la->la_atime = osa->atime[0];
226         la->la_mtime = osa->mtime[0];
227         la->la_ctime = osa->ctime[0];
228         la->la_mode = osa->mode;
229         la->la_uid = osa->uid;
230         la->la_gid = osa->gid;
231         la->la_nlink = osa->nlink;
232         la->la_flags = osa->flags;
233         la->la_size = osa->size;
234
235         if (S_ISCHR(la->la_mode) || S_ISBLK(la->la_mode)) {
236                 rc = -sa_lookup(sa_hdl, SA_ZPL_RDEV(uos), &osa->rdev, 8);
237                 if (rc)
238                         GOTO(out_bulk, rc);
239                 la->la_rdev = osa->rdev;
240                 la->la_valid |= LA_RDEV;
241         }
242 out_bulk:
243         OBD_FREE(bulk, sizeof(sa_bulk_attr_t) * 9);
244 out_sa:
245         sa_handle_destroy(sa_hdl);
246
247         RETURN(rc);
248 }
249
250 int __osd_obj2dbuf(const struct lu_env *env, objset_t *os,
251                    uint64_t oid, dmu_buf_t **dbp, void *tag)
252 {
253         dmu_object_info_t *doi = &osd_oti_get(env)->oti_doi;
254         int rc;
255
256         LASSERT(tag);
257
258         rc = -sa_buf_hold(os, oid, tag, dbp);
259         if (rc)
260                 return rc;
261
262         dmu_object_info_from_db(*dbp, doi);
263         if (unlikely (oid != DMU_USERUSED_OBJECT &&
264             oid != DMU_GROUPUSED_OBJECT && doi->doi_bonus_type != DMU_OT_SA)) {
265                 sa_buf_rele(*dbp, tag);
266                 *dbp = NULL;
267                 return -EINVAL;
268         }
269
270         LASSERT(*dbp);
271         LASSERT((*dbp)->db_object == oid);
272         LASSERT((*dbp)->db_offset == -1);
273         LASSERT((*dbp)->db_data != NULL);
274
275         return 0;
276 }
277
278 /*
279  * Concurrency: no concurrent access is possible that early in object
280  * life-cycle.
281  */
282 struct lu_object *osd_object_alloc(const struct lu_env *env,
283                                    const struct lu_object_header *hdr,
284                                    struct lu_device *d)
285 {
286         struct osd_object *mo;
287
288         OBD_SLAB_ALLOC_PTR_GFP(mo, osd_object_kmem, CFS_ALLOC_IO);
289         if (mo != NULL) {
290                 struct lu_object *l;
291
292                 l = &mo->oo_dt.do_lu;
293                 dt_object_init(&mo->oo_dt, NULL, d);
294                 mo->oo_dt.do_ops = &osd_obj_ops;
295                 l->lo_ops = &osd_lu_obj_ops;
296                 CFS_INIT_LIST_HEAD(&mo->oo_sa_linkage);
297                 cfs_init_rwsem(&mo->oo_sem);
298                 cfs_sema_init(&mo->oo_guard, 1);
299                 cfs_rwlock_init(&mo->oo_attr_lock);
300                 return l;
301         } else {
302                 return NULL;
303         }
304 }
305
306 /*
307  * Concurrency: shouldn't matter.
308  */
309 int osd_object_init0(const struct lu_env *env, struct osd_object *obj)
310 {
311         struct osd_device       *osd = osd_obj2dev(obj);
312         udmu_objset_t           *uos = &osd->od_objset;
313         const struct lu_fid     *fid  = lu_object_fid(&obj->oo_dt.do_lu);
314         int                      rc = 0;
315         ENTRY;
316
317         if (obj->oo_db == NULL)
318                 RETURN(0);
319
320         /* object exist */
321
322         rc = osd_object_sa_init(obj, uos);
323         if (rc)
324                 RETURN(rc);
325
326         /* cache attrs in object */
327         rc = __osd_object_attr_get(env, &osd->od_objset,
328                                    obj, &obj->oo_attr);
329         if (rc)
330                 RETURN(rc);
331
332         if (likely(!fid_is_acct(fid)))
333                 /* no body operations for accounting objects */
334                 obj->oo_dt.do_body_ops = &osd_body_ops;
335
336         /*
337          * initialize object before marking it existing
338          */
339         obj->oo_dt.do_lu.lo_header->loh_attr |= obj->oo_attr.la_mode & S_IFMT;
340
341         cfs_mb();
342         obj->oo_dt.do_lu.lo_header->loh_attr |= LOHA_EXISTS;
343
344         RETURN(0);
345 }
346
347 /*
348  * Concurrency: no concurrent access is possible that early in object
349  * life-cycle.
350  */
351 static int osd_object_init(const struct lu_env *env, struct lu_object *l,
352                            const struct lu_object_conf *conf)
353 {
354         struct osd_object       *obj = osd_obj(l);
355         struct osd_device       *osd = osd_obj2dev(obj);
356         uint64_t                 oid;
357         int                      rc;
358         ENTRY;
359
360         LASSERT(osd_invariant(obj));
361
362         rc = osd_fid_lookup(env, osd, lu_object_fid(l), &oid);
363         if (rc == 0) {
364                 LASSERT(obj->oo_db == NULL);
365                 rc = __osd_obj2dbuf(env, osd->od_objset.os, oid,
366                                         &obj->oo_db, osd_obj_tag);
367                 if (rc == 0) {
368                         LASSERT(obj->oo_db);
369                         rc = osd_object_init0(env, obj);
370                 } else {
371                         CERROR("%s: lookup "DFID"/"LPX64" failed: rc = %d\n",
372                                osd->od_svname, PFID(lu_object_fid(l)), oid, rc);
373                 }
374         } else if (rc == -ENOENT) {
375                 rc = 0;
376         }
377         LASSERT(osd_invariant(obj));
378         RETURN(rc);
379 }
380
381 /*
382  * Concurrency: no concurrent access is possible that late in object
383  * life-cycle.
384  */
385 static void osd_object_free(const struct lu_env *env, struct lu_object *l)
386 {
387         struct osd_object *obj = osd_obj(l);
388
389         LASSERT(osd_invariant(obj));
390
391         dt_object_fini(&obj->oo_dt);
392         OBD_SLAB_FREE_PTR(obj, osd_object_kmem);
393 }
394
395 static void __osd_declare_object_destroy(const struct lu_env *env,
396                                          struct osd_object *obj,
397                                          struct osd_thandle *oh)
398 {
399         struct osd_device       *osd = osd_obj2dev(obj);
400         udmu_objset_t           *uos = &osd->od_objset;
401         dmu_buf_t               *db = obj->oo_db;
402         zap_attribute_t         *za = &osd_oti_get(env)->oti_za;
403         uint64_t                 oid = db->db_object, xid;
404         dmu_tx_t                *tx = oh->ot_tx;
405         zap_cursor_t            *zc;
406         int                      rc = 0;
407
408         dmu_tx_hold_free(tx, oid, 0, DMU_OBJECT_END);
409
410         /* zap holding xattrs */
411         if (obj->oo_xattr != ZFS_NO_OBJECT) {
412                 oid = obj->oo_xattr;
413
414                 dmu_tx_hold_free(tx, oid, 0, DMU_OBJECT_END);
415
416                 rc = -udmu_zap_cursor_init(&zc, uos, oid, 0);
417                 if (rc)
418                         goto out;
419
420                 while ((rc = -zap_cursor_retrieve(zc, za)) == 0) {
421                         BUG_ON(za->za_integer_length != sizeof(uint64_t));
422                         BUG_ON(za->za_num_integers != 1);
423
424                         rc = -zap_lookup(uos->os, obj->oo_xattr, za->za_name,
425                                          sizeof(uint64_t), 1, &xid);
426                         if (rc) {
427                                 CERROR("%s: xattr lookup failed: rc = %d\n",
428                                        osd->od_svname, rc);
429                                 goto out_err;
430                         }
431                         dmu_tx_hold_free(tx, xid, 0, DMU_OBJECT_END);
432
433                         zap_cursor_advance(zc);
434                 }
435                 if (rc == -ENOENT)
436                         rc = 0;
437 out_err:
438                 udmu_zap_cursor_fini(zc);
439         }
440 out:
441         if (rc && tx->tx_err == 0)
442                 tx->tx_err = -rc;
443 }
444
445 static int osd_declare_object_destroy(const struct lu_env *env,
446                                       struct dt_object *dt,
447                                       struct thandle *th)
448 {
449         char                    *buf = osd_oti_get(env)->oti_str;
450         const struct lu_fid     *fid = lu_object_fid(&dt->do_lu);
451         struct osd_object       *obj = osd_dt_obj(dt);
452         struct osd_device       *osd = osd_obj2dev(obj);
453         struct osd_thandle      *oh;
454         uint64_t                 zapid;
455         ENTRY;
456
457         LASSERT(th != NULL);
458         LASSERT(dt_object_exists(dt));
459
460         oh = container_of0(th, struct osd_thandle, ot_super);
461         LASSERT(oh->ot_tx != NULL);
462
463         /* declare that we'll destroy the object */
464         __osd_declare_object_destroy(env, obj, oh);
465
466         /* declare that we'll remove object from fid-dnode mapping */
467         zapid = osd_get_name_n_idx(env, osd, fid, buf);
468         dmu_tx_hold_bonus(oh->ot_tx, zapid);
469         dmu_tx_hold_zap(oh->ot_tx, zapid, 0, buf);
470
471         /* declare that we'll remove object from inode accounting ZAPs */
472         dmu_tx_hold_bonus(oh->ot_tx, osd->od_iusr_oid);
473         dmu_tx_hold_zap(oh->ot_tx, osd->od_iusr_oid, 0, buf);
474         dmu_tx_hold_bonus(oh->ot_tx, osd->od_igrp_oid);
475         dmu_tx_hold_zap(oh->ot_tx, osd->od_igrp_oid, 0, buf);
476
477         RETURN(0);
478 }
479
480 int __osd_object_free(udmu_objset_t *uos, uint64_t oid, dmu_tx_t *tx)
481 {
482         LASSERT(uos->objects != 0);
483         cfs_spin_lock(&uos->lock);
484         uos->objects--;
485         cfs_spin_unlock(&uos->lock);
486
487         return -dmu_object_free(uos->os, oid, tx);
488 }
489
490 /*
491  * Delete a DMU object
492  *
493  * The transaction passed to this routine must have
494  * dmu_tx_hold_free(tx, oid, 0, DMU_OBJECT_END) called
495  * and then assigned to a transaction group.
496  *
497  * This will release db and set it to NULL to prevent further dbuf releases.
498  */
499 static int __osd_object_destroy(const struct lu_env *env,
500                                 struct osd_object *obj,
501                                 dmu_tx_t *tx, void *tag)
502 {
503         struct osd_device       *osd = osd_obj2dev(obj);
504         udmu_objset_t           *uos = &osd->od_objset;
505         uint64_t                 xid;
506         zap_attribute_t         *za = &osd_oti_get(env)->oti_za;
507         zap_cursor_t            *zc;
508         int                      rc;
509
510         /* Assert that the transaction has been assigned to a
511            transaction group. */
512         LASSERT(tx->tx_txg != 0);
513
514         /* zap holding xattrs */
515         if (obj->oo_xattr != ZFS_NO_OBJECT) {
516                 rc = -udmu_zap_cursor_init(&zc, uos, obj->oo_xattr, 0);
517                 if (rc)
518                         return rc;
519                 while ((rc = -zap_cursor_retrieve(zc, za)) == 0) {
520                         BUG_ON(za->za_integer_length != sizeof(uint64_t));
521                         BUG_ON(za->za_num_integers != 1);
522
523                         rc = -zap_lookup(uos->os, obj->oo_xattr, za->za_name,
524                                          sizeof(uint64_t), 1, &xid);
525                         if (rc) {
526                                 CERROR("%s: lookup xattr %s failed: rc = %d\n",
527                                        osd->od_svname, za->za_name, rc);
528                                 continue;
529                         }
530                         rc = __osd_object_free(uos, xid, tx);
531                         if (rc)
532                                 CERROR("%s: fetch xattr %s failed: rc = %d\n",
533                                        osd->od_svname, za->za_name, rc);
534                         zap_cursor_advance(zc);
535                 }
536                 udmu_zap_cursor_fini(zc);
537
538                 rc = __osd_object_free(uos, obj->oo_xattr, tx);
539                 if (rc)
540                         CERROR("%s: freeing xattr failed: rc = %d\n",
541                                osd->od_svname, rc);
542         }
543
544         return __osd_object_free(uos, obj->oo_db->db_object, tx);
545 }
546
547 static int osd_object_destroy(const struct lu_env *env,
548                               struct dt_object *dt,
549                               struct thandle *th)
550 {
551         char                    *buf = osd_oti_get(env)->oti_str;
552         struct osd_object       *obj = osd_dt_obj(dt);
553         struct osd_device       *osd = osd_obj2dev(obj);
554         const struct lu_fid     *fid = lu_object_fid(&dt->do_lu);
555         struct osd_thandle      *oh;
556         int                      rc;
557         uint64_t                 zapid;
558         ENTRY;
559
560         LASSERT(obj->oo_db != NULL);
561         LASSERT(dt_object_exists(dt));
562         LASSERT(!lu_object_is_dying(dt->do_lu.lo_header));
563
564         oh = container_of0(th, struct osd_thandle, ot_super);
565         LASSERT(oh != NULL);
566         LASSERT(oh->ot_tx != NULL);
567
568         zapid = osd_get_name_n_idx(env, osd, fid, buf);
569
570         /* remove obj ref from index dir (it depends) */
571         rc = -zap_remove(osd->od_objset.os, zapid, buf, oh->ot_tx);
572         if (rc) {
573                 CERROR("%s: zap_remove() failed: rc = %d\n",
574                        osd->od_svname, rc);
575                 GOTO(out, rc);
576         }
577
578         /* Remove object from inode accounting. It is not fatal for the destroy
579          * operation if something goes wrong while updating accounting, but we
580          * still log an error message to notify the administrator */
581         rc = -zap_increment_int(osd->od_objset.os, osd->od_iusr_oid,
582                         obj->oo_attr.la_uid, -1, oh->ot_tx);
583         if (rc)
584                 CERROR("%s: failed to remove "DFID" from accounting ZAP for usr"
585                         " %d: rc = %d\n", osd->od_svname, PFID(fid),
586                         obj->oo_attr.la_uid, rc);
587         rc = -zap_increment_int(osd->od_objset.os, osd->od_igrp_oid,
588                                 obj->oo_attr.la_gid, -1, oh->ot_tx);
589         if (rc)
590                 CERROR("%s: failed to remove "DFID" from accounting ZAP for grp"
591                         " %d: rc = %d\n", osd->od_svname, PFID(fid),
592                         obj->oo_attr.la_gid, rc);
593
594         /* kill object */
595         rc = __osd_object_destroy(env, obj, oh->ot_tx, osd_obj_tag);
596         if (rc) {
597                 CERROR("%s: __osd_object_destroy() failed: rc = %d\n",
598                        osd->od_svname, rc);
599                 GOTO(out, rc);
600         }
601
602 out:
603         /* not needed in the cache anymore */
604         set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags);
605
606         RETURN (0);
607 }
608
609 static void osd_object_delete(const struct lu_env *env, struct lu_object *l)
610 {
611         struct osd_object *obj = osd_obj(l);
612
613         if (obj->oo_db != NULL) {
614                 osd_object_sa_fini(obj);
615                 if (obj->oo_sa_xattr) {
616                         nvlist_free(obj->oo_sa_xattr);
617                         obj->oo_sa_xattr = NULL;
618                 }
619                 sa_buf_rele(obj->oo_db, osd_obj_tag);
620                 cfs_list_del(&obj->oo_sa_linkage);
621                 obj->oo_db = NULL;
622         }
623 }
624
625 /*
626  * Concurrency: ->loo_object_release() is called under site spin-lock.
627  */
628 static void osd_object_release(const struct lu_env *env,
629                                struct lu_object *l)
630 {
631 }
632
633 /*
634  * Concurrency: shouldn't matter.
635  */
636 static int osd_object_print(const struct lu_env *env, void *cookie,
637                             lu_printer_t p, const struct lu_object *l)
638 {
639         struct osd_object *o = osd_obj(l);
640
641         return (*p)(env, cookie, LUSTRE_OSD_ZFS_NAME"-object@%p", o);
642 }
643
644 static void osd_object_read_lock(const struct lu_env *env,
645                                  struct dt_object *dt, unsigned role)
646 {
647         struct osd_object *obj = osd_dt_obj(dt);
648
649         LASSERT(osd_invariant(obj));
650
651         cfs_down_read(&obj->oo_sem);
652 }
653
654 static void osd_object_write_lock(const struct lu_env *env,
655                                   struct dt_object *dt, unsigned role)
656 {
657         struct osd_object *obj = osd_dt_obj(dt);
658
659         LASSERT(osd_invariant(obj));
660
661         cfs_down_write(&obj->oo_sem);
662 }
663
664 static void osd_object_read_unlock(const struct lu_env *env,
665                                    struct dt_object *dt)
666 {
667         struct osd_object *obj = osd_dt_obj(dt);
668
669         LASSERT(osd_invariant(obj));
670         cfs_up_read(&obj->oo_sem);
671 }
672
673 static void osd_object_write_unlock(const struct lu_env *env,
674                                     struct dt_object *dt)
675 {
676         struct osd_object *obj = osd_dt_obj(dt);
677
678         LASSERT(osd_invariant(obj));
679         cfs_up_write(&obj->oo_sem);
680 }
681
682 static int osd_object_write_locked(const struct lu_env *env,
683                                    struct dt_object *dt)
684 {
685         struct osd_object *obj = osd_dt_obj(dt);
686         int rc = 1;
687
688         LASSERT(osd_invariant(obj));
689
690         if (cfs_down_write_trylock(&obj->oo_sem)) {
691                 rc = 0;
692                 cfs_up_write(&obj->oo_sem);
693         }
694         return rc;
695 }
696
697 static int osd_attr_get(const struct lu_env *env,
698                         struct dt_object *dt,
699                         struct lu_attr *attr,
700                         struct lustre_capa *capa)
701 {
702         struct osd_object       *obj = osd_dt_obj(dt);
703         uint64_t                 blocks;
704         uint32_t                 blksize;
705
706         LASSERT(dt_object_exists(dt));
707         LASSERT(osd_invariant(obj));
708         LASSERT(obj->oo_db);
709
710         cfs_read_lock(&obj->oo_attr_lock);
711         *attr = obj->oo_attr;
712         cfs_read_unlock(&obj->oo_attr_lock);
713
714         /* with ZFS_DEBUG zrl_add_debug() called by DB_DNODE_ENTER()
715          * from within sa_object_size() can block on a mutex, so
716          * we can't call sa_object_size() holding rwlock */
717         sa_object_size(obj->oo_sa_hdl, &blksize, &blocks);
718         /* Block size may be not set; suggest maximal I/O transfers. */
719         if (blksize == 0)
720                 blksize = 1ULL << SPA_MAXBLOCKSHIFT;
721
722         attr->la_blksize = blksize;
723         attr->la_blocks = blocks;
724         attr->la_valid |= LA_BLOCKS | LA_BLKSIZE;
725
726         return 0;
727 }
728
729 static int osd_declare_attr_set(const struct lu_env *env,
730                                 struct dt_object *dt,
731                                 const struct lu_attr *attr,
732                                 struct thandle *handle)
733 {
734         char                    *buf = osd_oti_get(env)->oti_str;
735         struct osd_object       *obj = osd_dt_obj(dt);
736         struct osd_device       *osd = osd_obj2dev(obj);
737         struct osd_thandle      *oh;
738         ENTRY;
739
740         if (!dt_object_exists(dt)) {
741                 /* XXX: sanity check that object creation is declared */
742                 RETURN(0);
743         }
744
745         LASSERT(handle != NULL);
746         LASSERT(osd_invariant(obj));
747
748         oh = container_of0(handle, struct osd_thandle, ot_super);
749
750         LASSERT(obj->oo_sa_hdl != NULL);
751         dmu_tx_hold_sa(oh->ot_tx, obj->oo_sa_hdl, 0);
752
753         if (attr && attr->la_valid & LA_UID) {
754                 /* account for user inode tracking ZAP update */
755                 dmu_tx_hold_bonus(oh->ot_tx, osd->od_iusr_oid);
756                 dmu_tx_hold_zap(oh->ot_tx, osd->od_iusr_oid, TRUE, buf);
757         }
758         if (attr && attr->la_valid & LA_GID) {
759                 /* account for user inode tracking ZAP update */
760                 dmu_tx_hold_bonus(oh->ot_tx, osd->od_igrp_oid);
761                 dmu_tx_hold_zap(oh->ot_tx, osd->od_igrp_oid, TRUE, buf);
762         }
763
764         RETURN(0);
765 }
766
767 /*
768  * Set the attributes of an object
769  *
770  * The transaction passed to this routine must have
771  * dmu_tx_hold_bonus(tx, oid) called and then assigned
772  * to a transaction group.
773  */
774 static int osd_attr_set(const struct lu_env *env, struct dt_object *dt,
775                         const struct lu_attr *la, struct thandle *handle,
776                         struct lustre_capa *capa)
777 {
778         struct osd_object       *obj = osd_dt_obj(dt);
779         struct osd_device       *osd = osd_obj2dev(obj);
780         udmu_objset_t           *uos = &osd->od_objset;
781         struct osd_thandle      *oh;
782         struct osa_attr         *osa = &osd_oti_get(env)->oti_osa;
783         sa_bulk_attr_t          *bulk;
784         int                      cnt;
785         int                      rc = 0;
786
787         ENTRY;
788         LASSERT(handle != NULL);
789         LASSERT(dt_object_exists(dt));
790         LASSERT(osd_invariant(obj));
791         LASSERT(obj->oo_sa_hdl);
792
793         oh = container_of0(handle, struct osd_thandle, ot_super);
794         /* Assert that the transaction has been assigned to a
795            transaction group. */
796         LASSERT(oh->ot_tx->tx_txg != 0);
797
798         if (la->la_valid == 0)
799                 RETURN(0);
800
801         OBD_ALLOC(bulk, sizeof(sa_bulk_attr_t) * 10);
802         if (bulk == NULL)
803                 RETURN(-ENOMEM);
804
805         /* do both accounting updates outside oo_attr_lock below */
806         if ((la->la_valid & LA_UID) && (la->la_uid != obj->oo_attr.la_uid)) {
807                 /* Update user accounting. Failure isn't fatal, but we still
808                  * log an error message */
809                 rc = -zap_increment_int(osd->od_objset.os, osd->od_iusr_oid,
810                                         la->la_uid, 1, oh->ot_tx);
811                 if (rc)
812                         CERROR("%s: failed to update accounting ZAP for user "
813                                 "%d (%d)\n", osd->od_svname, la->la_uid, rc);
814                 rc = -zap_increment_int(osd->od_objset.os, osd->od_iusr_oid,
815                                         obj->oo_attr.la_uid, -1, oh->ot_tx);
816                 if (rc)
817                         CERROR("%s: failed to update accounting ZAP for user "
818                                 "%d (%d)\n", osd->od_svname,
819                                 obj->oo_attr.la_uid, rc);
820         }
821         if ((la->la_valid & LA_GID) && (la->la_gid != obj->oo_attr.la_gid)) {
822                 /* Update group accounting. Failure isn't fatal, but we still
823                  * log an error message */
824                 rc = -zap_increment_int(osd->od_objset.os, osd->od_igrp_oid,
825                                         la->la_gid, 1, oh->ot_tx);
826                 if (rc)
827                         CERROR("%s: failed to update accounting ZAP for user "
828                                 "%d (%d)\n", osd->od_svname, la->la_gid, rc);
829                 rc = -zap_increment_int(osd->od_objset.os, osd->od_igrp_oid,
830                                         obj->oo_attr.la_gid, -1, oh->ot_tx);
831                 if (rc)
832                         CERROR("%s: failed to update accounting ZAP for user "
833                                 "%d (%d)\n", osd->od_svname,
834                                 obj->oo_attr.la_gid, rc);
835         }
836
837         cfs_write_lock(&obj->oo_attr_lock);
838         cnt = 0;
839         if (la->la_valid & LA_ATIME) {
840                 osa->atime[0] = obj->oo_attr.la_atime = la->la_atime;
841                 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_ATIME(uos), NULL,
842                                  osa->atime, 16);
843         }
844         if (la->la_valid & LA_MTIME) {
845                 osa->mtime[0] = obj->oo_attr.la_mtime = la->la_mtime;
846                 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_MTIME(uos), NULL,
847                                  osa->mtime, 16);
848         }
849         if (la->la_valid & LA_CTIME) {
850                 osa->ctime[0] = obj->oo_attr.la_ctime = la->la_ctime;
851                 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_CTIME(uos), NULL,
852                                  osa->ctime, 16);
853         }
854         if (la->la_valid & LA_MODE) {
855                 /* mode is stored along with type, so read it first */
856                 obj->oo_attr.la_mode = (obj->oo_attr.la_mode & S_IFMT) |
857                         (la->la_mode & ~S_IFMT);
858                 osa->mode = obj->oo_attr.la_mode;
859                 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_MODE(uos), NULL,
860                                  &osa->mode, 8);
861         }
862         if (la->la_valid & LA_SIZE) {
863                 osa->size = obj->oo_attr.la_size = la->la_size;
864                 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_SIZE(uos), NULL,
865                                  &osa->size, 8);
866         }
867         if (la->la_valid & LA_NLINK) {
868                 osa->nlink = obj->oo_attr.la_nlink = la->la_nlink;
869                 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_LINKS(uos), NULL,
870                                  &osa->nlink, 8);
871         }
872         if (la->la_valid & LA_RDEV) {
873                 osa->rdev = obj->oo_attr.la_rdev = la->la_rdev;
874                 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_RDEV(uos), NULL,
875                                  &osa->rdev, 8);
876         }
877         if (la->la_valid & LA_FLAGS) {
878                 osa->flags = obj->oo_attr.la_flags = la->la_flags;
879                 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_FLAGS(uos), NULL,
880                                  &osa->flags, 8);
881         }
882         if (la->la_valid & LA_UID) {
883                 osa->uid = obj->oo_attr.la_uid = la->la_uid;
884                 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_UID(uos), NULL,
885                                  &osa->uid, 8);
886         }
887         if (la->la_valid & LA_GID) {
888                 osa->gid = obj->oo_attr.la_gid = la->la_gid;
889                 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_GID(uos), NULL,
890                                  &osa->gid, 8);
891         }
892         obj->oo_attr.la_valid |= la->la_valid;
893         cfs_write_unlock(&obj->oo_attr_lock);
894
895         rc = osd_object_sa_bulk_update(obj, bulk, cnt, oh);
896
897         OBD_FREE(bulk, sizeof(sa_bulk_attr_t) * 10);
898         RETURN(rc);
899 }
900
901 /*
902  * Object creation.
903  *
904  * XXX temporary solution.
905  */
906
907 static void osd_ah_init(const struct lu_env *env, struct dt_allocation_hint *ah,
908                         struct dt_object *parent, cfs_umode_t child_mode)
909 {
910         LASSERT(ah);
911
912         memset(ah, 0, sizeof(*ah));
913         ah->dah_mode = child_mode;
914 }
915
916 static int osd_declare_object_create(const struct lu_env *env,
917                                      struct dt_object *dt,
918                                      struct lu_attr *attr,
919                                      struct dt_allocation_hint *hint,
920                                      struct dt_object_format *dof,
921                                      struct thandle *handle)
922 {
923         char                    *buf = osd_oti_get(env)->oti_str;
924         const struct lu_fid     *fid = lu_object_fid(&dt->do_lu);
925         struct osd_object       *obj = osd_dt_obj(dt);
926         struct osd_device       *osd = osd_obj2dev(obj);
927         struct osd_thandle      *oh;
928         uint64_t                 zapid;
929         ENTRY;
930
931         LASSERT(dof);
932
933         switch (dof->dof_type) {
934                 case DFT_REGULAR:
935                 case DFT_SYM:
936                 case DFT_NODE:
937                         if (obj->oo_dt.do_body_ops == NULL)
938                                 obj->oo_dt.do_body_ops = &osd_body_ops;
939                         break;
940                 default:
941                         break;
942         }
943
944         LASSERT(handle != NULL);
945         oh = container_of0(handle, struct osd_thandle, ot_super);
946         LASSERT(oh->ot_tx != NULL);
947
948         switch (dof->dof_type) {
949                 case DFT_DIR:
950                 case DFT_INDEX:
951                         /* for zap create */
952                         dmu_tx_hold_zap(oh->ot_tx, DMU_NEW_OBJECT, 1, NULL);
953                         break;
954                 case DFT_REGULAR:
955                 case DFT_SYM:
956                 case DFT_NODE:
957                         /* first, we'll create new object */
958                         dmu_tx_hold_bonus(oh->ot_tx, DMU_NEW_OBJECT);
959                         break;
960
961                 default:
962                         LBUG();
963                         break;
964         }
965
966         /* and we'll add it to some mapping */
967         zapid = osd_get_name_n_idx(env, osd, fid, buf);
968         dmu_tx_hold_bonus(oh->ot_tx, zapid);
969         dmu_tx_hold_zap(oh->ot_tx, zapid, TRUE, buf);
970
971         /* we will also update inode accounting ZAPs */
972         dmu_tx_hold_bonus(oh->ot_tx, osd->od_iusr_oid);
973         dmu_tx_hold_zap(oh->ot_tx, osd->od_iusr_oid, TRUE, buf);
974         dmu_tx_hold_bonus(oh->ot_tx, osd->od_igrp_oid);
975         dmu_tx_hold_zap(oh->ot_tx, osd->od_igrp_oid, TRUE, buf);
976
977         dmu_tx_hold_sa_create(oh->ot_tx, ZFS_SA_BASE_ATTR_SIZE);
978
979         RETURN(0);
980 }
981
982 int __osd_attr_init(const struct lu_env *env, udmu_objset_t *uos,
983                     uint64_t oid, dmu_tx_t *tx, struct lu_attr *la)
984 {
985         sa_bulk_attr_t  *bulk;
986         sa_handle_t     *sa_hdl;
987         struct osa_attr *osa = &osd_oti_get(env)->oti_osa;
988         uint64_t         gen;
989         uint64_t         parent;
990         uint64_t         crtime[2];
991         timestruc_t      now;
992         int              cnt;
993         int              rc;
994
995         gethrestime(&now);
996         gen = dmu_tx_get_txg(tx);
997
998         ZFS_TIME_ENCODE(&now, crtime);
999         /* XXX: this should be real id of parent for ZPL access, but we have no
1000          * such info in OSD, probably it can be part of dt_object_format */
1001         parent = 0;
1002
1003         osa->atime[0] = la->la_atime;
1004         osa->ctime[0] = la->la_ctime;
1005         osa->mtime[0] = la->la_mtime;
1006         osa->mode = la->la_mode;
1007         osa->uid = la->la_uid;
1008         osa->gid = la->la_gid;
1009         osa->rdev = la->la_rdev;
1010         osa->nlink = la->la_nlink;
1011         osa->flags = la->la_flags;
1012         osa->size  = la->la_size;
1013
1014         /* Now add in all of the "SA" attributes */
1015         rc = -sa_handle_get(uos->os, oid, NULL, SA_HDL_PRIVATE, &sa_hdl);
1016         if (rc)
1017                 return rc;
1018
1019         OBD_ALLOC(bulk, sizeof(sa_bulk_attr_t) * 13);
1020         if (bulk == NULL) {
1021                 rc = -ENOMEM;
1022                 goto out;
1023         }
1024         /*
1025          * we need to create all SA below upon object create.
1026          *
1027          * XXX The attribute order matters since the accounting callback relies
1028          * on static offsets (i.e. SA_*_OFFSET, see zfs_space_delta_cb()) to
1029          * look up the UID/GID attributes. Moreover, the callback does not seem
1030          * to support the spill block.
1031          * We define attributes in the same order as SA_*_OFFSET in order to
1032          * work around the problem. See ORI-610.
1033          */
1034         cnt = 0;
1035         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_MODE(uos), NULL, &osa->mode, 8);
1036         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_SIZE(uos), NULL, &osa->size, 8);
1037         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_GEN(uos), NULL, &gen, 8);
1038         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_UID(uos), NULL, &osa->uid, 8);
1039         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_GID(uos), NULL, &osa->gid, 8);
1040         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_PARENT(uos), NULL, &parent, 8);
1041         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_FLAGS(uos), NULL, &osa->flags, 8);
1042         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_ATIME(uos), NULL, osa->atime, 16);
1043         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_MTIME(uos), NULL, osa->mtime, 16);
1044         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_CTIME(uos), NULL, osa->ctime, 16);
1045         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_CRTIME(uos), NULL, crtime, 16);
1046         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_LINKS(uos), NULL, &osa->nlink, 8);
1047         SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_RDEV(uos), NULL, &osa->rdev, 8);
1048
1049         rc = -sa_replace_all_by_template(sa_hdl, bulk, cnt, tx);
1050
1051         OBD_FREE(bulk, sizeof(sa_bulk_attr_t) * 13);
1052 out:
1053         sa_handle_destroy(sa_hdl);
1054         return rc;
1055 }
1056
1057 /*
1058  * The transaction passed to this routine must have
1059  * dmu_tx_hold_bonus(tx, DMU_NEW_OBJECT) called and then assigned
1060  * to a transaction group.
1061  */
1062 int __osd_object_create(const struct lu_env *env, udmu_objset_t *uos,
1063                         dmu_buf_t **dbp, dmu_tx_t *tx,
1064                         struct lu_attr *la, void *tag)
1065 {
1066         uint64_t oid;
1067         int      rc;
1068
1069         LASSERT(tag);
1070         cfs_spin_lock(&uos->lock);
1071         uos->objects++;
1072         cfs_spin_unlock(&uos->lock);
1073
1074         /* Assert that the transaction has been assigned to a
1075            transaction group. */
1076         LASSERT(tx->tx_txg != 0);
1077
1078         /* Create a new DMU object. */
1079         oid = dmu_object_alloc(uos->os, DMU_OT_PLAIN_FILE_CONTENTS, 0,
1080                                DMU_OT_SA, DN_MAX_BONUSLEN, tx);
1081         rc = -sa_buf_hold(uos->os, oid, tag, dbp);
1082         if (rc)
1083                 return rc;
1084
1085         LASSERT(la->la_valid & LA_MODE);
1086         la->la_size = 0;
1087         la->la_nlink = 1;
1088
1089         return __osd_attr_init(env, uos, oid, tx, la);
1090 }
1091
1092 /*
1093  * The transaction passed to this routine must have
1094  * dmu_tx_hold_zap(tx, DMU_NEW_OBJECT, ...) called and then assigned
1095  * to a transaction group.
1096  *
1097  * Using ZAP_FLAG_HASH64 will force the ZAP to always be a FAT ZAP.
1098  * This is fine for directories today, because storing the FID in the dirent
1099  * will also require a FAT ZAP.  If there is a new type of micro ZAP created
1100  * then we might need to re-evaluate the use of this flag and instead do
1101  * a conversion from the different internal ZAP hash formats being used. */
1102 int __osd_zap_create(const struct lu_env *env, udmu_objset_t *uos,
1103                      dmu_buf_t **zap_dbp, dmu_tx_t *tx,
1104                      struct lu_attr *la, void *tag, zap_flags_t flags)
1105 {
1106         uint64_t oid;
1107         int      rc;
1108
1109         LASSERT(tag);
1110
1111         cfs_spin_lock(&uos->lock);
1112         uos->objects++;
1113         cfs_spin_unlock(&uos->lock);
1114
1115         /* Assert that the transaction has been assigned to a
1116            transaction group. */
1117         LASSERT(tx->tx_txg != 0);
1118
1119         oid = zap_create_flags(uos->os, 0, flags | ZAP_FLAG_HASH64,
1120                                DMU_OT_DIRECTORY_CONTENTS, 12, 12,
1121                                DMU_OT_SA, DN_MAX_BONUSLEN, tx);
1122
1123         rc = -sa_buf_hold(uos->os, oid, tag, zap_dbp);
1124         if (rc)
1125                 return rc;
1126
1127         LASSERT(la->la_valid & LA_MODE);
1128         la->la_size = 2;
1129         la->la_nlink = 1;
1130
1131         return __osd_attr_init(env, uos, oid, tx, la);
1132 }
1133
1134 static dmu_buf_t *osd_mkidx(const struct lu_env *env, struct osd_device *osd,
1135                             struct lu_attr *la, struct osd_thandle *oh)
1136 {
1137         dmu_buf_t *db;
1138         int        rc;
1139
1140         /* Index file should be created as regular file in order not to confuse
1141          * ZPL which could interpret them as directory.
1142          * We set ZAP_FLAG_UINT64_KEY to let ZFS know than we are going to use
1143          * binary keys */
1144         LASSERT(S_ISREG(la->la_mode));
1145         rc = __osd_zap_create(env, &osd->od_objset, &db, oh->ot_tx, la,
1146                               osd_obj_tag, ZAP_FLAG_UINT64_KEY);
1147         if (rc)
1148                 return ERR_PTR(rc);
1149         return db;
1150 }
1151
1152 static dmu_buf_t *osd_mkdir(const struct lu_env *env, struct osd_device *osd,
1153                             struct lu_attr *la, struct osd_thandle *oh)
1154 {
1155         dmu_buf_t *db;
1156         int        rc;
1157
1158         LASSERT(S_ISDIR(la->la_mode));
1159         rc = __osd_zap_create(env, &osd->od_objset, &db, oh->ot_tx, la,
1160                               osd_obj_tag, 0);
1161         if (rc)
1162                 return ERR_PTR(rc);
1163         return db;
1164 }
1165
1166 static dmu_buf_t* osd_mkreg(const struct lu_env *env, struct osd_device *osd,
1167                             struct lu_attr *la, struct osd_thandle *oh)
1168 {
1169         dmu_buf_t *db;
1170         int         rc;
1171
1172         LASSERT(S_ISREG(la->la_mode));
1173         rc = __osd_object_create(env, &osd->od_objset, &db, oh->ot_tx, la,
1174                                  osd_obj_tag);
1175         if (rc)
1176                 return ERR_PTR(rc);
1177
1178         /*
1179          * XXX: a hack, OST to use bigger blocksize. we need
1180          * a method in OSD API to control this from OFD/MDD
1181          */
1182         if (!lu_device_is_md(osd2lu_dev(osd))) {
1183                 rc = -dmu_object_set_blocksize(osd->od_objset.os,
1184                                                db->db_object,
1185                                 128 << 10, 0, oh->ot_tx);
1186                 if (unlikely(rc)) {
1187                         CERROR("%s: can't change blocksize: %d\n",
1188                                osd->od_svname, rc);
1189                         return ERR_PTR(rc);
1190                 }
1191         }
1192
1193         return db;
1194 }
1195
1196 static dmu_buf_t *osd_mksym(const struct lu_env *env, struct osd_device *osd,
1197                             struct lu_attr *la, struct osd_thandle *oh)
1198 {
1199         dmu_buf_t *db;
1200         int        rc;
1201
1202         LASSERT(S_ISLNK(la->la_mode));
1203         rc = __osd_object_create(env, &osd->od_objset, &db, oh->ot_tx, la,
1204                                  osd_obj_tag);
1205         if (rc)
1206                 return ERR_PTR(rc);
1207         return db;
1208 }
1209
1210 static dmu_buf_t *osd_mknod(const struct lu_env *env, struct osd_device *osd,
1211                             struct lu_attr *la, struct osd_thandle *oh)
1212 {
1213         dmu_buf_t *db;
1214         int        rc;
1215
1216         la->la_valid = LA_MODE;
1217         if (S_ISCHR(la->la_mode) || S_ISBLK(la->la_mode))
1218                 la->la_valid |= LA_RDEV;
1219
1220         rc = __osd_object_create(env, &osd->od_objset, &db, oh->ot_tx, la,
1221                                  osd_obj_tag);
1222         if (rc)
1223                 return ERR_PTR(rc);
1224         return db;
1225 }
1226
1227 typedef dmu_buf_t *(*osd_obj_type_f)(const struct lu_env *env, struct osd_device *osd,
1228                                      struct lu_attr *la, struct osd_thandle *oh);
1229
1230 static osd_obj_type_f osd_create_type_f(enum dt_format_type type)
1231 {
1232         osd_obj_type_f result;
1233
1234         switch (type) {
1235         case DFT_DIR:
1236                 result = osd_mkdir;
1237                 break;
1238         case DFT_INDEX:
1239                 result = osd_mkidx;
1240                 break;
1241         case DFT_REGULAR:
1242                 result = osd_mkreg;
1243                 break;
1244         case DFT_SYM:
1245                 result = osd_mksym;
1246                 break;
1247         case DFT_NODE:
1248                 result = osd_mknod;
1249                 break;
1250         default:
1251                 LBUG();
1252                 break;
1253         }
1254         return result;
1255 }
1256
1257 /*
1258  * Primitives for directory (i.e. ZAP) handling
1259  */
1260
1261 /*
1262  * Concurrency: @dt is write locked.
1263  */
1264 static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
1265                              struct lu_attr *attr,
1266                              struct dt_allocation_hint *hint,
1267                              struct dt_object_format *dof,
1268                              struct thandle *th)
1269 {
1270         struct zpl_direntry     *zde = &osd_oti_get(env)->oti_zde.lzd_reg;
1271         const struct lu_fid     *fid = lu_object_fid(&dt->do_lu);
1272         struct osd_object       *obj = osd_dt_obj(dt);
1273         struct osd_device       *osd = osd_obj2dev(obj);
1274         char                    *buf = osd_oti_get(env)->oti_str;
1275         struct osd_thandle      *oh;
1276         dmu_buf_t               *db;
1277         uint64_t                 zapid;
1278         int                      rc;
1279
1280         ENTRY;
1281
1282         /* concurrent create declarations should not see
1283          * the object inconsistent (db, attr, etc).
1284          * in regular cases acquisition should be cheap */
1285         cfs_down(&obj->oo_guard);
1286
1287         LASSERT(osd_invariant(obj));
1288         LASSERT(!dt_object_exists(dt));
1289         LASSERT(dof != NULL);
1290
1291         LASSERT(th != NULL);
1292         oh = container_of0(th, struct osd_thandle, ot_super);
1293
1294         /*
1295          * XXX missing: Quote handling.
1296          */
1297
1298         LASSERT(obj->oo_db == NULL);
1299
1300         db = osd_create_type_f(dof->dof_type)(env, osd, attr, oh);
1301         if (IS_ERR(db))
1302                 GOTO(out, rc = PTR_ERR(th));
1303
1304         zde->zde_pad = 0;
1305         zde->zde_dnode = db->db_object;
1306         zde->zde_type = IFTODT(attr->la_mode & S_IFMT);
1307
1308         zapid = osd_get_name_n_idx(env, osd, fid, buf);
1309
1310         rc = -zap_add(osd->od_objset.os, zapid, buf, 8, 1, zde, oh->ot_tx);
1311         if (rc)
1312                 GOTO(out, rc);
1313
1314         /* Add new object to inode accounting.
1315          * Errors are not considered as fatal */
1316         rc = -zap_increment_int(osd->od_objset.os, osd->od_iusr_oid,
1317                                 (attr->la_valid & LA_UID) ? attr->la_uid : 0, 1,
1318                                 oh->ot_tx);
1319         if (rc)
1320                 CERROR("%s: failed to add "DFID" to accounting ZAP for usr %d "
1321                         "(%d)\n", osd->od_svname, PFID(fid), attr->la_uid, rc);
1322         rc = -zap_increment_int(osd->od_objset.os, osd->od_igrp_oid,
1323                                 (attr->la_valid & LA_GID) ? attr->la_gid : 0, 1,
1324                                 oh->ot_tx);
1325         if (rc)
1326                 CERROR("%s: failed to add "DFID" to accounting ZAP for grp %d "
1327                         "(%d)\n", osd->od_svname, PFID(fid), attr->la_gid, rc);
1328
1329         /* configure new osd object */
1330         obj->oo_db = db;
1331         rc = osd_object_init0(env, obj);
1332         LASSERT(ergo(rc == 0, dt_object_exists(dt)));
1333         LASSERT(osd_invariant(obj));
1334
1335 out:
1336         cfs_up(&obj->oo_guard);
1337         RETURN(rc);
1338 }
1339
1340 static int osd_declare_object_ref_add(const struct lu_env *env,
1341                                       struct dt_object *dt,
1342                                       struct thandle *th)
1343 {
1344         return osd_declare_attr_set(env, dt, NULL, th);
1345 }
1346
1347 /*
1348  * Concurrency: @dt is write locked.
1349  */
1350 static int osd_object_ref_add(const struct lu_env *env,
1351                               struct dt_object *dt,
1352                               struct thandle *handle)
1353 {
1354         struct osd_object       *obj = osd_dt_obj(dt);
1355         struct osd_thandle      *oh;
1356         struct osd_device       *osd = osd_obj2dev(obj);
1357         udmu_objset_t           *uos = &osd->od_objset;
1358         uint64_t                 nlink;
1359         int rc;
1360
1361         ENTRY;
1362
1363         LASSERT(osd_invariant(obj));
1364         LASSERT(dt_object_exists(dt));
1365         LASSERT(obj->oo_sa_hdl != NULL);
1366
1367         oh = container_of0(handle, struct osd_thandle, ot_super);
1368
1369         cfs_write_lock(&obj->oo_attr_lock);
1370         nlink = ++obj->oo_attr.la_nlink;
1371         cfs_write_unlock(&obj->oo_attr_lock);
1372
1373         rc = osd_object_sa_update(obj, SA_ZPL_LINKS(uos), &nlink, 8, oh);
1374         return rc;
1375 }
1376
1377 static int osd_declare_object_ref_del(const struct lu_env *env,
1378                                       struct dt_object *dt,
1379                                       struct thandle *handle)
1380 {
1381         return osd_declare_attr_set(env, dt, NULL, handle);
1382 }
1383
1384 /*
1385  * Concurrency: @dt is write locked.
1386  */
1387 static int osd_object_ref_del(const struct lu_env *env,
1388                               struct dt_object *dt,
1389                               struct thandle *handle)
1390 {
1391         struct osd_object       *obj = osd_dt_obj(dt);
1392         struct osd_thandle      *oh;
1393         struct osd_device       *osd = osd_obj2dev(obj);
1394         udmu_objset_t           *uos = &osd->od_objset;
1395         uint64_t                 nlink;
1396         int                      rc;
1397
1398         ENTRY;
1399
1400         LASSERT(osd_invariant(obj));
1401         LASSERT(dt_object_exists(dt));
1402         LASSERT(obj->oo_sa_hdl != NULL);
1403
1404         oh = container_of0(handle, struct osd_thandle, ot_super);
1405         LASSERT(!lu_object_is_dying(dt->do_lu.lo_header));
1406
1407         cfs_write_lock(&obj->oo_attr_lock);
1408         nlink = --obj->oo_attr.la_nlink;
1409         cfs_write_unlock(&obj->oo_attr_lock);
1410
1411         rc = osd_object_sa_update(obj, SA_ZPL_LINKS(uos), &nlink, 8, oh);
1412         return rc;
1413 }
1414
1415 static int capa_is_sane(const struct lu_env *env, struct osd_device *dev,
1416                         struct lustre_capa *capa, struct lustre_capa_key *keys)
1417 {
1418         struct osd_thread_info  *oti = osd_oti_get(env);
1419         struct obd_capa         *oc;
1420         int                      i, rc = 0;
1421         ENTRY;
1422
1423         oc = capa_lookup(dev->od_capa_hash, capa, 0);
1424         if (oc) {
1425                 if (capa_is_expired(oc)) {
1426                         DEBUG_CAPA(D_ERROR, capa, "expired");
1427                         rc = -ESTALE;
1428                 }
1429                 capa_put(oc);
1430                 RETURN(rc);
1431         }
1432
1433         cfs_spin_lock(&capa_lock);
1434         for (i = 0; i < 2; i++) {
1435                 if (keys[i].lk_keyid == capa->lc_keyid) {
1436                         oti->oti_capa_key = keys[i];
1437                         break;
1438                 }
1439         }
1440         cfs_spin_unlock(&capa_lock);
1441
1442         if (i == 2) {
1443                 DEBUG_CAPA(D_ERROR, capa, "no matched capa key");
1444                 RETURN(-ESTALE);
1445         }
1446
1447         rc = capa_hmac(oti->oti_capa.lc_hmac, capa, oti->oti_capa_key.lk_key);
1448         if (rc)
1449                 RETURN(rc);
1450         if (memcmp(oti->oti_capa.lc_hmac, capa->lc_hmac, sizeof(capa->lc_hmac)))
1451         {
1452                 DEBUG_CAPA(D_ERROR, capa, "HMAC mismatch");
1453                 RETURN(-EACCES);
1454         }
1455
1456         oc = capa_add(dev->od_capa_hash, capa);
1457         capa_put(oc);
1458
1459         RETURN(0);
1460 }
1461
1462 static int osd_object_auth(const struct lu_env *env, struct dt_object *dt,
1463                            struct lustre_capa *capa, __u64 opc)
1464 {
1465         const struct lu_fid     *fid = lu_object_fid(&dt->do_lu);
1466         struct osd_device       *dev = osd_dev(dt->do_lu.lo_dev);
1467         int                      rc;
1468
1469         if (!dev->od_fl_capa)
1470                 return 0;
1471
1472         if (capa == BYPASS_CAPA)
1473                 return 0;
1474
1475         if (!capa) {
1476                 CERROR("no capability is provided for fid "DFID"\n", PFID(fid));
1477                 return -EACCES;
1478         }
1479
1480         if (!lu_fid_eq(fid, &capa->lc_fid)) {
1481                 DEBUG_CAPA(D_ERROR, capa, "fid "DFID" mismatch with",PFID(fid));
1482                 return -EACCES;
1483         }
1484
1485         if (!capa_opc_supported(capa, opc)) {
1486                 DEBUG_CAPA(D_ERROR, capa, "opc "LPX64" not supported by", opc);
1487                 return -EACCES;
1488         }
1489
1490         if ((rc = capa_is_sane(env, dev, capa, dev->od_capa_keys))) {
1491                 DEBUG_CAPA(D_ERROR, capa, "insane (rc %d)", rc);
1492                 return -EACCES;
1493         }
1494
1495         return 0;
1496 }
1497
1498 static struct obd_capa *osd_capa_get(const struct lu_env *env,
1499                                      struct dt_object *dt,
1500                                      struct lustre_capa *old,
1501                                      __u64 opc)
1502 {
1503         struct osd_thread_info  *info = osd_oti_get(env);
1504         const struct lu_fid     *fid = lu_object_fid(&dt->do_lu);
1505         struct osd_object       *obj = osd_dt_obj(dt);
1506         struct osd_device       *dev = osd_obj2dev(obj);
1507         struct lustre_capa_key  *key = &info->oti_capa_key;
1508         struct lustre_capa      *capa = &info->oti_capa;
1509         struct obd_capa         *oc;
1510         int                      rc;
1511         ENTRY;
1512
1513         if (!dev->od_fl_capa)
1514                 RETURN(ERR_PTR(-ENOENT));
1515
1516         LASSERT(dt_object_exists(dt));
1517         LASSERT(osd_invariant(obj));
1518
1519         /* renewal sanity check */
1520         if (old && osd_object_auth(env, dt, old, opc))
1521                 RETURN(ERR_PTR(-EACCES));
1522
1523         capa->lc_fid = *fid;
1524         capa->lc_opc = opc;
1525         capa->lc_uid = 0;
1526         capa->lc_flags = dev->od_capa_alg << 24;
1527         capa->lc_timeout = dev->od_capa_timeout;
1528         capa->lc_expiry = 0;
1529
1530         oc = capa_lookup(dev->od_capa_hash, capa, 1);
1531         if (oc) {
1532                 LASSERT(!capa_is_expired(oc));
1533                 RETURN(oc);
1534         }
1535
1536         cfs_spin_lock(&capa_lock);
1537         *key = dev->od_capa_keys[1];
1538         cfs_spin_unlock(&capa_lock);
1539
1540         capa->lc_keyid = key->lk_keyid;
1541         capa->lc_expiry = cfs_time_current_sec() + dev->od_capa_timeout;
1542
1543         rc = capa_hmac(capa->lc_hmac, capa, key->lk_key);
1544         if (rc) {
1545                 DEBUG_CAPA(D_ERROR, capa, "HMAC failed: %d for", rc);
1546                 LBUG();
1547                 RETURN(ERR_PTR(rc));
1548         }
1549
1550         oc = capa_add(dev->od_capa_hash, capa);
1551         RETURN(oc);
1552 }
1553
1554 static int osd_object_sync(const struct lu_env *env, struct dt_object *dt)
1555 {
1556         struct osd_device *osd = osd_obj2dev(osd_dt_obj(dt));
1557         ENTRY;
1558
1559         /* XXX: no other option than syncing the whole filesystem until we
1560          * support ZIL */
1561         txg_wait_synced(dmu_objset_pool(osd->od_objset.os), 0ULL);
1562
1563         RETURN(0);
1564 }
1565
1566 static struct dt_object_operations osd_obj_ops = {
1567         .do_read_lock           = osd_object_read_lock,
1568         .do_write_lock          = osd_object_write_lock,
1569         .do_read_unlock         = osd_object_read_unlock,
1570         .do_write_unlock        = osd_object_write_unlock,
1571         .do_write_locked        = osd_object_write_locked,
1572         .do_attr_get            = osd_attr_get,
1573         .do_declare_attr_set    = osd_declare_attr_set,
1574         .do_attr_set            = osd_attr_set,
1575         .do_ah_init             = osd_ah_init,
1576         .do_declare_create      = osd_declare_object_create,
1577         .do_create              = osd_object_create,
1578         .do_declare_destroy     = osd_declare_object_destroy,
1579         .do_destroy             = osd_object_destroy,
1580         .do_index_try           = osd_index_try,
1581         .do_declare_ref_add     = osd_declare_object_ref_add,
1582         .do_ref_add             = osd_object_ref_add,
1583         .do_declare_ref_del     = osd_declare_object_ref_del,
1584         .do_ref_del             = osd_object_ref_del,
1585         .do_xattr_get           = osd_xattr_get,
1586         .do_declare_xattr_set   = osd_declare_xattr_set,
1587         .do_xattr_set           = osd_xattr_set,
1588         .do_declare_xattr_del   = osd_declare_xattr_del,
1589         .do_xattr_del           = osd_xattr_del,
1590         .do_xattr_list          = osd_xattr_list,
1591         .do_capa_get            = osd_capa_get,
1592         .do_object_sync         = osd_object_sync,
1593 };
1594
1595 static struct lu_object_operations osd_lu_obj_ops = {
1596         .loo_object_init        = osd_object_init,
1597         .loo_object_delete      = osd_object_delete,
1598         .loo_object_release     = osd_object_release,
1599         .loo_object_free        = osd_object_free,
1600         .loo_object_print       = osd_object_print,
1601         .loo_object_invariant   = osd_object_invariant,
1602 };
1603