4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
31 * Copyright (c) 2012, 2013, Intel Corporation.
32 * Use is subject to license terms.
35 * This file is part of Lustre, http://www.lustre.org/
36 * Lustre is a trademark of Sun Microsystems, Inc.
38 * lustre/osd-zfs/osd_object.c
40 * Author: Alex Zhuravlev <bzzz@whamcloud.com>
41 * Author: Mike Pershin <tappro@whamcloud.com>
42 * Author: Johann Lombardi <johann@whamcloud.com>
45 #define DEBUG_SUBSYSTEM S_OSD
47 #include <lustre_ver.h>
48 #include <libcfs/libcfs.h>
49 #include <obd_support.h>
50 #include <lustre_net.h>
52 #include <obd_class.h>
53 #include <lustre_disk.h>
54 #include <lustre_fid.h>
56 #include "osd_internal.h"
58 #include <sys/dnode.h>
63 #include <sys/spa_impl.h>
64 #include <sys/zfs_znode.h>
65 #include <sys/dmu_tx.h>
66 #include <sys/dmu_objset.h>
67 #include <sys/dsl_prop.h>
68 #include <sys/sa_impl.h>
/* Tag passed to sa_buf_hold()/sa_buf_rele() so dbuf holds taken by this
 * layer can be identified for debugging/ownership tracking. */
char *osd_obj_tag = "osd_object";

/* Operation vectors; the static ones are defined later in this file,
 * the extern ones elsewhere in osd-zfs. */
static struct dt_object_operations osd_obj_ops;
static struct lu_object_operations osd_lu_obj_ops;
extern struct dt_body_operations osd_body_ops;
static struct dt_object_operations osd_obj_otable_it_ops;

/* Slab cache for struct osd_object, defined elsewhere in the module. */
extern struct kmem_cache *osd_object_kmem;
/*
 * Destroy the object's cached SA (system attribute) handle, if any, and
 * clear the pointer so later teardown paths cannot destroy it twice.
 */
static void
osd_object_sa_fini(struct osd_object *obj)
{
	if (obj->oo_sa_hdl) {
		sa_handle_destroy(obj->oo_sa_hdl);
		obj->oo_sa_hdl = NULL;
	}
}
/*
 * Create the SA handle for @obj and cache the dnode id of its xattr ZAP
 * (SA_ZPL_XATTR) in oo_xattr; ZFS_NO_OBJECT is stored when the object
 * has no dedicated xattr directory yet.
 *
 * \retval 0 on success, negative errno on failure (handle is torn down
 *	   again if the xattr lookup fails with a real error).
 */
static int
osd_object_sa_init(struct osd_object *obj, struct osd_device *o)
{
	int rc;

	LASSERT(obj->oo_sa_hdl == NULL);
	LASSERT(obj->oo_db != NULL);

	rc = -sa_handle_get(o->od_os, obj->oo_db->db_object, obj,
			    SA_HDL_PRIVATE, &obj->oo_sa_hdl);
	if (rc)
		return rc;

	/* Cache the xattr object id, valid for the life of the object */
	rc = -sa_lookup(obj->oo_sa_hdl, SA_ZPL_XATTR(o), &obj->oo_xattr, 8);
	if (rc == -ENOENT) {
		/* no xattr directory: remember that explicitly */
		obj->oo_xattr = ZFS_NO_OBJECT;
		rc = 0;
	} else if (rc) {
		osd_object_sa_fini(obj);
	}

	return rc;
}
/*
 * Add object to list of dirty objects in tx handle.
 *
 * The cheap unlocked emptiness test is re-checked under ot_sa_lock and
 * oo_attr_lock, so concurrent callers cannot queue the object twice.
 */
static void
osd_object_sa_dirty_add(struct osd_object *obj, struct osd_thandle *oh)
{
	if (!list_empty(&obj->oo_sa_linkage))
		return;

	down(&oh->ot_sa_lock);
	write_lock(&obj->oo_attr_lock);
	if (likely(list_empty(&obj->oo_sa_linkage)))
		list_add(&obj->oo_sa_linkage, &oh->ot_sa_list);
	write_unlock(&obj->oo_attr_lock);
	up(&oh->ot_sa_lock);
}
/*
 * Release spill block dbuf hold for all dirty SAs.
 *
 * Walks oh->ot_sa_list under ot_sa_lock, dropping each object's spill
 * hold and unlinking it from the dirty list.
 */
void osd_object_sa_dirty_rele(struct osd_thandle *oh)
{
	struct osd_object *obj;

	down(&oh->ot_sa_lock);
	while (!list_empty(&oh->ot_sa_list)) {
		obj = list_entry(oh->ot_sa_list.next,
				 struct osd_object, oo_sa_linkage);
		sa_spill_rele(obj->oo_sa_hdl);
		write_lock(&obj->oo_attr_lock);
		list_del_init(&obj->oo_sa_linkage);
		write_unlock(&obj->oo_attr_lock);
	}
	up(&oh->ot_sa_lock);
}
/*
 * Update the SA and add the object to the dirty list.
 *
 * The object is queued on the dirty list even if sa_update() fails so
 * that the spill dbuf hold is always released at commit time.
 */
int osd_object_sa_update(struct osd_object *obj, sa_attr_type_t type,
			 void *buf, uint32_t buflen, struct osd_thandle *oh)
{
	int rc;

	LASSERT(obj->oo_sa_hdl != NULL);
	LASSERT(oh->ot_tx != NULL);

	rc = -sa_update(obj->oo_sa_hdl, type, buf, buflen, oh->ot_tx);
	osd_object_sa_dirty_add(obj, oh);

	return rc;
}
/*
 * Bulk update the SA and add the object to the dirty list.
 */
static int
osd_object_sa_bulk_update(struct osd_object *obj, sa_bulk_attr_t *attrs,
			  int count, struct osd_thandle *oh)
{
	int rc;

	LASSERT(obj->oo_sa_hdl != NULL);
	LASSERT(oh->ot_tx != NULL);

	rc = -sa_bulk_update(obj->oo_sa_hdl, attrs, count, oh->ot_tx);
	osd_object_sa_dirty_add(obj, oh);

	return rc;
}
/*
 * Retrieve the attributes of a DMU object
 *
 * Reads the ZPL system attributes through a temporary private SA handle
 * and translates them into @la; LA_RDEV is added for char/block device
 * nodes.  Returns 0 on success, negative errno otherwise.
 */
int __osd_object_attr_get(const struct lu_env *env, struct osd_device *o,
			  struct osd_object *obj, struct lu_attr *la)
{
	struct osa_attr	*osa = &osd_oti_get(env)->oti_osa;
	sa_handle_t	*sa_hdl;
	sa_bulk_attr_t	*bulk;
	int		 cnt = 0;
	int		 rc;
	ENTRY;

	LASSERT(obj->oo_db != NULL);

	rc = -sa_handle_get(o->od_os, obj->oo_db->db_object, NULL,
			    SA_HDL_PRIVATE, &sa_hdl);
	if (rc)
		RETURN(rc);

	OBD_ALLOC(bulk, sizeof(sa_bulk_attr_t) * 9);
	if (bulk == NULL)
		GOTO(out_sa, rc = -ENOMEM);

	la->la_valid |= LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE | LA_TYPE |
			LA_SIZE | LA_UID | LA_GID | LA_FLAGS | LA_NLINK;

	/* timestamps are 2x64bit on disk, scalar attributes 64bit */
	SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_ATIME(o), NULL, osa->atime, 16);
	SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_MTIME(o), NULL, osa->mtime, 16);
	SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_CTIME(o), NULL, osa->ctime, 16);
	SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_MODE(o), NULL, &osa->mode, 8);
	SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_SIZE(o), NULL, &osa->size, 8);
	SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_LINKS(o), NULL, &osa->nlink, 8);
	SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_UID(o), NULL, &osa->uid, 8);
	SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_GID(o), NULL, &osa->gid, 8);
	SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_FLAGS(o), NULL, &osa->flags, 8);

	rc = -sa_bulk_lookup(sa_hdl, bulk, cnt);
	if (rc)
		GOTO(out_bulk, rc);

	la->la_atime = osa->atime[0];
	la->la_mtime = osa->mtime[0];
	la->la_ctime = osa->ctime[0];
	la->la_mode = osa->mode;
	la->la_uid = osa->uid;
	la->la_gid = osa->gid;
	la->la_nlink = osa->nlink;
	la->la_flags = attrs_zfs2fs(osa->flags);
	la->la_size = osa->size;

	if (S_ISCHR(la->la_mode) || S_ISBLK(la->la_mode)) {
		rc = -sa_lookup(sa_hdl, SA_ZPL_RDEV(o), &osa->rdev, 8);
		if (rc)
			GOTO(out_bulk, rc);
		la->la_rdev = osa->rdev;
		la->la_valid |= LA_RDEV;
	}
out_bulk:
	OBD_FREE(bulk, sizeof(sa_bulk_attr_t) * 9);
out_sa:
	sa_handle_destroy(sa_hdl);

	RETURN(rc);
}
/*
 * Hold a dbuf for DMU object @oid and sanity-check that it carries an SA
 * bonus (the per-dataset accounting objects DMU_USERUSED/DMU_GROUPUSED
 * are exempt: they use a different bonus type).
 *
 * \retval 0 with *dbp held on success, negative errno otherwise.
 */
int __osd_obj2dbuf(const struct lu_env *env, objset_t *os,
		   uint64_t oid, dmu_buf_t **dbp)
{
	dmu_object_info_t *doi = &osd_oti_get(env)->oti_doi;
	int rc;

	rc = -sa_buf_hold(os, oid, osd_obj_tag, dbp);
	if (rc)
		return rc;

	dmu_object_info_from_db(*dbp, doi);
	if (unlikely(oid != DMU_USERUSED_OBJECT &&
	    oid != DMU_GROUPUSED_OBJECT && doi->doi_bonus_type != DMU_OT_SA)) {
		/* not an SA object: drop the hold and report corruption */
		sa_buf_rele(*dbp, osd_obj_tag);
		*dbp = NULL;
		return -EINVAL;
	}

	LASSERT(*dbp);
	LASSERT((*dbp)->db_object == oid);
	LASSERT((*dbp)->db_offset == -1);
	LASSERT((*dbp)->db_data != NULL);

	return 0;
}
/*
 * Concurrency: no concurrent access is possible that early in object
 * life-cycle.
 *
 * Allocate and minimally initialize an osd_object; the dnode itself is
 * looked up later in ->loo_object_init().
 */
struct lu_object *osd_object_alloc(const struct lu_env *env,
				   const struct lu_object_header *hdr,
				   struct lu_device *d)
{
	struct osd_object *mo;

	OBD_SLAB_ALLOC_PTR_GFP(mo, osd_object_kmem, GFP_NOFS);
	if (mo != NULL) {
		struct lu_object *l;

		l = &mo->oo_dt.do_lu;
		dt_object_init(&mo->oo_dt, NULL, d);
		mo->oo_dt.do_ops = &osd_obj_ops;
		l->lo_ops = &osd_lu_obj_ops;
		INIT_LIST_HEAD(&mo->oo_sa_linkage);
		init_rwsem(&mo->oo_sem);
		sema_init(&mo->oo_guard, 1);
		rwlock_init(&mo->oo_attr_lock);
		return l;
	} else {
		return NULL;
	}
}
/*
 * Concurrency: shouldn't matter.
 *
 * Finish initialization of an object whose dnode has been looked up:
 * create the SA handle, cache the on-disk attributes in oo_attr,
 * install body operations (not for accounting objects) and publish the
 * file type plus LOHA_EXISTS in the header.
 */
static int osd_object_init0(const struct lu_env *env, struct osd_object *obj)
{
	struct osd_device	*osd = osd_obj2dev(obj);
	const struct lu_fid	*fid = lu_object_fid(&obj->oo_dt.do_lu);
	int			 rc = 0;
	ENTRY;

	if (obj->oo_db == NULL)
		RETURN(0);

	rc = osd_object_sa_init(obj, osd);
	if (rc)
		RETURN(rc);

	/* cache attrs in object */
	rc = __osd_object_attr_get(env, osd, obj, &obj->oo_attr);
	if (rc)
		RETURN(rc);

	if (likely(!fid_is_acct(fid)))
		/* no body operations for accounting objects */
		obj->oo_dt.do_body_ops = &osd_body_ops;

	/*
	 * initialize object before marking it existing: the mode bits must
	 * be visible before LOHA_EXISTS is observed by other threads
	 */
	obj->oo_dt.do_lu.lo_header->loh_attr |= obj->oo_attr.la_mode & S_IFMT;

	smp_mb();
	obj->oo_dt.do_lu.lo_header->loh_attr |= LOHA_EXISTS;

	RETURN(0);
}
/*
 * Verify the object's LMA (Lustre Metadata Attributes) xattr carries no
 * incompatible feature flags this code does not understand.
 *
 * \retval 0 when the LMA is compatible or simply not set yet (-ENODATA),
 *	   -EOPNOTSUPP on unsupported incompat flags, other negative
 *	   errno on xattr read failure.
 */
static int osd_check_lma(const struct lu_env *env, struct osd_object *obj)
{
	struct osd_thread_info	*info = osd_oti_get(env);
	struct lu_buf		 buf;
	int			 rc;
	struct lustre_mdt_attrs	*lma;
	ENTRY;

	/* reuse the per-thread scratch buffer to read the LMA */
	CLASSERT(sizeof(info->oti_buf) >= sizeof(*lma));
	lma = (struct lustre_mdt_attrs *)info->oti_buf;
	buf.lb_buf = lma;
	buf.lb_len = sizeof(info->oti_buf);

	rc = osd_xattr_get(env, &obj->oo_dt, &buf, XATTR_NAME_LMA, BYPASS_CAPA);
	if (rc > 0) {
		rc = 0;
		lustre_lma_swab(lma);
		if (unlikely((lma->lma_incompat & ~LMA_INCOMPAT_SUPP) ||
			     CFS_FAIL_CHECK(OBD_FAIL_OSD_LMA_INCOMPAT))) {
			CWARN("%s: unsupported incompat LMA feature(s) %#x for "
			      "fid = "DFID"\n", osd_obj2dev(obj)->od_svname,
			      lma->lma_incompat & ~LMA_INCOMPAT_SUPP,
			      PFID(lu_object_fid(&obj->oo_dt.do_lu)));
			rc = -EOPNOTSUPP;
		}
	} else if (rc == -ENODATA) {
		/* haven't initialize LMA xattr */
		rc = 0;
	}

	RETURN(rc);
}
/*
 * Concurrency: no concurrent access is possible that early in object
 * life-cycle.
 *
 * Look the object's FID up in the OI, hold its dnode and complete
 * initialization; -ENOENT (object not created yet) is not an error.
 */
static int osd_object_init(const struct lu_env *env, struct lu_object *l,
			   const struct lu_object_conf *conf)
{
	struct osd_object	*obj = osd_obj(l);
	struct osd_device	*osd = osd_obj2dev(obj);
	uint64_t		 oid;
	int			 rc;
	ENTRY;

	LASSERT(osd_invariant(obj));

	if (fid_is_otable_it(&l->lo_header->loh_fid)) {
		/* otable iterator is virtual: no on-disk backing object */
		obj->oo_dt.do_ops = &osd_obj_otable_it_ops;
		l->lo_header->loh_attr |= LOHA_EXISTS;
		RETURN(0);
	}

	rc = osd_fid_lookup(env, osd, lu_object_fid(l), &oid);
	if (rc == 0) {
		LASSERT(obj->oo_db == NULL);
		rc = __osd_obj2dbuf(env, osd->od_os, oid, &obj->oo_db);
		if (rc != 0) {
			CERROR("%s: lookup "DFID"/"LPX64" failed: rc = %d\n",
			       osd->od_svname, PFID(lu_object_fid(l)), oid, rc);
			GOTO(out, rc);
		}
		rc = osd_object_init0(env, obj);
		if (rc != 0)
			GOTO(out, rc);

		rc = osd_check_lma(env, obj);
		if (rc != 0)
			GOTO(out, rc);
	} else if (rc == -ENOENT) {
		/* object does not exist on disk yet */
		rc = 0;
	}
	LASSERT(osd_invariant(obj));
out:
	RETURN(rc);
}
/*
 * Concurrency: no concurrent access is possible that late in object
 * life-cycle.
 */
static void osd_object_free(const struct lu_env *env, struct lu_object *l)
{
	struct osd_object *obj = osd_obj(l);

	LASSERT(osd_invariant(obj));

	dt_object_fini(&obj->oo_dt);
	OBD_SLAB_FREE_PTR(obj, osd_object_kmem);
}
/*
 * Declare the transaction reservations needed to destroy @obj: the
 * object itself, its xattr directory ZAP (if any) and every xattr
 * object referenced from that ZAP.  Errors are propagated through
 * tx->tx_err so dmu_tx_assign() fails later.
 */
static void __osd_declare_object_destroy(const struct lu_env *env,
					 struct osd_object *obj,
					 struct osd_thandle *oh)
{
	struct osd_device	*osd = osd_obj2dev(obj);
	dmu_buf_t		*db = obj->oo_db;
	zap_attribute_t		*za = &osd_oti_get(env)->oti_za;
	uint64_t		 oid = db->db_object, xid;
	dmu_tx_t		*tx = oh->ot_tx;
	zap_cursor_t		*zc;
	int			 rc = 0;

	dmu_tx_hold_free(tx, oid, 0, DMU_OBJECT_END);

	/* zap holding xattrs */
	if (obj->oo_xattr != ZFS_NO_OBJECT) {
		/* from here on, @oid refers to the xattr ZAP */
		oid = obj->oo_xattr;

		dmu_tx_hold_free(tx, oid, 0, DMU_OBJECT_END);

		rc = osd_zap_cursor_init(&zc, osd->od_os, oid, 0);
		if (rc)
			goto out;

		while ((rc = -zap_cursor_retrieve(zc, za)) == 0) {
			BUG_ON(za->za_integer_length != sizeof(uint64_t));
			BUG_ON(za->za_num_integers != 1);

			rc = -zap_lookup(osd->od_os, obj->oo_xattr, za->za_name,
					 sizeof(uint64_t), 1, &xid);
			if (rc) {
				CERROR("%s: xattr lookup failed: rc = %d\n",
				       osd->od_svname, rc);
				break;
			}
			/* one reservation per xattr object to be freed */
			dmu_tx_hold_free(tx, xid, 0, DMU_OBJECT_END);

			zap_cursor_advance(zc);
		}
		if (rc == -ENOENT)
			rc = 0;

		osd_zap_cursor_fini(zc);
	}
out:
	/* tx_err is positive by DMU convention, hence the negation */
	if (rc && tx->tx_err == 0)
		tx->tx_err = -rc;
}
/*
 * Declare everything object destruction will touch: the object (and its
 * xattrs), the FID->dnode mapping ZAP entry, both inode accounting ZAPs
 * and the quota changes (one inode released, data truncated).
 */
static int osd_declare_object_destroy(const struct lu_env *env,
				      struct dt_object *dt,
				      struct thandle *th)
{
	char			*buf = osd_oti_get(env)->oti_str;
	const struct lu_fid	*fid = lu_object_fid(&dt->do_lu);
	struct osd_object	*obj = osd_dt_obj(dt);
	struct osd_device	*osd = osd_obj2dev(obj);
	struct osd_thandle	*oh;
	uint64_t		 zapid;
	int			 rc;
	ENTRY;

	LASSERT(th != NULL);
	LASSERT(dt_object_exists(dt));

	oh = container_of0(th, struct osd_thandle, ot_super);
	LASSERT(oh->ot_tx != NULL);

	/* declare that we'll destroy the object */
	__osd_declare_object_destroy(env, obj, oh);

	/* declare that we'll remove object from fid-dnode mapping */
	zapid = osd_get_name_n_idx(env, osd, fid, buf);
	dmu_tx_hold_bonus(oh->ot_tx, zapid);
	dmu_tx_hold_zap(oh->ot_tx, zapid, 0, buf);

	/* declare that we'll remove object from inode accounting ZAPs */
	dmu_tx_hold_bonus(oh->ot_tx, osd->od_iusr_oid);
	dmu_tx_hold_zap(oh->ot_tx, osd->od_iusr_oid, 0, buf);
	dmu_tx_hold_bonus(oh->ot_tx, osd->od_igrp_oid);
	dmu_tx_hold_zap(oh->ot_tx, osd->od_igrp_oid, 0, buf);

	/* one less inode */
	rc = osd_declare_quota(env, osd, obj->oo_attr.la_uid,
			       obj->oo_attr.la_gid, -1, oh, false, NULL, false);
	if (rc)
		RETURN(rc);

	/* data to be truncated */
	rc = osd_declare_quota(env, osd, obj->oo_attr.la_uid,
			       obj->oo_attr.la_gid, 0, oh, true, NULL, false);
	RETURN(rc);
}
/*
 * Delete a DMU object
 *
 * The transaction passed to this routine must have
 * dmu_tx_hold_free(tx, oid, 0, DMU_OBJECT_END) called
 * and then assigned to a transaction group.
 *
 * All xattr objects referenced from the object's xattr ZAP are freed
 * first, then the ZAP itself, then the object.  Per-xattr failures are
 * logged but do not abort the walk.
 */
static int __osd_object_destroy(const struct lu_env *env,
				struct osd_object *obj,
				dmu_tx_t *tx, void *tag)
{
	struct osd_device	*osd = osd_obj2dev(obj);
	zap_attribute_t		*za = &osd_oti_get(env)->oti_za;
	zap_cursor_t		*zc;
	uint64_t		 xid;
	int			 rc;

	/* Assert that the transaction has been assigned to a
	   transaction group. */
	LASSERT(tx->tx_txg != 0);

	/* zap holding xattrs */
	if (obj->oo_xattr != ZFS_NO_OBJECT) {
		rc = osd_zap_cursor_init(&zc, osd->od_os, obj->oo_xattr, 0);
		if (rc)
			return rc;

		while ((rc = -zap_cursor_retrieve(zc, za)) == 0) {
			BUG_ON(za->za_integer_length != sizeof(uint64_t));
			BUG_ON(za->za_num_integers != 1);

			rc = -zap_lookup(osd->od_os, obj->oo_xattr, za->za_name,
					 sizeof(uint64_t), 1, &xid);
			if (rc) {
				CERROR("%s: lookup xattr %s failed: rc = %d\n",
				       osd->od_svname, za->za_name, rc);
			} else {
				rc = -dmu_object_free(osd->od_os, xid, tx);
				if (rc)
					CERROR("%s: fetch xattr %s failed: rc = %d\n",
					       osd->od_svname, za->za_name, rc);
			}
			zap_cursor_advance(zc);
		}
		osd_zap_cursor_fini(zc);

		rc = -dmu_object_free(osd->od_os, obj->oo_xattr, tx);
		if (rc)
			CERROR("%s: freeing xattr failed: rc = %d\n",
			       osd->od_svname, rc);
	}

	return -dmu_object_free(osd->od_os, obj->oo_db->db_object, tx);
}
/*
 * Destroy the object: drop its FID mapping, decrement both inode
 * accounting ZAPs (non-fatal on failure, only logged) and free the
 * backing DMU object(s); finally mark the lu_object dead in the cache.
 */
static int osd_object_destroy(const struct lu_env *env,
			      struct dt_object *dt,
			      struct thandle *th)
{
	char			*buf = osd_oti_get(env)->oti_str;
	struct osd_object	*obj = osd_dt_obj(dt);
	struct osd_device	*osd = osd_obj2dev(obj);
	const struct lu_fid	*fid = lu_object_fid(&dt->do_lu);
	struct osd_thandle	*oh;
	uint64_t		 zapid;
	int			 rc;
	ENTRY;

	LASSERT(obj->oo_db != NULL);
	LASSERT(dt_object_exists(dt));
	LASSERT(!lu_object_is_dying(dt->do_lu.lo_header));

	oh = container_of0(th, struct osd_thandle, ot_super);
	LASSERT(oh->ot_tx != NULL);

	zapid = osd_get_name_n_idx(env, osd, fid, buf);

	/* remove obj ref from index dir (it depends) */
	rc = -zap_remove(osd->od_os, zapid, buf, oh->ot_tx);
	if (rc) {
		CERROR("%s: zap_remove() failed: rc = %d\n",
		       osd->od_svname, rc);
		GOTO(out, rc);
	}

	/* Remove object from inode accounting. It is not fatal for the destroy
	 * operation if something goes wrong while updating accounting, but we
	 * still log an error message to notify the administrator */
	rc = -zap_increment_int(osd->od_os, osd->od_iusr_oid,
				obj->oo_attr.la_uid, -1, oh->ot_tx);
	if (rc)
		CERROR("%s: failed to remove "DFID" from accounting ZAP for usr"
		       " %d: rc = %d\n", osd->od_svname, PFID(fid),
		       obj->oo_attr.la_uid, rc);
	rc = -zap_increment_int(osd->od_os, osd->od_igrp_oid,
				obj->oo_attr.la_gid, -1, oh->ot_tx);
	if (rc)
		CERROR("%s: failed to remove "DFID" from accounting ZAP for grp"
		       " %d: rc = %d\n", osd->od_svname, PFID(fid),
		       obj->oo_attr.la_gid, rc);

	/* kill the object proper */
	rc = __osd_object_destroy(env, obj, oh->ot_tx, osd_obj_tag);
	if (rc)
		CERROR("%s: __osd_object_destroy() failed: rc = %d\n",
		       osd->od_svname, rc);

out:
	/* not needed in the cache anymore */
	set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags);

	RETURN(rc);
}
/*
 * Release the object's on-disk state: SA handle, cached SA-xattr nvlist
 * and the dnode dbuf hold; also unlink it from any dirty list.
 */
static void osd_object_delete(const struct lu_env *env, struct lu_object *l)
{
	struct osd_object *obj = osd_obj(l);

	if (obj->oo_db != NULL) {
		osd_object_sa_fini(obj);
		if (obj->oo_sa_xattr) {
			nvlist_free(obj->oo_sa_xattr);
			obj->oo_sa_xattr = NULL;
		}
		sa_buf_rele(obj->oo_db, osd_obj_tag);
		list_del(&obj->oo_sa_linkage);
		obj->oo_db = NULL;
	}
}
/*
 * Concurrency: ->loo_object_release() is called under site spin-lock.
 *
 * Nothing to do here for osd-zfs.
 */
static void osd_object_release(const struct lu_env *env,
			       struct lu_object *l)
{
}
676 static int osd_object_print(const struct lu_env *env, void *cookie,
677 lu_printer_t p, const struct lu_object *l)
679 struct osd_object *o = osd_obj(l);
681 return (*p)(env, cookie, LUSTRE_OSD_ZFS_NAME"-object@%p", o);
/* Take the object's rw-semaphore for reading; @role is unused here. */
static void osd_object_read_lock(const struct lu_env *env,
				 struct dt_object *dt, unsigned role)
{
	struct osd_object *obj = osd_dt_obj(dt);

	LASSERT(osd_invariant(obj));
	down_read(&obj->oo_sem);
}
/* Take the object's rw-semaphore for writing; @role is unused here. */
static void osd_object_write_lock(const struct lu_env *env,
				  struct dt_object *dt, unsigned role)
{
	struct osd_object *obj = osd_dt_obj(dt);

	LASSERT(osd_invariant(obj));
	down_write(&obj->oo_sem);
}
/* Release the object's rw-semaphore taken for reading. */
static void osd_object_read_unlock(const struct lu_env *env,
				   struct dt_object *dt)
{
	struct osd_object *obj = osd_dt_obj(dt);

	LASSERT(osd_invariant(obj));
	up_read(&obj->oo_sem);
}
/* Release the object's rw-semaphore taken for writing. */
static void osd_object_write_unlock(const struct lu_env *env,
				    struct dt_object *dt)
{
	struct osd_object *obj = osd_dt_obj(dt);

	LASSERT(osd_invariant(obj));
	up_write(&obj->oo_sem);
}
/*
 * Probe whether the object is currently write-locked: if the trylock
 * succeeds nobody held it (drop it again and report 0), otherwise
 * report it as locked (1).
 */
static int osd_object_write_locked(const struct lu_env *env,
				   struct dt_object *dt)
{
	struct osd_object *obj = osd_dt_obj(dt);
	int rc = 1;

	LASSERT(osd_invariant(obj));

	if (down_write_trylock(&obj->oo_sem)) {
		rc = 0;
		up_write(&obj->oo_sem);
	}
	return rc;
}
/*
 * Return the cached attributes (oo_attr) plus live block/blksize data
 * queried from the DMU; directory sizes are synthesized from the block
 * count since ZAP object sizes are not under our control.
 */
static int osd_attr_get(const struct lu_env *env,
			struct dt_object *dt,
			struct lu_attr *attr,
			struct lustre_capa *capa)
{
	struct osd_object	*obj = osd_dt_obj(dt);
	uint64_t		 blocks;
	uint32_t		 blksize;

	LASSERT(dt_object_exists(dt));
	LASSERT(osd_invariant(obj));

	read_lock(&obj->oo_attr_lock);
	*attr = obj->oo_attr;
	read_unlock(&obj->oo_attr_lock);

	/* with ZFS_DEBUG zrl_add_debug() called by DB_DNODE_ENTER()
	 * from within sa_object_size() can block on a mutex, so
	 * we can't call sa_object_size() holding rwlock */
	sa_object_size(obj->oo_sa_hdl, &blksize, &blocks);
	/* we do not control size of indices, so always calculate
	 * it from number of blocks reported by DMU */
	if (S_ISDIR(attr->la_mode))
		attr->la_size = 512 * blocks;
	/* Block size may be not set; suggest maximal I/O transfers. */
	if (blksize == 0)
		blksize = 1ULL << SPA_MAXBLOCKSHIFT;

	attr->la_blksize = blksize;
	attr->la_blocks = blocks;
	attr->la_valid |= LA_BLOCKS | LA_BLKSIZE;

	return 0;
}
773 /* Simple wrapper on top of qsd API which implement quota transfer for osd
774 * setattr needs. As a reminder, only the root user can change ownership of
775 * a file, that's why EDQUOT & EINPROGRESS errors are discarded */
776 static inline int qsd_transfer(const struct lu_env *env,
777 struct qsd_instance *qsd,
778 struct lquota_trans *trans, int qtype,
779 __u64 orig_id, __u64 new_id, __u64 bspace,
780 struct lquota_id_info *qi)
784 if (unlikely(qsd == NULL))
787 LASSERT(qtype >= 0 && qtype < MAXQUOTAS);
788 qi->lqi_type = qtype;
790 /* inode accounting */
791 qi->lqi_is_blk = false;
793 /* one more inode for the new owner ... */
794 qi->lqi_id.qid_uid = new_id;
796 rc = qsd_op_begin(env, qsd, trans, qi, NULL);
797 if (rc == -EDQUOT || rc == -EINPROGRESS)
802 /* and one less inode for the current id */
803 qi->lqi_id.qid_uid = orig_id;;
805 /* can't get EDQUOT when reducing usage */
806 rc = qsd_op_begin(env, qsd, trans, qi, NULL);
807 if (rc == -EINPROGRESS)
812 /* block accounting */
813 qi->lqi_is_blk = true;
815 /* more blocks for the new owner ... */
816 qi->lqi_id.qid_uid = new_id;
817 qi->lqi_space = bspace;
818 rc = qsd_op_begin(env, qsd, trans, qi, NULL);
819 if (rc == -EDQUOT || rc == -EINPROGRESS)
824 /* and finally less blocks for the current owner */
825 qi->lqi_id.qid_uid = orig_id;
826 qi->lqi_space = -bspace;
827 rc = qsd_op_begin(env, qsd, trans, qi, NULL);
828 /* can't get EDQUOT when reducing usage */
829 if (rc == -EINPROGRESS)
/*
 * Declare an attribute update: reserve the SA write plus, on ownership
 * change, the accounting ZAP updates and the quota transfer between the
 * old and new uid/gid.
 */
static int osd_declare_attr_set(const struct lu_env *env,
				struct dt_object *dt,
				const struct lu_attr *attr,
				struct thandle *handle)
{
	struct osd_thread_info	*info = osd_oti_get(env);
	char			*buf = osd_oti_get(env)->oti_str;
	struct osd_object	*obj = osd_dt_obj(dt);
	struct osd_device	*osd = osd_obj2dev(obj);
	struct osd_thandle	*oh;
	uint64_t		 bspace;
	uint32_t		 blksize;
	int			 rc;
	ENTRY;

	if (!dt_object_exists(dt)) {
		/* XXX: sanity check that object creation is declared */
		RETURN(0);
	}

	LASSERT(handle != NULL);
	LASSERT(osd_invariant(obj));

	oh = container_of0(handle, struct osd_thandle, ot_super);

	LASSERT(obj->oo_sa_hdl != NULL);
	LASSERT(oh->ot_tx != NULL);
	dmu_tx_hold_sa(oh->ot_tx, obj->oo_sa_hdl, 0);
	if (oh->ot_tx->tx_err != 0)
		RETURN(-oh->ot_tx->tx_err);

	/* estimate the object's current space usage for quota transfer */
	sa_object_size(obj->oo_sa_hdl, &blksize, &bspace);
	bspace = toqb(bspace * blksize);

	if (attr && attr->la_valid & LA_UID) {
		/* account for user inode tracking ZAP update */
		dmu_tx_hold_bonus(oh->ot_tx, osd->od_iusr_oid);
		dmu_tx_hold_zap(oh->ot_tx, osd->od_iusr_oid, TRUE, buf);

		/* quota enforcement for user */
		if (attr->la_uid != obj->oo_attr.la_uid) {
			rc = qsd_transfer(env, osd->od_quota_slave,
					  &oh->ot_quota_trans, USRQUOTA,
					  obj->oo_attr.la_uid, attr->la_uid,
					  bspace, &info->oti_qi);
			if (rc)
				RETURN(rc);
		}
	}
	if (attr && attr->la_valid & LA_GID) {
		/* account for group inode tracking ZAP update */
		dmu_tx_hold_bonus(oh->ot_tx, osd->od_igrp_oid);
		dmu_tx_hold_zap(oh->ot_tx, osd->od_igrp_oid, TRUE, buf);

		/* quota enforcement for group */
		if (attr->la_gid != obj->oo_attr.la_gid) {
			rc = qsd_transfer(env, osd->od_quota_slave,
					  &oh->ot_quota_trans, GRPQUOTA,
					  obj->oo_attr.la_gid, attr->la_gid,
					  bspace, &info->oti_qi);
			if (rc)
				RETURN(rc);
		}
	}

	RETURN(0);
}
903 * Set the attributes of an object
905 * The transaction passed to this routine must have
906 * dmu_tx_hold_bonus(tx, oid) called and then assigned
907 * to a transaction group.
909 static int osd_attr_set(const struct lu_env *env, struct dt_object *dt,
910 const struct lu_attr *la, struct thandle *handle,
911 struct lustre_capa *capa)
913 struct osd_object *obj = osd_dt_obj(dt);
914 struct osd_device *osd = osd_obj2dev(obj);
915 struct osd_thandle *oh;
916 struct osa_attr *osa = &osd_oti_get(env)->oti_osa;
917 sa_bulk_attr_t *bulk;
918 __u64 valid = la->la_valid;
923 LASSERT(handle != NULL);
924 LASSERT(dt_object_exists(dt));
925 LASSERT(osd_invariant(obj));
926 LASSERT(obj->oo_sa_hdl);
928 oh = container_of0(handle, struct osd_thandle, ot_super);
929 /* Assert that the transaction has been assigned to a
930 transaction group. */
931 LASSERT(oh->ot_tx->tx_txg != 0);
933 /* Only allow set size for regular file */
934 if (!S_ISREG(dt->do_lu.lo_header->loh_attr))
935 valid &= ~(LA_SIZE | LA_BLOCKS);
940 OBD_ALLOC(bulk, sizeof(sa_bulk_attr_t) * 10);
944 /* do both accounting updates outside oo_attr_lock below */
945 if ((valid & LA_UID) && (la->la_uid != obj->oo_attr.la_uid)) {
946 /* Update user accounting. Failure isn't fatal, but we still
947 * log an error message */
948 rc = -zap_increment_int(osd->od_os, osd->od_iusr_oid,
949 la->la_uid, 1, oh->ot_tx);
951 CERROR("%s: failed to update accounting ZAP for user "
952 "%d (%d)\n", osd->od_svname, la->la_uid, rc);
953 rc = -zap_increment_int(osd->od_os, osd->od_iusr_oid,
954 obj->oo_attr.la_uid, -1, oh->ot_tx);
956 CERROR("%s: failed to update accounting ZAP for user "
957 "%d (%d)\n", osd->od_svname,
958 obj->oo_attr.la_uid, rc);
960 if ((valid & LA_GID) && (la->la_gid != obj->oo_attr.la_gid)) {
961 /* Update group accounting. Failure isn't fatal, but we still
962 * log an error message */
963 rc = -zap_increment_int(osd->od_os, osd->od_igrp_oid,
964 la->la_gid, 1, oh->ot_tx);
966 CERROR("%s: failed to update accounting ZAP for user "
967 "%d (%d)\n", osd->od_svname, la->la_gid, rc);
968 rc = -zap_increment_int(osd->od_os, osd->od_igrp_oid,
969 obj->oo_attr.la_gid, -1, oh->ot_tx);
971 CERROR("%s: failed to update accounting ZAP for user "
972 "%d (%d)\n", osd->od_svname,
973 obj->oo_attr.la_gid, rc);
976 write_lock(&obj->oo_attr_lock);
978 if (valid & LA_ATIME) {
979 osa->atime[0] = obj->oo_attr.la_atime = la->la_atime;
980 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_ATIME(osd), NULL,
983 if (valid & LA_MTIME) {
984 osa->mtime[0] = obj->oo_attr.la_mtime = la->la_mtime;
985 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_MTIME(osd), NULL,
988 if (valid & LA_CTIME) {
989 osa->ctime[0] = obj->oo_attr.la_ctime = la->la_ctime;
990 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_CTIME(osd), NULL,
993 if (valid & LA_MODE) {
994 /* mode is stored along with type, so read it first */
995 obj->oo_attr.la_mode = (obj->oo_attr.la_mode & S_IFMT) |
996 (la->la_mode & ~S_IFMT);
997 osa->mode = obj->oo_attr.la_mode;
998 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_MODE(osd), NULL,
1001 if (valid & LA_SIZE) {
1002 osa->size = obj->oo_attr.la_size = la->la_size;
1003 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_SIZE(osd), NULL,
1006 if (valid & LA_NLINK) {
1007 osa->nlink = obj->oo_attr.la_nlink = la->la_nlink;
1008 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_LINKS(osd), NULL,
1011 if (valid & LA_RDEV) {
1012 osa->rdev = obj->oo_attr.la_rdev = la->la_rdev;
1013 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_RDEV(osd), NULL,
1016 if (valid & LA_FLAGS) {
1017 osa->flags = attrs_fs2zfs(la->la_flags);
1018 /* many flags are not supported by zfs, so ensure a good cached
1020 obj->oo_attr.la_flags = attrs_zfs2fs(osa->flags);
1021 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_FLAGS(osd), NULL,
1024 if (valid & LA_UID) {
1025 osa->uid = obj->oo_attr.la_uid = la->la_uid;
1026 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_UID(osd), NULL,
1029 if (valid & LA_GID) {
1030 osa->gid = obj->oo_attr.la_gid = la->la_gid;
1031 SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_GID(osd), NULL,
1034 obj->oo_attr.la_valid |= valid;
1035 write_unlock(&obj->oo_attr_lock);
1037 rc = osd_object_sa_bulk_update(obj, bulk, cnt, oh);
1039 OBD_FREE(bulk, sizeof(sa_bulk_attr_t) * 10);
/*
 * XXX temporary solution.
 *
 * Initialize the allocation hint for creation of @child under @parent;
 * only parent and mode are recorded for now.
 */
static void osd_ah_init(const struct lu_env *env, struct dt_allocation_hint *ah,
			struct dt_object *parent, struct dt_object *child,
			umode_t child_mode)
{
	LASSERT(ah);

	memset(ah, 0, sizeof(*ah));
	ah->dah_parent = parent;
	ah->dah_mode = child_mode;
}
/*
 * Declare object creation: reserve a new dnode (plain object or ZAP
 * depending on @dof), the FID mapping entry, both inode accounting ZAP
 * updates, the base SA area, the initial LMA xattr and one quota inode.
 */
static int osd_declare_object_create(const struct lu_env *env,
				     struct dt_object *dt,
				     struct lu_attr *attr,
				     struct dt_allocation_hint *hint,
				     struct dt_object_format *dof,
				     struct thandle *handle)
{
	char			*buf = osd_oti_get(env)->oti_str;
	const struct lu_fid	*fid = lu_object_fid(&dt->do_lu);
	struct osd_object	*obj = osd_dt_obj(dt);
	struct osd_device	*osd = osd_obj2dev(obj);
	struct osd_thandle	*oh;
	uint64_t		 zapid;
	int			 rc;
	ENTRY;

	switch (dof->dof_type) {
	case DFT_REGULAR:
	case DFT_SYM:
	case DFT_NODE:
		if (obj->oo_dt.do_body_ops == NULL)
			obj->oo_dt.do_body_ops = &osd_body_ops;
		break;
	default:
		break;
	}

	LASSERT(handle != NULL);
	oh = container_of0(handle, struct osd_thandle, ot_super);
	LASSERT(oh->ot_tx != NULL);

	switch (dof->dof_type) {
	case DFT_DIR:
		dt->do_index_ops = &osd_dir_ops;
		/* fallthrough: directories are ZAPs too */
	case DFT_INDEX:
		/* for zap create */
		dmu_tx_hold_zap(oh->ot_tx, DMU_NEW_OBJECT, 1, NULL);
		break;
	case DFT_REGULAR:
	case DFT_SYM:
	case DFT_NODE:
		/* first, we'll create new object */
		dmu_tx_hold_bonus(oh->ot_tx, DMU_NEW_OBJECT);
		break;
	default:
		LBUG();
		break;
	}

	/* and we'll add it to some mapping */
	zapid = osd_get_name_n_idx(env, osd, fid, buf);
	dmu_tx_hold_bonus(oh->ot_tx, zapid);
	dmu_tx_hold_zap(oh->ot_tx, zapid, TRUE, buf);

	/* we will also update inode accounting ZAPs */
	dmu_tx_hold_bonus(oh->ot_tx, osd->od_iusr_oid);
	dmu_tx_hold_zap(oh->ot_tx, osd->od_iusr_oid, TRUE, buf);
	dmu_tx_hold_bonus(oh->ot_tx, osd->od_igrp_oid);
	dmu_tx_hold_zap(oh->ot_tx, osd->od_igrp_oid, TRUE, buf);

	dmu_tx_hold_sa_create(oh->ot_tx, ZFS_SA_BASE_ATTR_SIZE);

	__osd_xattr_declare_set(env, obj, sizeof(struct lustre_mdt_attrs),
				XATTR_NAME_LMA, oh);

	rc = osd_declare_quota(env, osd, attr->la_uid, attr->la_gid, 1, oh,
			       false, NULL, false);
	RETURN(rc);
}
/*
 * Install the full set of ZPL SA attributes on a freshly created DMU
 * object @oid, taking the values from @la (creation time and generation
 * come from the current time / transaction group).
 */
int __osd_attr_init(const struct lu_env *env, struct osd_device *osd,
		    uint64_t oid, dmu_tx_t *tx, struct lu_attr *la,
		    uint64_t parent)
{
	sa_bulk_attr_t	*bulk;
	sa_handle_t	*sa_hdl;
	struct osa_attr	*osa = &osd_oti_get(env)->oti_osa;
	uint64_t	 gen;
	uint64_t	 crtime[2];
	timestruc_t	 now;
	int		 cnt;
	int		 rc;

	gethrestime(&now);
	gen = dmu_tx_get_txg(tx);

	ZFS_TIME_ENCODE(&now, crtime);

	osa->atime[0] = la->la_atime;
	osa->ctime[0] = la->la_ctime;
	osa->mtime[0] = la->la_mtime;
	osa->mode = la->la_mode;
	osa->uid = la->la_uid;
	osa->gid = la->la_gid;
	osa->rdev = la->la_rdev;
	osa->nlink = la->la_nlink;
	osa->flags = attrs_fs2zfs(la->la_flags);
	osa->size = la->la_size;

	/* Now add in all of the "SA" attributes */
	rc = -sa_handle_get(osd->od_os, oid, NULL, SA_HDL_PRIVATE, &sa_hdl);
	if (rc)
		return rc;

	OBD_ALLOC(bulk, sizeof(sa_bulk_attr_t) * 13);
	if (bulk == NULL) {
		rc = -ENOMEM;
		goto out;
	}
	/*
	 * we need to create all SA below upon object create.
	 *
	 * XXX The attribute order matters since the accounting callback relies
	 * on static offsets (i.e. SA_*_OFFSET, see zfs_space_delta_cb()) to
	 * look up the UID/GID attributes. Moreover, the callback does not seem
	 * to support the spill block.
	 * We define attributes in the same order as SA_*_OFFSET in order to
	 * work around the problem. See ORI-610.
	 */
	cnt = 0;
	SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_MODE(osd), NULL, &osa->mode, 8);
	SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_SIZE(osd), NULL, &osa->size, 8);
	SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_GEN(osd), NULL, &gen, 8);
	SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_UID(osd), NULL, &osa->uid, 8);
	SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_GID(osd), NULL, &osa->gid, 8);
	SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_PARENT(osd), NULL, &parent, 8);
	SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_FLAGS(osd), NULL, &osa->flags, 8);
	SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_ATIME(osd), NULL, osa->atime, 16);
	SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_MTIME(osd), NULL, osa->mtime, 16);
	SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_CTIME(osd), NULL, osa->ctime, 16);
	SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_CRTIME(osd), NULL, crtime, 16);
	SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_LINKS(osd), NULL, &osa->nlink, 8);
	SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_RDEV(osd), NULL, &osa->rdev, 8);

	rc = -sa_replace_all_by_template(sa_hdl, bulk, cnt, tx);

	OBD_FREE(bulk, sizeof(sa_bulk_attr_t) * 13);
out:
	sa_handle_destroy(sa_hdl);

	return rc;
}
/*
 * The transaction passed to this routine must have
 * dmu_tx_hold_bonus(tx, DMU_NEW_OBJECT) called and then assigned
 * to a transaction group.
 *
 * Allocates a plain DMU object with an SA bonus, holds its dbuf in
 * *dbp and initializes its SA attributes from @la.
 */
int __osd_object_create(const struct lu_env *env, struct osd_device *osd,
			dmu_buf_t **dbp, dmu_tx_t *tx, struct lu_attr *la,
			uint64_t parent)
{
	uint64_t oid;
	int	 rc;

	/* Assert that the transaction has been assigned to a
	   transaction group. */
	LASSERT(tx->tx_txg != 0);

	/* Create a new DMU object. */
	oid = dmu_object_alloc(osd->od_os, DMU_OT_PLAIN_FILE_CONTENTS, 0,
			       DMU_OT_SA, DN_MAX_BONUSLEN, tx);
	rc = -sa_buf_hold(osd->od_os, oid, osd_obj_tag, dbp);
	LASSERTF(rc == 0, "sa_buf_hold "LPU64" failed: %d\n", oid, rc);

	LASSERT(la->la_valid & LA_MODE);
	la->la_size = 0;
	la->la_nlink = 1;

	rc = __osd_attr_init(env, osd, oid, tx, la, parent);
	if (rc != 0) {
		/* undo the hold and the allocation on failure */
		sa_buf_rele(*dbp, osd_obj_tag);
		*dbp = NULL;
		dmu_object_free(osd->od_os, oid, tx);
		return rc;
	}

	return 0;
}
/*
 * The transaction passed to this routine must have
 * dmu_tx_hold_zap(tx, DMU_NEW_OBJECT, ...) called and then assigned
 * to a transaction group.
 *
 * Using ZAP_FLAG_HASH64 will force the ZAP to always be a FAT ZAP.
 * This is fine for directories today, because storing the FID in the dirent
 * will also require a FAT ZAP. If there is a new type of micro ZAP created
 * then we might need to re-evaluate the use of this flag and instead do
 * a conversion from the different internal ZAP hash formats being used. */
int __osd_zap_create(const struct lu_env *env, struct osd_device *osd,
		     dmu_buf_t **zap_dbp, dmu_tx_t *tx,
		     struct lu_attr *la, uint64_t parent, zap_flags_t flags)
{
	uint64_t oid;
	int	 rc;

	/* Assert that the transaction has been assigned to a
	   transaction group. */
	LASSERT(tx->tx_txg != 0);

	oid = zap_create_flags(osd->od_os, 0, flags | ZAP_FLAG_HASH64,
			       DMU_OT_DIRECTORY_CONTENTS,
			       14, /* == ZFS fzap_default_block_shift */
			       DN_MAX_INDBLKSHIFT, /* indirect block shift */
			       DMU_OT_SA, DN_MAX_BONUSLEN, tx);

	rc = -sa_buf_hold(osd->od_os, oid, osd_obj_tag, zap_dbp);
	if (rc)
		return rc;

	LASSERT(la->la_valid & LA_MODE);
	la->la_size = 2;
	la->la_nlink = 1;

	return __osd_attr_init(env, osd, oid, tx, la, parent);
}
/* Create an index object (ZAP keyed by binary 64bit keys). */
static dmu_buf_t *osd_mkidx(const struct lu_env *env, struct osd_device *osd,
			    struct lu_attr *la, uint64_t parent,
			    struct osd_thandle *oh)
{
	dmu_buf_t *db;
	int	   rc;

	/* Index file should be created as regular file in order not to confuse
	 * ZPL which could interpret them as directory.
	 * We set ZAP_FLAG_UINT64_KEY to let ZFS know than we are going to use
	 * binary keys */
	LASSERT(S_ISREG(la->la_mode));
	rc = __osd_zap_create(env, osd, &db, oh->ot_tx, la, parent,
			      ZAP_FLAG_UINT64_KEY);
	if (rc)
		return ERR_PTR(rc);
	return db;
}
/*
 * Create a directory object: a plain ZAP (string keys, flags == 0)
 * with a directory mode.  Returns the held dbuf; the rc-check and
 * return lines are not visible in this chunk.
 */
1299 static dmu_buf_t *osd_mkdir(const struct lu_env *env, struct osd_device *osd,
1300 struct lu_attr *la, uint64_t parent,
1301 struct osd_thandle *oh)
1306 LASSERT(S_ISDIR(la->la_mode));
1307 rc = __osd_zap_create(env, osd, &db, oh->ot_tx, la, parent, 0);
/*
 * Create a regular-file object.  On an OST (non-MD device) the block
 * size is bumped to 128KiB for bulk I/O; a blocksize failure is logged
 * but the visible code does not abort the create on it.
 */
1313 static dmu_buf_t* osd_mkreg(const struct lu_env *env, struct osd_device *osd,
1314 struct lu_attr *la, uint64_t parent,
1315 struct osd_thandle *oh)
1320 LASSERT(S_ISREG(la->la_mode));
1321 rc = __osd_object_create(env, osd, &db, oh->ot_tx, la, parent);
1326 * XXX: a hack, OST to use bigger blocksize. we need
1327 * a method in OSD API to control this from OFD/MDD
/* only data (OST) devices get the larger 128KiB blocksize */
1329 if (!lu_device_is_md(osd2lu_dev(osd))) {
1330 rc = -dmu_object_set_blocksize(osd->od_os,
1332 128 << 10, 0, oh->ot_tx);
1334 CERROR("%s: can't change blocksize: %d\n",
1335 osd->od_svname, rc);
/*
 * Create a symlink object: a plain DMU object created via
 * __osd_object_create() with a symlink mode in @la.
 */
1343 static dmu_buf_t *osd_mksym(const struct lu_env *env, struct osd_device *osd,
1344 struct lu_attr *la, uint64_t parent,
1345 struct osd_thandle *oh)
1350 LASSERT(S_ISLNK(la->la_mode));
1351 rc = __osd_object_create(env, osd, &db, oh->ot_tx, la, parent);
/*
 * Create a special-file object (char/block/fifo/socket).  Note this
 * overwrites la_valid: only LA_MODE (plus LA_RDEV for char/block
 * devices) is passed down to attribute initialization.
 */
1357 static dmu_buf_t *osd_mknod(const struct lu_env *env, struct osd_device *osd,
1358 struct lu_attr *la, uint64_t parent,
1359 struct osd_thandle *oh)
1364 la->la_valid = LA_MODE;
1365 if (S_ISCHR(la->la_mode) || S_ISBLK(la->la_mode))
1366 la->la_valid |= LA_RDEV;
1368 rc = __osd_object_create(env, osd, &db, oh->ot_tx, la, parent);
/*
 * Constructor callback type shared by osd_mkidx/osd_mkdir/osd_mkreg/
 * osd_mksym/osd_mknod; selected by osd_create_type_f() below.
 * NOTE(review): the middle parameter lines are missing from this chunk
 * (presumably "struct lu_attr *la, uint64_t parent" per the callees).
 */
1374 typedef dmu_buf_t *(*osd_obj_type_f)(const struct lu_env *env,
1375 struct osd_device *osd,
1378 struct osd_thandle *oh);
/*
 * Map a dt_format_type to the matching constructor callback.
 * NOTE(review): the switch/body is missing from this chunk — only the
 * signature and the "result" declaration are visible.
 */
1380 static osd_obj_type_f osd_create_type_f(enum dt_format_type type)
1382 osd_obj_type_f result;
1408 * Primitives for directory (i.e. ZAP) handling
/*
 * Store the Lustre Metadata Attributes (LMA) xattr carrying @fid on a
 * freshly created object.  The LMA is byte-swapped to on-disk order
 * before being written with LU_XATTR_CREATE (must not already exist).
 * NOTE(review): buf.lb_buf assignment and the final return are not
 * visible in this chunk.
 */
1410 static inline int osd_init_lma(const struct lu_env *env, struct osd_object *obj,
1411 const struct lu_fid *fid, struct osd_thandle *oh)
1413 struct osd_thread_info *info = osd_oti_get(env);
1414 struct lustre_mdt_attrs *lma = &info->oti_mdt_attrs;
1418 lustre_lma_init(lma, fid, 0, 0);
1419 lustre_lma_swab(lma);
1421 buf.lb_len = sizeof(*lma);
1423 rc = osd_xattr_set_internal(env, obj, &buf, XATTR_NAME_LMA,
1424 LU_XATTR_CREATE, oh, BYPASS_CAPA);
1430 * Concurrency: @dt is write locked.
/*
 * Create the on-disk object for @dt inside the already-declared
 * transaction @th:
 *   1. pick the constructor by dof->dof_type and allocate the dnode
 *      (parent dnode taken from @hint when available);
 *   2. insert FID -> dnode into the OI ZAP (zap_add of a zde record);
 *   3. bump per-UID/per-GID inode accounting (failures only logged);
 *   4. initialize the in-core object and write the LMA xattr.
 * Serialized against concurrent declarations by obj->oo_guard.
 * NOTE(review): several lines (error gotos, zapid init, "out:" label,
 * up(&obj->oo_guard), RETURN) are missing from this chunk.
 */
1432 static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
1433 struct lu_attr *attr,
1434 struct dt_allocation_hint *hint,
1435 struct dt_object_format *dof,
1438 struct zpl_direntry *zde = &osd_oti_get(env)->oti_zde.lzd_reg;
1439 const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
1440 struct osd_object *obj = osd_dt_obj(dt);
1441 struct osd_device *osd = osd_obj2dev(obj);
1442 char *buf = osd_oti_get(env)->oti_str;
1443 struct osd_thandle *oh;
1450 /* concurrent create declarations should not see
1451 * the object inconsistent (db, attr, etc).
1452 * in regular cases acquisition should be cheap */
1453 down(&obj->oo_guard);
1455 LASSERT(osd_invariant(obj));
1456 LASSERT(!dt_object_exists(dt));
1457 LASSERT(dof != NULL);
1459 LASSERT(th != NULL);
1460 oh = container_of0(th, struct osd_thandle, ot_super);
1463 * XXX missing: Quota handling.
1466 LASSERT(obj->oo_db == NULL);
1468 /* to follow ZFS on-disk format we need
1469 * to initialize parent dnode properly */
1471 if (hint && hint->dah_parent)
1472 zapid = osd_dt_obj(hint->dah_parent)->oo_db->db_object;
/* dispatch to osd_mkdir/osd_mkreg/osd_mkidx/osd_mksym/osd_mknod */
1474 db = osd_create_type_f(dof->dof_type)(env, osd, attr, zapid, oh);
1476 GOTO(out, rc = PTR_ERR(db));
/* record the new dnode in the object index: FID name -> {dnode, type} */
1479 zde->zde_dnode = db->db_object;
1480 zde->zde_type = IFTODT(attr->la_mode & S_IFMT);
1482 zapid = osd_get_name_n_idx(env, osd, fid, buf);
1484 rc = -zap_add(osd->od_os, zapid, buf, 8, 1, zde, oh->ot_tx);
1488 /* Add new object to inode accounting.
1489 * Errors are not considered as fatal */
1490 rc = -zap_increment_int(osd->od_os, osd->od_iusr_oid,
1491 (attr->la_valid & LA_UID) ? attr->la_uid : 0, 1,
1494 CERROR("%s: failed to add "DFID" to accounting ZAP for usr %d "
1495 "(%d)\n", osd->od_svname, PFID(fid), attr->la_uid, rc);
1496 rc = -zap_increment_int(osd->od_os, osd->od_igrp_oid,
1497 (attr->la_valid & LA_GID) ? attr->la_gid : 0, 1,
1500 CERROR("%s: failed to add "DFID" to accounting ZAP for grp %d "
1501 "(%d)\n", osd->od_svname, PFID(fid), attr->la_gid, rc);
1503 /* configure new osd object */
1505 rc = osd_object_init0(env, obj);
1506 LASSERT(ergo(rc == 0, dt_object_exists(dt)));
1507 LASSERT(osd_invariant(obj));
1509 rc = osd_init_lma(env, obj, fid, oh);
1511 CERROR("%s: can not set LMA on "DFID": rc = %d\n",
1512 osd->od_svname, PFID(fid), rc);
1513 /* ignore errors during LMA initialization */
/*
 * Declare an nlink increment: a link-count change is just an SA
 * attribute update, so reuse the attr_set declaration.
 */
1522 static int osd_declare_object_ref_add(const struct lu_env *env,
1523 struct dt_object *dt,
1526 return osd_declare_attr_set(env, dt, NULL, th);
1530 * Concurrency: @dt is write locked.
/*
 * Increment the object's link count: bump the cached la_nlink under
 * oo_attr_lock, then persist the new value to the ZPL "links" SA
 * attribute within the caller's transaction.
 */
1532 static int osd_object_ref_add(const struct lu_env *env,
1533 struct dt_object *dt,
1534 struct thandle *handle)
1536 struct osd_object *obj = osd_dt_obj(dt);
1537 struct osd_thandle *oh;
1538 struct osd_device *osd = osd_obj2dev(obj);
1544 LASSERT(osd_invariant(obj));
1545 LASSERT(dt_object_exists(dt));
1546 LASSERT(obj->oo_sa_hdl != NULL);
1548 oh = container_of0(handle, struct osd_thandle, ot_super);
/* update the in-core copy atomically w.r.t. concurrent attr readers */
1550 write_lock(&obj->oo_attr_lock);
1551 nlink = ++obj->oo_attr.la_nlink;
1552 write_unlock(&obj->oo_attr_lock);
/* 8 == sizeof(uint64_t): on-disk width of SA_ZPL_LINKS */
1554 rc = osd_object_sa_update(obj, SA_ZPL_LINKS(osd), &nlink, 8, oh);
/*
 * Declare an nlink decrement — same SA update cost as attr_set,
 * mirroring osd_declare_object_ref_add().
 */
1558 static int osd_declare_object_ref_del(const struct lu_env *env,
1559 struct dt_object *dt,
1560 struct thandle *handle)
1562 return osd_declare_attr_set(env, dt, NULL, handle);
1566 * Concurrency: @dt is write locked.
/*
 * Decrement the object's link count; counterpart of osd_object_ref_add.
 * The object must not already be dying (final unlink is handled by the
 * destroy path, not here).
 */
1568 static int osd_object_ref_del(const struct lu_env *env,
1569 struct dt_object *dt,
1570 struct thandle *handle)
1572 struct osd_object *obj = osd_dt_obj(dt);
1573 struct osd_thandle *oh;
1574 struct osd_device *osd = osd_obj2dev(obj);
1580 LASSERT(osd_invariant(obj));
1581 LASSERT(dt_object_exists(dt));
1582 LASSERT(obj->oo_sa_hdl != NULL);
1584 oh = container_of0(handle, struct osd_thandle, ot_super);
1585 LASSERT(!lu_object_is_dying(dt->do_lu.lo_header));
1587 write_lock(&obj->oo_attr_lock);
1588 nlink = --obj->oo_attr.la_nlink;
1589 write_unlock(&obj->oo_attr_lock);
/* persist the decremented count to the ZPL "links" SA attribute */
1591 rc = osd_object_sa_update(obj, SA_ZPL_LINKS(osd), &nlink, 8, oh);
/*
 * Validate a client-supplied capability: reject if expired, find the
 * matching key by keyid among the two rotating keys in @keys, recompute
 * the HMAC and compare against the one the client presented.  A capa
 * that passes is cached via capa_add() to short-circuit future checks.
 * NOTE(review): early-return/cleanup lines between the visible steps
 * are missing from this chunk.
 */
1595 static int capa_is_sane(const struct lu_env *env, struct osd_device *dev,
1596 struct lustre_capa *capa, struct lustre_capa_key *keys)
1598 struct osd_thread_info *oti = osd_oti_get(env);
1599 struct obd_capa *oc;
/* fast path: already validated and cached? */
1603 oc = capa_lookup(dev->od_capa_hash, capa, 0);
1605 if (capa_is_expired(oc)) {
1606 DEBUG_CAPA(D_ERROR, capa, "expired");
/* locate the key whose id the client used; copy it out under capa_lock */
1613 spin_lock(&capa_lock);
1614 for (i = 0; i < 2; i++) {
1615 if (keys[i].lk_keyid == capa->lc_keyid) {
1616 oti->oti_capa_key = keys[i];
1620 spin_unlock(&capa_lock);
1623 DEBUG_CAPA(D_ERROR, capa, "no matched capa key");
/* recompute HMAC with our copy of the key and compare */
1627 rc = capa_hmac(oti->oti_capa.lc_hmac, capa, oti->oti_capa_key.lk_key);
1630 if (memcmp(oti->oti_capa.lc_hmac, capa->lc_hmac, sizeof(capa->lc_hmac)))
1632 DEBUG_CAPA(D_ERROR, capa, "HMAC mismatch");
/* cache the verified capability */
1636 oc = capa_add(dev->od_capa_hash, capa);
/*
 * Authorize operation @opc on @dt using capability @capa.
 * Short-circuits when capabilities are disabled on the device or when
 * the caller passed BYPASS_CAPA; otherwise the capa must be present,
 * match the object's FID, support the opcode, and pass capa_is_sane().
 */
1642 static int osd_object_auth(const struct lu_env *env, struct dt_object *dt,
1643 struct lustre_capa *capa, __u64 opc)
1645 const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
1646 struct osd_device *dev = osd_dev(dt->do_lu.lo_dev);
1649 if (!dev->od_fl_capa)
1652 if (capa == BYPASS_CAPA)
1656 CERROR("no capability is provided for fid "DFID"\n", PFID(fid));
1660 if (!lu_fid_eq(fid, &capa->lc_fid)) {
1661 DEBUG_CAPA(D_ERROR, capa, "fid "DFID" mismatch with",PFID(fid));
1665 if (!capa_opc_supported(capa, opc)) {
1666 DEBUG_CAPA(D_ERROR, capa, "opc "LPX64" not supported by", opc);
1670 if ((rc = capa_is_sane(env, dev, capa, dev->od_capa_keys))) {
1671 DEBUG_CAPA(D_ERROR, capa, "insane (rc %d)", rc);
/*
 * Issue (or renew) a capability for @dt.  If @old is given it is first
 * re-authorized for @opc.  A template capa is built in thread-local
 * storage, looked up in the device hash, and on a miss signed with the
 * current key (od_capa_keys[1]) and inserted.
 * NOTE(review): lc_opc assignment and some early-return lines are not
 * visible in this chunk.
 */
1678 static struct obd_capa *osd_capa_get(const struct lu_env *env,
1679 struct dt_object *dt,
1680 struct lustre_capa *old,
1683 struct osd_thread_info *info = osd_oti_get(env);
1684 const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
1685 struct osd_object *obj = osd_dt_obj(dt);
1686 struct osd_device *dev = osd_obj2dev(obj);
1687 struct lustre_capa_key *key = &info->oti_capa_key;
1688 struct lustre_capa *capa = &info->oti_capa;
1689 struct obd_capa *oc;
1693 if (!dev->od_fl_capa)
1694 RETURN(ERR_PTR(-ENOENT));
1696 LASSERT(dt_object_exists(dt));
1697 LASSERT(osd_invariant(obj));
1699 /* renewal sanity check */
1700 if (old && osd_object_auth(env, dt, old, opc))
1701 RETURN(ERR_PTR(-EACCES));
/* build the template capa in thread-local storage */
1703 capa->lc_fid = *fid;
1706 capa->lc_flags = dev->od_capa_alg << 24;
1707 capa->lc_timeout = dev->od_capa_timeout;
1708 capa->lc_expiry = 0;
/* try the cache first (the "1" asks capa_lookup for a reference) */
1710 oc = capa_lookup(dev->od_capa_hash, capa, 1);
1712 LASSERT(!capa_is_expired(oc));
/* cache miss: sign with the current (newest) key under capa_lock */
1716 spin_lock(&capa_lock);
1717 *key = dev->od_capa_keys[1];
1718 spin_unlock(&capa_lock);
1720 capa->lc_keyid = key->lk_keyid;
1721 capa->lc_expiry = cfs_time_current_sec() + dev->od_capa_timeout;
1723 rc = capa_hmac(capa->lc_hmac, capa, key->lk_key);
1725 DEBUG_CAPA(D_ERROR, capa, "HMAC failed: %d for", rc);
1727 RETURN(ERR_PTR(rc));
1730 oc = capa_add(dev->od_capa_hash, capa);
/*
 * Sync the object's data in [@start, @end] to stable storage.  Without
 * ZIL support the only available mechanism is waiting for the whole
 * pool's current txg to commit (the range arguments are ignored).
 */
1734 static int osd_object_sync(const struct lu_env *env, struct dt_object *dt,
1735 __u64 start, __u64 end)
1737 struct osd_device *osd = osd_obj2dev(osd_dt_obj(dt));
1740 /* XXX: no other option than syncing the whole filesystem until we
1741 * support ZIL. If the object tracked the txg that it was last
1742 * modified in, it could pass that txg here instead of "0". Maybe
1743 * the changes are already committed, so no wait is needed at all? */
1744 txg_wait_synced(dmu_objset_pool(osd->od_os), 0ULL);
/*
 * dt_object_operations vtable for regular osd-zfs objects; wired to
 * lu_object instances by osd_object_init0() (forward-declared at the
 * top of this file).
 */
1749 static struct dt_object_operations osd_obj_ops = {
1750 .do_read_lock = osd_object_read_lock,
1751 .do_write_lock = osd_object_write_lock,
1752 .do_read_unlock = osd_object_read_unlock,
1753 .do_write_unlock = osd_object_write_unlock,
1754 .do_write_locked = osd_object_write_locked,
1755 .do_attr_get = osd_attr_get,
1756 .do_declare_attr_set = osd_declare_attr_set,
1757 .do_attr_set = osd_attr_set,
1758 .do_ah_init = osd_ah_init,
1759 .do_declare_create = osd_declare_object_create,
1760 .do_create = osd_object_create,
1761 .do_declare_destroy = osd_declare_object_destroy,
1762 .do_destroy = osd_object_destroy,
1763 .do_index_try = osd_index_try,
1764 .do_declare_ref_add = osd_declare_object_ref_add,
1765 .do_ref_add = osd_object_ref_add,
1766 .do_declare_ref_del = osd_declare_object_ref_del,
1767 .do_ref_del = osd_object_ref_del,
1768 .do_xattr_get = osd_xattr_get,
1769 .do_declare_xattr_set = osd_declare_xattr_set,
1770 .do_xattr_set = osd_xattr_set,
1771 .do_declare_xattr_del = osd_declare_xattr_del,
1772 .do_xattr_del = osd_xattr_del,
1773 .do_xattr_list = osd_xattr_list,
1774 .do_capa_get = osd_capa_get,
1775 .do_object_sync = osd_object_sync,
/*
 * lu_object_operations vtable: lifecycle hooks (init/delete/release/
 * free) plus debugging helpers (print/invariant) for osd-zfs objects.
 */
1778 static struct lu_object_operations osd_lu_obj_ops = {
1779 .loo_object_init = osd_object_init,
1780 .loo_object_delete = osd_object_delete,
1781 .loo_object_release = osd_object_release,
1782 .loo_object_free = osd_object_free,
1783 .loo_object_print = osd_object_print,
1784 .loo_object_invariant = osd_object_invariant,
/*
 * attr_get stub for the OI-table iterator pseudo-object.
 * NOTE(review): the body (presumably clearing @attr and returning 0) is
 * missing from this chunk — confirm against the full file.
 */
1787 static int osd_otable_it_attr_get(const struct lu_env *env,
1788 struct dt_object *dt,
1789 struct lu_attr *attr,
1790 struct lustre_capa *capa)
1796 static struct dt_object_operations osd_obj_otable_it_ops = {
1797 .do_attr_get = osd_otable_it_attr_get,
1798 .do_index_try = osd_index_try,