4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/osd/osd_handler.c
38 * Top-level entry points into osd module
40 * Author: Nikita Danilov <nikita@clusterfs.com>
41 * Pravin Shelar <pravin.shelar@sun.com> : Added fid in dirent
44 #define DEBUG_SUBSYSTEM S_MDS
46 #include <linux/module.h>
48 /* LUSTRE_VERSION_CODE */
49 #include <lustre_ver.h>
50 /* prerequisite for linux/xattr.h */
51 #include <linux/types.h>
52 /* prerequisite for linux/xattr.h */
54 /* XATTR_{REPLACE,CREATE} */
55 #include <linux/xattr.h>
60 * struct OBD_{ALLOC,FREE}*()
63 #include <obd_support.h>
64 /* struct ptlrpc_thread */
65 #include <lustre_net.h>
66 #include <lustre_fid.h>
68 #include "osd_internal.h"
70 /* llo_* api support */
71 #include <md_object.h>
72 #include <lustre_quota.h>
/* Module parameter: integer flag enabling ldiskfs parallel directory
 * operations (PDO); 0644 makes it runtime-tunable via sysfs. */
75 CFS_MODULE_PARM(ldiskfs_pdo, "i", int, 0644,
76 "ldiskfs with parallel directory operations");
/* Well-known directory entry names and the remote-object directory name. */
78 static const char dot[] = ".";
79 static const char dotdot[] = "..";
80 static const char remote_obj_dir[] = "REM_OBJ_DIR";
/* Forward declarations of operation vectors defined later in this file. */
82 static const struct lu_object_operations osd_lu_obj_ops;
83 static const struct dt_object_operations osd_obj_ops;
84 static const struct dt_object_operations osd_obj_ea_ops;
85 static const struct dt_object_operations osd_obj_otable_it_ops;
86 static const struct dt_index_operations osd_index_iam_ops;
87 static const struct dt_index_operations osd_index_ea_ops;
89 #ifdef OSD_TRACK_DECLARES
/* Map each declared transaction operation to the operation that would roll
 * it back; OSD_OT_MAX marks operations with no rollback counterpart. */
90 int osd_trans_declare_op2rb[] = {
91 [OSD_OT_ATTR_SET] = OSD_OT_ATTR_SET,
92 [OSD_OT_PUNCH] = OSD_OT_MAX,
93 [OSD_OT_XATTR_SET] = OSD_OT_XATTR_SET,
94 [OSD_OT_CREATE] = OSD_OT_DESTROY,
95 [OSD_OT_DESTROY] = OSD_OT_CREATE,
96 [OSD_OT_REF_ADD] = OSD_OT_REF_DEL,
97 [OSD_OT_REF_DEL] = OSD_OT_REF_ADD,
98 [OSD_OT_WRITE] = OSD_OT_WRITE,
99 [OSD_OT_INSERT] = OSD_OT_DELETE,
100 [OSD_OT_DELETE] = OSD_OT_INSERT,
101 [OSD_OT_QUOTA] = OSD_OT_MAX,
/* Non-zero iff the object has index operations installed, i.e. it has been
 * set up as an index/directory. */
105 static int osd_has_index(const struct osd_object *obj)
107 return obj->oo_dt.do_index_ops != NULL;
/* lu_object-level wrapper around osd_invariant(), used by LINVRNT callers. */
110 static int osd_object_invariant(const struct lu_object *l)
112 return osd_invariant(osd_obj(l));
116 * Concurrency: doesn't matter
/* True if the current thread holds at least one OSD read lock
 * (per-thread counter kept in osd_thread_info). */
118 static int osd_read_locked(const struct lu_env *env, struct osd_object *o)
120 return osd_oti_get(env)->oti_r_locks > 0;
124 * Concurrency: doesn't matter
/* True if the current thread holds a write lock and is recorded as the
 * owner of object 'o' (oo_owner is the locking env). */
126 static int osd_write_locked(const struct lu_env *env, struct osd_object *o)
128 struct osd_thread_info *oti = osd_oti_get(env);
129 return oti->oti_w_locks > 0 && o->oo_owner == env;
133 * Concurrency: doesn't access mutable data
/* Fill 'f' with the FID of the OSD filesystem root object. */
135 static int osd_root_get(const struct lu_env *env,
136 struct dt_device *dev, struct lu_fid *f)
138 lu_local_obj_fid(f, OSD_FS_ROOT_OID);
143 * OSD object methods.
147 * Concurrency: no concurrent access is possible that early in object
/* Allocate and minimally initialize a new osd_object: dt/lu operation
 * vectors, rw-semaphores for object and ext-index locking, and the guard
 * spinlock protecting attribute access. */
150 static struct lu_object *osd_object_alloc(const struct lu_env *env,
151 const struct lu_object_header *hdr,
154 struct osd_object *mo;
160 l = &mo->oo_dt.do_lu;
161 dt_object_init(&mo->oo_dt, NULL, d);
162 mo->oo_dt.do_ops = &osd_obj_ea_ops;
163 l->lo_ops = &osd_lu_obj_ops;
164 init_rwsem(&mo->oo_sem);
165 init_rwsem(&mo->oo_ext_idx_sem);
166 spin_lock_init(&mo->oo_guard);
/* Fetch an xattr directly via the inode operations; binds the (per-thread)
 * dentry to 'inode' first because ->getxattr() takes a dentry. */
173 static inline int __osd_xattr_get(struct inode *inode, struct dentry *dentry,
174 const char *name, void *buf, int len)
176 dentry->d_inode = inode;
177 return inode->i_op->getxattr(dentry, name, buf, len);
/* Read the Lustre metadata attribute (LMA) xattr from 'inode' into 'lma'.
 * Falls back to the old (smaller) LMA layout on short reads, rejects
 * unsupported incompat feature bits, and byte-swaps the result. */
180 int osd_get_lma(struct osd_thread_info *info, struct inode *inode,
181 struct dentry *dentry, struct lustre_mdt_attrs *lma)
185 rc = __osd_xattr_get(inode, dentry, XATTR_NAME_LMA, (void *)lma,
188 /* try with old lma size */
189 rc = inode->i_op->getxattr(dentry, XATTR_NAME_LMA,
190 info->oti_mdt_attrs_old,
193 memcpy(lma, info->oti_mdt_attrs_old, sizeof(*lma));
196 /* Check LMA compatibility */
197 if (lma->lma_incompat & ~cpu_to_le32(LMA_INCOMPAT_SUPP)) {
198 CWARN("%.16s: unsupported incompat LMA feature(s) "
200 LDISKFS_SB(inode->i_sb)->s_es->s_volume_name,
201 inode->i_ino, le32_to_cpu(lma->lma_incompat) &
205 lustre_lma_swab(lma);
208 } else if (rc == 0) {
216 * retrieve object from backend ext fs.
/* Look up the inode identified by 'id' (ino + generation) in the backing
 * ldiskfs filesystem. Returns ERR_PTR on failure: -ESTALE for a generation
 * mismatch or a dead (nlink == 0) inode, -ENOENT for a bad inode.
 * On success the inode is flagged S_NOCMTIME so ldiskfs does not update
 * c/mtime behind Lustre's back. */
218 struct inode *osd_iget(struct osd_thread_info *info, struct osd_device *dev,
219 struct osd_inode_id *id)
221 struct inode *inode = NULL;
223 inode = ldiskfs_iget(osd_sb(dev), id->oii_ino);
225 CDEBUG(D_INODE, "no inode: ino = %u, rc = %ld\n",
226 id->oii_ino, PTR_ERR(inode));
227 } else if (id->oii_gen != OSD_OII_NOGEN &&
228 inode->i_generation != id->oii_gen) {
229 CDEBUG(D_INODE, "unmatched inode: ino = %u, gen0 = %u, "
231 id->oii_ino, id->oii_gen, inode->i_generation);
233 inode = ERR_PTR(-ESTALE);
234 } else if (inode->i_nlink == 0) {
235 /* due to parallel readdir and unlink,
236 * we can have dead inode here. */
237 CDEBUG(D_INODE, "stale inode: ino = %u\n", id->oii_ino);
238 make_bad_inode(inode);
240 inode = ERR_PTR(-ESTALE);
241 } else if (is_bad_inode(inode)) {
242 CWARN("%.16s: bad inode: ino = %u\n",
243 LDISKFS_SB(osd_sb(dev))->s_es->s_volume_name, id->oii_ino);
245 inode = ERR_PTR(-ENOENT);
247 if (id->oii_gen == OSD_OII_NOGEN)
248 osd_id_gen(id, inode->i_ino, inode->i_generation);
250 /* Do not update file c/mtime in ldiskfs.
251 * NB: we don't have any lock to protect this because we don't
252 * have reference on osd_object now, but contention with
253 * another lookup + attr_set can't happen in the tiny window
254 * between if (...) and set S_NOCMTIME. */
255 if (!(inode->i_flags & S_NOCMTIME))
256 inode->i_flags |= S_NOCMTIME;
/* Like osd_iget(), but also recover the object's FID: from the LMA xattr
 * when present, otherwise synthesize an IGIF FID from ino/generation
 * (root inode gets the well-known root FID). */
261 static struct inode *
262 osd_iget_fid(struct osd_thread_info *info, struct osd_device *dev,
263 struct osd_inode_id *id, struct lu_fid *fid)
265 struct lustre_mdt_attrs *lma = &info->oti_mdt_attrs;
269 inode = osd_iget(info, dev, id);
273 rc = osd_get_lma(info, inode, &info->oti_obj_dentry, lma);
275 *fid = lma->lma_self_fid;
276 } else if (rc == -ENODATA) {
277 if (unlikely(inode == osd_sb(dev)->s_root->d_inode))
278 lu_local_obj_fid(fid, OSD_FS_ROOT_OID);
280 lu_igif_build(fid, inode->i_ino, inode->i_generation);
/* Like osd_iget(), but verify that the inode's self-FID (from LMA) matches
 * the expected 'fid'; a mismatch returns -EREMCHG so OI scrub can repair
 * the inconsistent OI mapping. */
288 static struct inode *
289 osd_iget_verify(struct osd_thread_info *info, struct osd_device *dev,
290 struct osd_inode_id *id, const struct lu_fid *fid)
292 struct lustre_mdt_attrs *lma = &info->oti_mdt_attrs;
296 inode = osd_iget(info, dev, id);
300 rc = osd_get_lma(info, inode, &info->oti_obj_dentry, lma);
309 if (!lu_fid_eq(fid, &lma->lma_self_fid)) {
310 CDEBUG(D_LFSCK, "inconsistent obj: "DFID", %lu, "DFID"\n",
311 PFID(&lma->lma_self_fid), inode->i_ino, PFID(fid));
313 return ERR_PTR(-EREMCHG);
/* Resolve 'fid' to a backing inode and attach it to 'obj'. Search order:
 * per-thread idmap cache, OI scrub pending list, then the OI files.
 * On FID/LMA mismatch (-EREMCHG) it triggers or waits on OI scrub.
 * For directories with PDO enabled, an htree lock head is also allocated. */
319 static int osd_fid_lookup(const struct lu_env *env, struct osd_object *obj,
320 const struct lu_fid *fid,
321 const struct lu_object_conf *conf)
323 struct osd_thread_info *info;
324 struct lu_device *ldev = obj->oo_dt.do_lu.lo_dev;
325 struct osd_device *dev;
326 struct osd_idmap_cache *oic;
327 struct osd_inode_id *id;
329 struct osd_scrub *scrub;
330 struct scrub_file *sf;
335 LINVRNT(osd_invariant(obj));
336 LASSERT(obj->oo_inode == NULL);
337 LASSERTF(fid_is_sane(fid) || fid_is_idif(fid), DFID, PFID(fid));
340 scrub = &dev->od_scrub;
341 sf = &scrub->os_file;
342 info = osd_oti_get(env);
344 oic = &info->oti_cache;
346 if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT))
349 /* Search order: 1. per-thread cache. */
350 if (lu_fid_eq(fid, &oic->oic_fid)) {
356 if (!cfs_list_empty(&scrub->os_inconsistent_items)) {
357 /* Search order: 2. OI scrub pending list. */
358 result = osd_oii_lookup(dev, fid, id);
363 if (sf->sf_flags & SF_INCONSISTENT)
367 * Objects are created as locking anchors or place holders for objects
368 * yet to be created. No need to osd_oi_lookup() at here because FID
369 * should never be re-used, if it's really a duplicate FID from
370 * unexpected reason, we should be able to detect it later by calling
371 * do_create->osd_oi_insert()
373 if (conf != NULL && conf->loc_flags & LOC_F_NEW)
374 GOTO(out, result = 0);
376 /* Search order: 3. OI files. */
377 result = osd_oi_lookup(info, dev, fid, id, true);
378 if (result == -ENOENT) {
379 if (!fid_is_norm(fid) || fid_is_on_ost(info, dev, fid) ||
380 !ldiskfs_test_bit(osd_oi_fid2idx(dev,fid),
382 GOTO(out, result = 0);
392 inode = osd_iget(info, dev, id);
394 inode = osd_iget_verify(info, dev, id, fid);
396 result = PTR_ERR(inode);
397 if (result == -ENOENT || result == -ESTALE) {
398 fid_zero(&oic->oic_fid);
400 } else if (result == -EREMCHG) {
/* OI mapping disagrees with the inode's LMA: let a running scrub
 * finish, or kick one off unless scrub is administratively off. */
403 if (thread_is_running(&scrub->os_thread)) {
404 result = -EINPROGRESS;
405 } else if (!dev->od_noscrub) {
406 result = osd_scrub_start(dev);
407 LCONSOLE_ERROR("%.16s: trigger OI scrub by RPC "
408 "for "DFID", rc = %d [1]\n",
409 LDISKFS_SB(osd_sb(dev))->s_es->\
410 s_volume_name,PFID(fid), result);
411 if (result == 0 || result == -EALREADY)
412 result = -EINPROGRESS;
421 obj->oo_inode = inode;
422 LASSERT(obj->oo_inode->i_sb == osd_sb(dev));
424 obj->oo_compat_dot_created = 1;
425 obj->oo_compat_dotdot_created = 1;
427 if (!S_ISDIR(inode->i_mode) || !ldiskfs_pdo) /* done */
428 GOTO(out, result = 0);
430 LASSERT(obj->oo_hl_head == NULL);
431 obj->oo_hl_head = ldiskfs_htree_lock_head_alloc(HTREE_HBITS_DEF);
432 if (obj->oo_hl_head == NULL) {
433 obj->oo_inode = NULL;
435 GOTO(out, result = -ENOMEM);
437 GOTO(out, result = 0);
440 LINVRNT(osd_invariant(obj));
445 * Concurrency: shouldn't matter.
/* Second-stage init once the inode is attached: install body ops and
 * publish existence + file type bits in the object header. */
447 static void osd_object_init0(struct osd_object *obj)
449 LASSERT(obj->oo_inode != NULL);
450 obj->oo_dt.do_body_ops = &osd_body_ops;
451 obj->oo_dt.do_lu.lo_header->loh_attr |=
452 (LOHA_EXISTS | (obj->oo_inode->i_mode & S_IFMT));
456 * Concurrency: no concurrent access is possible that early in object
/* lu_object_operations::loo_object_init: otable-iterator FIDs get special
 * ops; everything else is resolved through osd_fid_lookup(). */
459 static int osd_object_init(const struct lu_env *env, struct lu_object *l,
460 const struct lu_object_conf *conf)
462 struct osd_object *obj = osd_obj(l);
465 LINVRNT(osd_invariant(obj));
467 if (fid_is_otable_it(&l->lo_header->loh_fid)) {
468 obj->oo_dt.do_ops = &osd_obj_otable_it_ops;
469 l->lo_header->loh_attr |= LOHA_EXISTS;
473 result = osd_fid_lookup(env, obj, lu_object_fid(l), conf);
474 obj->oo_dt.do_body_ops = &osd_body_ops_new;
475 if (result == 0 && obj->oo_inode != NULL)
476 osd_object_init0(obj);
478 LINVRNT(osd_invariant(obj));
483 * Concurrency: no concurrent access is possible that late in object
/* Release the osd_object itself: dt fini plus the htree lock head, if any. */
486 static void osd_object_free(const struct lu_env *env, struct lu_object *l)
488 struct osd_object *obj = osd_obj(l);
490 LINVRNT(osd_invariant(obj));
492 dt_object_fini(&obj->oo_dt);
493 if (obj->oo_hl_head != NULL)
494 ldiskfs_htree_lock_head_free(obj->oo_hl_head);
499 * Concurrency: no concurrent access is possible that late in object
/* Tear down the object's IAM index state (if it was a directory/index)
 * and free the directory descriptor. */
502 static void osd_index_fini(struct osd_object *o)
504 struct iam_container *bag;
506 if (o->oo_dir != NULL) {
507 bag = &o->oo_dir->od_container;
508 if (o->oo_inode != NULL) {
509 if (bag->ic_object == o->oo_inode)
510 iam_container_fini(bag);
512 OBD_FREE_PTR(o->oo_dir);
518 * Concurrency: no concurrent access is possible that late in object
519 * life-cycle (for all existing callers, that is. New callers have to provide
520 * their own locking.)
/* Non-zero iff the inode has been unlinked (link count dropped to zero). */
522 static int osd_inode_unlinked(const struct inode *inode)
524 return inode->i_nlink == 0;
/* Journal credit estimates for OI and inode deletion. */
528 OSD_TXN_OI_DELETE_CREDITS = 20,
529 OSD_TXN_INODE_DELETE_CREDITS = 20
536 #if OSD_THANDLE_STATS
538 * Set time when the handle is allocated
540 static void osd_th_alloced(struct osd_thandle *oth)
542 oth->oth_alloced = cfs_time_current();
546 * Set time when the handle started
548 static void osd_th_started(struct osd_thandle *oth)
550 oth->oth_started = cfs_time_current();
554 * Helper function to convert time interval to microseconds packed in
/* Returns (end - start) expressed in microseconds. */
557 static long interval_to_usec(cfs_time_t start, cfs_time_t end)
561 cfs_duration_usec(cfs_time_sub(end, start), &val);
562 return val.tv_sec * 1000000 + val.tv_usec;
566 * Check whether we deal with this handle for too long.
/* Accounts alloc->start, start->close and close->now intervals into
 * procfs stats and dumps a stack trace if the handle stayed open > 30s. */
568 static void __osd_th_check_slow(void *oth, struct osd_device *dev,
569 cfs_time_t alloced, cfs_time_t started,
572 cfs_time_t now = cfs_time_current();
574 LASSERT(dev != NULL);
576 lprocfs_counter_add(dev->od_stats, LPROC_OSD_THANDLE_STARTING,
577 interval_to_usec(alloced, started));
578 lprocfs_counter_add(dev->od_stats, LPROC_OSD_THANDLE_OPEN,
579 interval_to_usec(started, closed));
580 lprocfs_counter_add(dev->od_stats, LPROC_OSD_THANDLE_CLOSING,
581 interval_to_usec(closed, now));
583 if (cfs_time_before(cfs_time_add(alloced, cfs_time_seconds(30)), now)) {
584 CWARN("transaction handle %p was open for too long: "
586 "alloced "CFS_TIME_T" ,"
587 "started "CFS_TIME_T" ,"
588 "closed "CFS_TIME_T"\n",
589 oth, now, alloced, started, closed);
590 libcfs_debug_dumpstack(NULL);
/* Wrap 'expr' (typically the journal stop) with slow-handle accounting. */
594 #define OSD_CHECK_SLOW_TH(oth, dev, expr) \
596 cfs_time_t __closed = cfs_time_current(); \
597 cfs_time_t __alloced = oth->oth_alloced; \
598 cfs_time_t __started = oth->oth_started; \
601 __osd_th_check_slow(oth, dev, __alloced, __started, __closed); \
604 #else /* OSD_THANDLE_STATS */
606 #define osd_th_alloced(h) do {} while(0)
607 #define osd_th_started(h) do {} while(0)
608 #define OSD_CHECK_SLOW_TH(oth, dev, expr) expr
610 #endif /* OSD_THANDLE_STATS */
613 * Concurrency: doesn't access mutable data.
/* Non-zero iff the declared credits exceed what one journal transaction
 * can hold (j_max_transaction_buffers). */
615 static int osd_param_is_not_sane(const struct osd_device *dev,
616 const struct thandle *th)
618 struct osd_thandle *oh = container_of(th, typeof(*oh), ot_super);
620 return oh->ot_credits > osd_journal(dev)->j_max_transaction_buffers;
624 * Concurrency: shouldn't matter.
/* Journal commit callback: runs dt commit hooks, fires and unlinks every
 * registered per-transaction dcb, then drops refs and tears down the
 * transaction's lu_context. Called from jbd2 commit context. */
626 static void osd_trans_commit_cb(struct super_block *sb,
627 struct ldiskfs_journal_cb_entry *jcb, int error)
629 struct osd_thandle *oh = container_of0(jcb, struct osd_thandle, ot_jcb);
630 struct thandle *th = &oh->ot_super;
631 struct lu_device *lud = &th->th_dev->dd_lu_dev;
632 struct dt_txn_commit_cb *dcb, *tmp;
634 LASSERT(oh->ot_handle == NULL);
637 CERROR("transaction @0x%p commit error: %d\n", th, error);
639 dt_txn_hook_commit(th);
641 /* call per-transaction callbacks if any */
642 cfs_list_for_each_entry_safe(dcb, tmp, &oh->ot_dcb_list, dcb_linkage) {
643 LASSERTF(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC,
644 "commit callback entry: magic=%x name='%s'\n",
645 dcb->dcb_magic, dcb->dcb_name);
646 cfs_list_del_init(&dcb->dcb_linkage);
647 dcb->dcb_func(NULL, th, dcb, error);
650 lu_ref_del_at(&lud->ld_reference, oh->ot_dev_link, "osd-tx", th);
654 lu_context_exit(&th->th_ctx);
655 lu_context_fini(&th->th_ctx);
/* Allocate and initialize an osd_thandle for a new transaction; also
 * resets the per-thread declare-tracking counters. */
659 static struct thandle *osd_trans_create(const struct lu_env *env,
662 struct osd_thread_info *oti = osd_oti_get(env);
663 struct osd_iobuf *iobuf = &oti->oti_iobuf;
664 struct osd_thandle *oh;
668 /* no pending IO in this thread should be left from prev. request */
669 LASSERT(cfs_atomic_read(&iobuf->dr_numreqs) == 0);
671 th = ERR_PTR(-ENOMEM);
672 OBD_ALLOC_GFP(oh, sizeof *oh, CFS_ALLOC_IO);
674 oh->ot_quota_trans = &oti->oti_quota_trans;
675 memset(oh->ot_quota_trans, 0, sizeof(*oh->ot_quota_trans));
679 th->th_tags = LCT_TX_HANDLE;
681 oti->oti_dev = osd_dt_dev(d);
682 CFS_INIT_LIST_HEAD(&oh->ot_dcb_list);
685 memset(oti->oti_declare_ops, 0, OSD_OT_MAX);
686 memset(oti->oti_declare_ops_rb, 0, OSD_OT_MAX);
687 memset(oti->oti_declare_ops_cred, 0, OSD_OT_MAX);
688 oti->oti_rollback = false;
694 * Concurrency: shouldn't matter.
/* Start the journal transaction for 'th': run start hooks, warn (with a
 * per-op credit breakdown and rate-limited stack dump) when the declared
 * credits exceed the journal limit, clamp the credits, then open the
 * ldiskfs journal handle and enter the transaction lu_context. */
696 int osd_trans_start(const struct lu_env *env, struct dt_device *d,
699 struct osd_thread_info *oti = osd_oti_get(env);
700 struct osd_device *dev = osd_dt_dev(d);
702 struct osd_thandle *oh;
707 LASSERT(current->journal_info == NULL);
709 oh = container_of0(th, struct osd_thandle, ot_super);
711 LASSERT(oh->ot_handle == NULL);
713 rc = dt_txn_hook_start(env, d, th);
717 if (unlikely(osd_param_is_not_sane(dev, th))) {
718 static unsigned long last_printed;
719 static int last_credits;
721 CWARN("%.16s: too many transaction credits (%d > %d)\n",
722 LDISKFS_SB(osd_sb(dev))->s_es->s_volume_name,
724 osd_journal(dev)->j_max_transaction_buffers);
725 #ifdef OSD_TRACK_DECLARES
726 CWARN(" create: %u/%u, delete: %u/%u, destroy: %u/%u\n",
727 oti->oti_declare_ops[OSD_OT_CREATE],
728 oti->oti_declare_ops_cred[OSD_OT_CREATE],
729 oti->oti_declare_ops[OSD_OT_DELETE],
730 oti->oti_declare_ops_cred[OSD_OT_DELETE],
731 oti->oti_declare_ops[OSD_OT_DESTROY],
732 oti->oti_declare_ops_cred[OSD_OT_DESTROY]);
733 CWARN(" attr_set: %u/%u, xattr_set: %u/%u\n",
734 oti->oti_declare_ops[OSD_OT_ATTR_SET],
735 oti->oti_declare_ops_cred[OSD_OT_ATTR_SET],
736 oti->oti_declare_ops[OSD_OT_XATTR_SET],
737 oti->oti_declare_ops_cred[OSD_OT_XATTR_SET]);
738 CWARN(" write: %u/%u, punch: %u/%u, quota %u/%u\n",
739 oti->oti_declare_ops[OSD_OT_WRITE],
740 oti->oti_declare_ops_cred[OSD_OT_WRITE],
741 oti->oti_declare_ops[OSD_OT_PUNCH],
742 oti->oti_declare_ops_cred[OSD_OT_PUNCH],
743 oti->oti_declare_ops[OSD_OT_QUOTA],
744 oti->oti_declare_ops_cred[OSD_OT_QUOTA]);
745 CWARN(" insert: %u/%u, delete: %u/%u\n",
746 oti->oti_declare_ops[OSD_OT_INSERT],
747 oti->oti_declare_ops_cred[OSD_OT_INSERT],
748 oti->oti_declare_ops[OSD_OT_DESTROY],
749 oti->oti_declare_ops_cred[OSD_OT_DESTROY]);
750 CWARN(" ref_add: %u/%u, ref_del: %u/%u\n",
751 oti->oti_declare_ops[OSD_OT_REF_ADD],
752 oti->oti_declare_ops_cred[OSD_OT_REF_ADD],
753 oti->oti_declare_ops[OSD_OT_REF_DEL],
754 oti->oti_declare_ops_cred[OSD_OT_REF_DEL]);
756 if (last_credits != oh->ot_credits &&
757 time_after(jiffies, last_printed + 60 * HZ)) {
758 libcfs_debug_dumpstack(NULL);
759 last_credits = oh->ot_credits;
760 last_printed = jiffies;
763 /* XXX Limit the credits to 'max_transaction_buffers', and
764 * let the underlying filesystem to catch the error if
765 * we really need so many credits.
767 * This should be removed when we can calculate the
768 * credits precisely. */
769 oh->ot_credits = osd_journal(dev)->j_max_transaction_buffers;
773 * XXX temporary stuff. Some abstraction layer should
776 jh = ldiskfs_journal_start_sb(osd_sb(dev), oh->ot_credits);
780 LASSERT(oti->oti_txns == 0);
781 lu_context_init(&th->th_ctx, th->th_tags);
782 lu_context_enter(&th->th_ctx);
784 lu_device_get(&d->dd_lu_dev);
785 oh->ot_dev_link = lu_ref_add(&d->dd_lu_dev.ld_reference,
797 * Concurrency: shouldn't matter.
/* Stop the transaction: notify the quota slave, register the commit
 * callback, run stop hooks, close the journal handle, then wait for all
 * direct IO submitted by this thread to drain before reusing the iobuf. */
799 static int osd_trans_stop(const struct lu_env *env, struct thandle *th)
802 struct osd_thandle *oh;
803 struct osd_thread_info *oti = osd_oti_get(env);
804 struct osd_iobuf *iobuf = &oti->oti_iobuf;
805 struct qsd_instance *qsd = oti->oti_dev->od_quota_slave;
808 oh = container_of0(th, struct osd_thandle, ot_super);
811 /* inform the quota slave device that the transaction is
813 qsd_op_end(env, qsd, oh->ot_quota_trans);
814 oh->ot_quota_trans = NULL;
816 if (oh->ot_handle != NULL) {
817 handle_t *hdl = oh->ot_handle;
820 * add commit callback
821 * notice we don't do this in osd_trans_start()
822 * as underlying transaction can change during truncate
824 ldiskfs_journal_callback_add(hdl, osd_trans_commit_cb,
827 LASSERT(oti->oti_txns == 1);
829 rc = dt_txn_hook_stop(env, th);
831 CERROR("Failure in transaction hook: %d\n", rc);
833 /* hook functions might modify th_sync */
834 hdl->h_sync = th->th_sync;
836 oh->ot_handle = NULL;
837 OSD_CHECK_SLOW_TH(oh, oti->oti_dev,
838 rc = ldiskfs_journal_stop(hdl));
840 CERROR("Failure to stop transaction: %d\n", rc);
845 /* as we want IO to journal and data IO be concurrent, we don't block
846 * awaiting data IO completion in osd_do_bio(), instead we wait here
847 * once transaction is submitted to the journal. all regular requests
848 * don't do direct IO (except read/write), thus this wait_event becomes
851 * IMPORTANT: we have to wait till any IO submitted by the thread is
852 * completed otherwise iobuf may be corrupted by different request
854 cfs_wait_event(iobuf->dr_wait,
855 cfs_atomic_read(&iobuf->dr_numreqs) == 0);
857 rc = iobuf->dr_error;
862 static int osd_trans_cb_add(struct thandle *th, struct dt_txn_commit_cb *dcb)
864 struct osd_thandle *oh = container_of0(th, struct osd_thandle,
867 LASSERT(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC);
868 LASSERT(&dcb->dcb_func != NULL);
869 cfs_list_add(&dcb->dcb_linkage, &oh->ot_dcb_list);
875 * Called just before object is freed. Releases all resources except for
876 * object itself (that is released by osd_object_free()).
878 * Concurrency: no concurrent access is possible that late in object
/* Detach the inode from the object; if quota is active, save the inode's
 * uid/gid first and adjust the quota slave so granted space is returned
 * to the master when appropriate. */
881 static void osd_object_delete(const struct lu_env *env, struct lu_object *l)
883 struct osd_object *obj = osd_obj(l);
884 struct inode *inode = obj->oo_inode;
886 LINVRNT(osd_invariant(obj));
889 * If object is unlinked remove fid->ino mapping from object index.
894 struct qsd_instance *qsd = osd_obj2dev(obj)->od_quota_slave;
895 qid_t uid = inode->i_uid;
896 qid_t gid = inode->i_gid;
899 obj->oo_inode = NULL;
902 struct osd_thread_info *info = osd_oti_get(env);
903 struct lquota_id_info *qi = &info->oti_qi;
905 /* Release granted quota to master if necessary */
906 qi->lqi_id.qid_uid = uid;
907 qsd_op_adjust(env, qsd, &qi->lqi_id, USRQUOTA);
909 qi->lqi_id.qid_uid = gid;
910 qsd_op_adjust(env, qsd, &qi->lqi_id, GRPQUOTA);
916 * Concurrency: ->loo_object_release() is called under site spin-lock.
918 static void osd_object_release(const struct lu_env *env,
924 * Concurrency: shouldn't matter.
/* Debug printer: emits object address, inode pointer, ino/generation and
 * the index descriptor name ("plain" for non-index objects). */
926 static int osd_object_print(const struct lu_env *env, void *cookie,
927 lu_printer_t p, const struct lu_object *l)
929 struct osd_object *o = osd_obj(l);
932 if (o->oo_dir != NULL)
933 d = o->oo_dir->od_container.ic_descr;
936 return (*p)(env, cookie,
937 LUSTRE_OSD_LDISKFS_NAME"-object@%p(i:%p:%lu/%u)[%s]",
939 o->oo_inode ? o->oo_inode->i_ino : 0UL,
940 o->oo_inode ? o->oo_inode->i_generation : 0,
941 d ? d->id_ops->id_name : "plain");
945 * Concurrency: shouldn't matter.
/* Fill 'sfs' from the backing filesystem, caching the result in
 * od_statfs and refreshing it at most once per second (od_osfs_age);
 * tolerates env == NULL callers from osd_lproc.c. */
947 int osd_statfs(const struct lu_env *env, struct dt_device *d,
948 struct obd_statfs *sfs)
950 struct osd_device *osd = osd_dt_dev(d);
951 struct super_block *sb = osd_sb(osd);
952 struct kstatfs *ksfs;
955 if (unlikely(osd->od_mnt == NULL))
958 /* osd_lproc.c call this without env, allocate ksfs for that case */
959 if (unlikely(env == NULL)) {
964 ksfs = &osd_oti_get(env)->oti_ksfs;
967 spin_lock(&osd->od_osfs_lock);
969 if (cfs_time_before_64(osd->od_osfs_age, cfs_time_shift_64(-1))) {
970 result = sb->s_op->statfs(sb->s_root, ksfs);
971 if (likely(result == 0)) { /* N.B. statfs can't really fail */
972 osd->od_osfs_age = cfs_time_current_64();
973 statfs_pack(&osd->od_statfs, ksfs);
974 if (sb->s_flags & MS_RDONLY)
975 sfs->os_state = OS_STATE_READONLY;
979 if (likely(result == 0))
980 *sfs = osd->od_statfs;
981 spin_unlock(&osd->od_osfs_lock);
983 if (unlikely(env == NULL))
990 * Estimate space needed for file creations. We assume the largest filename
991 * which is 2^64 - 1, hence a filename of 20 chars.
992 * This is 28 bytes per object which is 28MB for 1M objects ... not so bad.
994 #ifdef __LDISKFS_DIR_REC_LEN
995 #define PER_OBJ_USAGE __LDISKFS_DIR_REC_LEN(20)
997 #define PER_OBJ_USAGE LDISKFS_DIR_REC_LEN(20)
1001 * Concurrency: doesn't access mutable data.
/* Report device capabilities and limits (name length, link max, block
 * size, grant parameters, mount options, max EA size) to the caller. */
1003 static void osd_conf_get(const struct lu_env *env,
1004 const struct dt_device *dev,
1005 struct dt_device_param *param)
1007 struct super_block *sb = osd_sb(osd_dt_dev(dev));
1010 * XXX should be taken from not-yet-existing fs abstraction layer.
1012 param->ddp_mnt = osd_dt_dev(dev)->od_mnt;
1013 param->ddp_max_name_len = LDISKFS_NAME_LEN;
1014 param->ddp_max_nlink = LDISKFS_LINK_MAX;
1015 param->ddp_block_shift = sb->s_blocksize_bits;
1016 param->ddp_mount_type = LDD_MT_LDISKFS;
1017 param->ddp_maxbytes = sb->s_maxbytes;
1018 /* Overhead estimate should be fairly accurate, so we really take a tiny
1019 * error margin which also avoids fragmenting the filesystem too much */
1020 param->ddp_grant_reserved = 2; /* end up to be 1.9% after conversion */
1021 /* inode are statically allocated, so per-inode space consumption
1022 * is the space consumed by the directory entry */
1023 param->ddp_inodespace = PER_OBJ_USAGE;
1024 /* per-fragment overhead to be used by the client code */
1025 param->ddp_grant_frag = 6 * LDISKFS_BLOCK_SIZE(sb);
1026 param->ddp_mntopts = 0;
1027 if (test_opt(sb, XATTR_USER))
1028 param->ddp_mntopts |= MNTOPT_USERXATTR;
1029 if (test_opt(sb, POSIX_ACL))
1030 param->ddp_mntopts |= MNTOPT_ACL;
1032 #if defined(LDISKFS_FEATURE_INCOMPAT_EA_INODE)
1033 if (LDISKFS_HAS_INCOMPAT_FEATURE(sb, LDISKFS_FEATURE_INCOMPAT_EA_INODE))
1034 param->ddp_max_ea_size = LDISKFS_XATTR_MAX_LARGE_EA_SIZE;
1037 param->ddp_max_ea_size = sb->s_blocksize;
1042 * Concurrency: shouldn't matter.
/* Synchronous commit: force the current journal transaction to disk. */
1044 static int osd_sync(const struct lu_env *env, struct dt_device *d)
1046 CDEBUG(D_HA, "syncing OSD %s\n", LUSTRE_OSD_LDISKFS_NAME);
1047 return ldiskfs_force_commit(osd_sb(osd_dt_dev(d)));
1051 * Start commit for OSD device.
1053 * An implementation of dt_commit_async method for OSD device.
1054 * Asynchronously starts underlying fs sync and thereby a transaction
1057 * \param env environment
1058 * \param d dt device
1060 * \see dt_device_operations
1062 static int osd_commit_async(const struct lu_env *env,
1063 struct dt_device *d)
1065 struct super_block *s = osd_sb(osd_dt_dev(d));
1068 CDEBUG(D_HA, "async commit OSD %s\n", LUSTRE_OSD_LDISKFS_NAME);
1069 RETURN(s->s_op->sync_fs(s, 0));
1073 * Concurrency: shouldn't matter.
/* Force the backing device read-only (used for testing/failover). */
1076 static int osd_ro(const struct lu_env *env, struct dt_device *d)
1078 struct super_block *sb = osd_sb(osd_dt_dev(d));
1082 CERROR("*** setting %s read-only ***\n", osd_dt_dev(d)->od_svname);
1084 rc = __lvfs_set_rdonly(sb->s_bdev, LDISKFS_SB(sb)->journal_bdev);
1089 * Concurrency: serialization provided by callers.
/* Install the capability context (mode, timeout, HMAC algorithm, keys)
 * on the OSD device. */
1091 static int osd_init_capa_ctxt(const struct lu_env *env, struct dt_device *d,
1092 int mode, unsigned long timeout, __u32 alg,
1093 struct lustre_capa_key *keys)
1095 struct osd_device *dev = osd_dt_dev(d);
1098 dev->od_fl_capa = mode;
1099 dev->od_capa_timeout = timeout;
1100 dev->od_capa_alg = alg;
1101 dev->od_capa_keys = keys;
1106 * Note: we do not count into QUOTA here.
1107 * If we mount with --data_journal we may need more.
/* Per-operation journal credit estimates, excluding quota overhead. */
1109 const int osd_dto_credits_noquota[DTO_NR] = {
1112 * INDEX_EXTRA_TRANS_BLOCKS(8) +
1113 * SINGLEDATA_TRANS_BLOCKS(8)
1114 * XXX Note: maybe iam need more, since iam have more level than
1117 [DTO_INDEX_INSERT] = 16,
1118 [DTO_INDEX_DELETE] = 16,
1122 [DTO_INDEX_UPDATE] = 16,
1124 * Create an object. The same as create object in EXT3.
1125 * DATA_TRANS_BLOCKS(14) +
1126 * INDEX_EXTRA_BLOCKS(8) +
1127 * 3(inode bits, groups, GDT)
1129 [DTO_OBJECT_CREATE] = 25,
1131 * XXX: real credits to be fixed
1133 [DTO_OBJECT_DELETE] = 25,
1135 * Attr set credits (inode)
1137 [DTO_ATTR_SET_BASE] = 1,
1139 * Xattr set. The same as xattr of EXT3.
1140 * DATA_TRANS_BLOCKS(14)
1141 * XXX Note: in original MDS implementation INDEX_EXTRA_TRANS_BLOCKS
1142 * are also counted in. Do not know why?
1144 [DTO_XATTR_SET] = 14,
1147 * credits for inode change during write.
1149 [DTO_WRITE_BASE] = 3,
1151 * credits for single block write.
1153 [DTO_WRITE_BLOCK] = 14,
1155 * Attr set credits for chown.
1156 * This is extra credits for setattr, and it is null without quota
1158 [DTO_ATTR_SET_CHOWN]= 0
/* dt_device operation vector wiring the methods defined above. */
1161 static const struct dt_device_operations osd_dt_ops = {
1162 .dt_root_get = osd_root_get,
1163 .dt_statfs = osd_statfs,
1164 .dt_trans_create = osd_trans_create,
1165 .dt_trans_start = osd_trans_start,
1166 .dt_trans_stop = osd_trans_stop,
1167 .dt_trans_cb_add = osd_trans_cb_add,
1168 .dt_conf_get = osd_conf_get,
1169 .dt_sync = osd_sync,
1171 .dt_commit_async = osd_commit_async,
1172 .dt_init_capa_ctxt = osd_init_capa_ctxt,
/* Take the object's rw-semaphore for reading; 'role' selects the lockdep
 * nesting level. The caller must not already own the write lock. */
1175 static void osd_object_read_lock(const struct lu_env *env,
1176 struct dt_object *dt, unsigned role)
1178 struct osd_object *obj = osd_dt_obj(dt);
1179 struct osd_thread_info *oti = osd_oti_get(env);
1181 LINVRNT(osd_invariant(obj));
1183 LASSERT(obj->oo_owner != env);
1184 down_read_nested(&obj->oo_sem, role);
1186 LASSERT(obj->oo_owner == NULL);
/* Take the object's rw-semaphore for writing and record this env as the
 * owner so osd_write_locked()/unlock can verify ownership. */
1190 static void osd_object_write_lock(const struct lu_env *env,
1191 struct dt_object *dt, unsigned role)
1193 struct osd_object *obj = osd_dt_obj(dt);
1194 struct osd_thread_info *oti = osd_oti_get(env);
1196 LINVRNT(osd_invariant(obj));
1198 LASSERT(obj->oo_owner != env);
1199 down_write_nested(&obj->oo_sem, role);
1201 LASSERT(obj->oo_owner == NULL);
1202 obj->oo_owner = env;
/* Drop a read lock taken via osd_object_read_lock(). */
1206 static void osd_object_read_unlock(const struct lu_env *env,
1207 struct dt_object *dt)
1209 struct osd_object *obj = osd_dt_obj(dt);
1210 struct osd_thread_info *oti = osd_oti_get(env);
1212 LINVRNT(osd_invariant(obj));
1214 LASSERT(oti->oti_r_locks > 0);
1216 up_read(&obj->oo_sem);
/* Drop a write lock: clear ownership before releasing the semaphore. */
1219 static void osd_object_write_unlock(const struct lu_env *env,
1220 struct dt_object *dt)
1222 struct osd_object *obj = osd_dt_obj(dt);
1223 struct osd_thread_info *oti = osd_oti_get(env);
1225 LINVRNT(osd_invariant(obj));
1227 LASSERT(obj->oo_owner == env);
1228 LASSERT(oti->oti_w_locks > 0);
1230 obj->oo_owner = NULL;
1231 up_write(&obj->oo_sem);
/* Non-zero iff this env currently owns the object's write lock. */
1234 static int osd_object_write_locked(const struct lu_env *env,
1235 struct dt_object *dt)
1237 struct osd_object *obj = osd_dt_obj(dt);
1239 LINVRNT(osd_invariant(obj));
1241 return obj->oo_owner == env;
/* Validate a client-presented capability: check the capa cache for a
 * (non-expired) hit, verify expiry, find the matching key under capa_lock,
 * recompute the HMAC and compare, then cache the validated capa. */
1244 static int capa_is_sane(const struct lu_env *env,
1245 struct osd_device *dev,
1246 struct lustre_capa *capa,
1247 struct lustre_capa_key *keys)
1249 struct osd_thread_info *oti = osd_oti_get(env);
1250 struct lustre_capa *tcapa = &oti->oti_capa;
1251 struct obd_capa *oc;
1255 oc = capa_lookup(dev->od_capa_hash, capa, 0);
1257 if (capa_is_expired(oc)) {
1258 DEBUG_CAPA(D_ERROR, capa, "expired");
1265 if (capa_is_expired_sec(capa)) {
1266 DEBUG_CAPA(D_ERROR, capa, "expired");
1270 spin_lock(&capa_lock);
1271 for (i = 0; i < 2; i++) {
1272 if (keys[i].lk_keyid == capa->lc_keyid) {
1273 oti->oti_capa_key = keys[i];
1277 spin_unlock(&capa_lock);
1280 DEBUG_CAPA(D_ERROR, capa, "no matched capa key");
1284 rc = capa_hmac(tcapa->lc_hmac, capa, oti->oti_capa_key.lk_key);
1288 if (memcmp(tcapa->lc_hmac, capa->lc_hmac, sizeof(capa->lc_hmac))) {
1289 DEBUG_CAPA(D_ERROR, capa, "HMAC mismatch");
1293 oc = capa_add(dev->od_capa_hash, capa);
/* Authorize operation 'opc' on object 'dt' against capability 'capa':
 * honors BYPASS_CAPA and disabled capa mode, checks FID match, opcode
 * support and finally cryptographic sanity via capa_is_sane(). */
1299 int osd_object_auth(const struct lu_env *env, struct dt_object *dt,
1300 struct lustre_capa *capa, __u64 opc)
1302 const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
1303 struct osd_device *dev = osd_dev(dt->do_lu.lo_dev);
1304 struct md_capainfo *ci;
1307 if (!dev->od_fl_capa)
1310 if (capa == BYPASS_CAPA)
1313 ci = md_capainfo(env);
1317 if (ci->mc_auth == LC_ID_NONE)
1321 CERROR("no capability is provided for fid "DFID"\n", PFID(fid));
1325 if (!lu_fid_eq(fid, &capa->lc_fid)) {
1326 DEBUG_CAPA(D_ERROR, capa, "fid "DFID" mismatch with",
1331 if (!capa_opc_supported(capa, opc)) {
1332 DEBUG_CAPA(D_ERROR, capa, "opc "LPX64" not supported by", opc);
1336 if ((rc = capa_is_sane(env, dev, capa, dev->od_capa_keys))) {
1337 DEBUG_CAPA(D_ERROR, capa, "insane (rc %d)", rc);
/*
 * Convert @seconds to a timespec truncated to the time granularity of the
 * inode's superblock.  The result is stored in per-thread scratch storage
 * (oti_time), so the returned pointer is only valid until the next call
 * made with the same env; callers copy it by value.
 */
1344 static struct timespec *osd_inode_time(const struct lu_env *env,
1345 struct inode *inode, __u64 seconds)
1347 struct osd_thread_info *oti = osd_oti_get(env);
1348 struct timespec *t = &oti->oti_time;
1350 t->tv_sec = seconds;
1352 *t = timespec_trunc(*t, inode->i_sb->s_time_gran);
/*
 * Copy the attributes of @inode into the Lustre attribute structure @attr,
 * marking every copied field in attr->la_valid.  Times are taken via
 * LTIME_S (seconds only); flags come from the ldiskfs inode info.
 */
1357 static void osd_inode_getattr(const struct lu_env *env,
1358 struct inode *inode, struct lu_attr *attr)
1360 attr->la_valid |= LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
1361 LA_SIZE | LA_BLOCKS | LA_UID | LA_GID |
1362 LA_FLAGS | LA_NLINK | LA_RDEV | LA_BLKSIZE |
1365 attr->la_atime = LTIME_S(inode->i_atime);
1366 attr->la_mtime = LTIME_S(inode->i_mtime);
1367 attr->la_ctime = LTIME_S(inode->i_ctime);
1368 attr->la_mode = inode->i_mode;
1369 attr->la_size = i_size_read(inode);
1370 attr->la_blocks = inode->i_blocks;
1371 attr->la_uid = inode->i_uid;
1372 attr->la_gid = inode->i_gid;
1373 attr->la_flags = LDISKFS_I(inode)->i_flags;
1374 attr->la_nlink = inode->i_nlink;
1375 attr->la_rdev = inode->i_rdev;
/* block size derived from the inode's block-shift */
1376 attr->la_blksize = 1 << inode->i_blkbits;
1377 attr->la_blkbits = inode->i_blkbits;
/*
 * dt_object_operations::do_attr_get implementation.
 *
 * After a META_READ capability check, snapshot the inode attributes into
 * @attr under the object's oo_guard spinlock so the copy is internally
 * consistent with concurrent attr_set.
 */
1380 static int osd_attr_get(const struct lu_env *env,
1381 struct dt_object *dt,
1382 struct lu_attr *attr,
1383 struct lustre_capa *capa)
1385 struct osd_object *obj = osd_dt_obj(dt);
1387 LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
1388 LINVRNT(osd_invariant(obj));
1390 if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
1393 spin_lock(&obj->oo_guard);
1394 osd_inode_getattr(env, obj->oo_inode, attr);
1395 spin_unlock(&obj->oo_guard);
/*
 * Declare journal credits and quota changes for a future attr_set on @dt.
 *
 * Base credits cover the inode update itself.  When the uid or gid is
 * about to change, additionally declare quota-id updates for both the new
 * and the current owner — once for inode accounting and once for block
 * accounting — so quota files can be updated in the same transaction.
 * A target id of 0 (root) is declared as "allocated" (no enforcement).
 */
1399 static int osd_declare_attr_set(const struct lu_env *env,
1400 struct dt_object *dt,
1401 const struct lu_attr *attr,
1402 struct thandle *handle)
1404 struct osd_thandle *oh;
1405 struct osd_object *obj;
1406 struct osd_thread_info *info = osd_oti_get(env);
1407 struct lquota_id_info *qi = &info->oti_qi;
1413 LASSERT(dt != NULL);
1414 LASSERT(handle != NULL);
1416 obj = osd_dt_obj(dt);
1417 LASSERT(osd_invariant(obj));
1419 oh = container_of0(handle, struct osd_thandle, ot_super);
1420 LASSERT(oh->ot_handle == NULL);
1422 osd_trans_declare_op(env, oh, OSD_OT_ATTR_SET,
1423 osd_dto_credits_noquota[DTO_ATTR_SET_BASE]);
/* no quota work possible without attributes or a backing inode */
1425 if (attr == NULL || obj->oo_inode == NULL)
/* current space usage in quota blocks: i_blocks (512-byte sectors)
 * scaled to bytes, then converted with toqb() */
1428 bspace = obj->oo_inode->i_blocks;
1429 bspace <<= obj->oo_inode->i_sb->s_blocksize_bits;
1430 bspace = toqb(bspace);
1432 /* Changing ownership is always preformed by super user, it should not
1435 * We still need to call the osd_declare_qid() to calculate the journal
1436 * credits for updating quota accounting files and to trigger quota
1437 * space adjustment once the operation is completed.*/
1438 if ((attr->la_valid & LA_UID) != 0 &&
1439 attr->la_uid != obj->oo_inode->i_uid) {
1440 qi->lqi_type = USRQUOTA;
1442 /* inode accounting */
1443 qi->lqi_is_blk = false;
1445 /* one more inode for the new owner ... */
1446 qi->lqi_id.qid_uid = attr->la_uid;
1448 allocated = (attr->la_uid == 0) ? true : false;
1449 rc = osd_declare_qid(env, oh, qi, allocated, NULL);
1450 if (rc == -EDQUOT || rc == -EINPROGRESS)
1455 /* and one less inode for the current uid */
1456 qi->lqi_id.qid_uid = obj->oo_inode->i_uid;
1458 rc = osd_declare_qid(env, oh, qi, true, NULL);
1459 if (rc == -EDQUOT || rc == -EINPROGRESS)
1464 /* block accounting */
1465 qi->lqi_is_blk = true;
1467 /* more blocks for the new owner ... */
1468 qi->lqi_id.qid_uid = attr->la_uid;
1469 qi->lqi_space = bspace;
1470 allocated = (attr->la_uid == 0) ? true : false;
1471 rc = osd_declare_qid(env, oh, qi, allocated, NULL);
1472 if (rc == -EDQUOT || rc == -EINPROGRESS)
1477 /* and finally less blocks for the current owner */
1478 qi->lqi_id.qid_uid = obj->oo_inode->i_uid;
1479 qi->lqi_space = -bspace;
1480 rc = osd_declare_qid(env, oh, qi, true, NULL);
1481 if (rc == -EDQUOT || rc == -EINPROGRESS)
/* same four declarations for a group-ownership change */
1487 if (attr->la_valid & LA_GID &&
1488 attr->la_gid != obj->oo_inode->i_gid) {
1489 qi->lqi_type = GRPQUOTA;
1491 /* inode accounting */
1492 qi->lqi_is_blk = false;
1494 /* one more inode for the new group owner ... */
1495 qi->lqi_id.qid_gid = attr->la_gid;
1497 allocated = (attr->la_gid == 0) ? true : false;
1498 rc = osd_declare_qid(env, oh, qi, allocated, NULL);
1499 if (rc == -EDQUOT || rc == -EINPROGRESS)
1504 /* and one less inode for the current gid */
1505 qi->lqi_id.qid_gid = obj->oo_inode->i_gid;
1507 rc = osd_declare_qid(env, oh, qi, true, NULL);
1508 if (rc == -EDQUOT || rc == -EINPROGRESS)
1513 /* block accounting */
1514 qi->lqi_is_blk = true;
1516 /* more blocks for the new owner ... */
1517 qi->lqi_id.qid_gid = attr->la_gid;
1518 qi->lqi_space = bspace;
1519 allocated = (attr->la_gid == 0) ? true : false;
1520 rc = osd_declare_qid(env, oh, qi, allocated, NULL);
1521 if (rc == -EDQUOT || rc == -EINPROGRESS)
1526 /* and finally less blocks for the current owner */
1527 qi->lqi_id.qid_gid = obj->oo_inode->i_gid;
1528 qi->lqi_space = -bspace;
1529 rc = osd_declare_qid(env, oh, qi, true, NULL);
1530 if (rc == -EDQUOT || rc == -EINPROGRESS)
/*
 * Apply the fields selected by attr->la_valid from @attr onto @inode.
 * Times go through osd_inode_time() for granularity truncation; size
 * updates both i_disksize and i_size; the file-type bits of i_mode are
 * preserved when the mode is changed.
 */
1539 static int osd_inode_setattr(const struct lu_env *env,
1540 struct inode *inode, const struct lu_attr *attr)
1544 bits = attr->la_valid;
1546 if (bits & LA_ATIME)
1547 inode->i_atime = *osd_inode_time(env, inode, attr->la_atime);
1548 if (bits & LA_CTIME)
1549 inode->i_ctime = *osd_inode_time(env, inode, attr->la_ctime);
1550 if (bits & LA_MTIME)
1551 inode->i_mtime = *osd_inode_time(env, inode, attr->la_mtime);
1552 if (bits & LA_SIZE) {
1553 LDISKFS_I(inode)->i_disksize = attr->la_size;
1554 i_size_write(inode, attr->la_size);
1558 /* OSD should not change "i_blocks" which is used by quota.
1559 * "i_blocks" should be changed by ldiskfs only. */
1560 if (bits & LA_BLOCKS)
1561 inode->i_blocks = attr->la_blocks;
/* keep the S_IFMT file-type bits, replace only permission/sticky bits */
1564 inode->i_mode = (inode->i_mode & S_IFMT) |
1565 (attr->la_mode & ~S_IFMT);
1567 inode->i_uid = attr->la_uid;
1569 inode->i_gid = attr->la_gid;
1570 if (bits & LA_NLINK)
1571 set_nlink(inode, attr->la_nlink);
1573 inode->i_rdev = attr->la_rdev;
1575 if (bits & LA_FLAGS) {
1576 /* always keep S_NOCMTIME */
1577 inode->i_flags = ll_ext_to_inode_flags(attr->la_flags) |
/*
 * Move the quota charges of @inode to a new uid/gid when @attr changes
 * ownership.  Builds an iattr describing only the changed ids and hands
 * it to ll_vfs_dq_transfer(); a no-op when neither uid nor gid differs.
 */
1583 static int osd_quota_transfer(struct inode *inode, const struct lu_attr *attr)
1585 if ((attr->la_valid & LA_UID && attr->la_uid != inode->i_uid) ||
1586 (attr->la_valid & LA_GID && attr->la_gid != inode->i_gid)) {
1591 if (attr->la_valid & LA_UID)
1592 iattr.ia_valid |= ATTR_UID;
1593 if (attr->la_valid & LA_GID)
1594 iattr.ia_valid |= ATTR_GID;
1595 iattr.ia_uid = attr->la_uid;
1596 iattr.ia_gid = attr->la_gid;
1598 rc = ll_vfs_dq_transfer(inode, &iattr);
1600 CERROR("%s: quota transfer failed: rc = %d. Is quota "
1601 "enforcement enabled on the ldiskfs filesystem?",
1602 inode->i_sb->s_id, rc);
/*
 * dt_object_operations::do_attr_set implementation.
 *
 * After a META_WRITE capability check: transfer quota if ownership
 * changes, apply the attributes under oo_guard, then dirty the inode so
 * the change reaches the journal via the superblock's dirty_inode hook.
 * The OBD_FAIL_OSD_FID_MAPPING branch is fault-injection for tests: it
 * deliberately corrupts the object's OI mapping instead of setting attrs.
 */
1609 static int osd_attr_set(const struct lu_env *env,
1610 struct dt_object *dt,
1611 const struct lu_attr *attr,
1612 struct thandle *handle,
1613 struct lustre_capa *capa)
1615 struct osd_object *obj = osd_dt_obj(dt);
1616 struct inode *inode;
1619 LASSERT(handle != NULL);
1620 LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
1621 LASSERT(osd_invariant(obj));
1623 if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
1626 osd_trans_exec_op(env, handle, OSD_OT_ATTR_SET);
1628 if (OBD_FAIL_CHECK(OBD_FAIL_OSD_FID_MAPPING)) {
1629 struct osd_thread_info *oti = osd_oti_get(env);
1630 const struct lu_fid *fid0 = lu_object_fid(&dt->do_lu);
1631 struct lu_fid *fid1 = &oti->oti_fid;
1632 struct osd_inode_id *id = &oti->oti_id;
1633 struct iam_path_descr *ipd;
1634 struct iam_container *bag;
1635 struct osd_thandle *oh;
/* write a garbage inode id (all 0x01 bytes) into the OI index
 * under this fid — used to exercise OI scrub recovery */
1638 fid_cpu_to_be(fid1, fid0);
1639 memset(id, 1, sizeof(*id));
1640 bag = &osd_fid2oi(osd_dev(dt->do_lu.lo_dev),
1641 fid0)->oi_dir.od_container;
1642 ipd = osd_idx_ipd_get(env, bag);
1643 if (unlikely(ipd == NULL))
1646 oh = container_of0(handle, struct osd_thandle, ot_super);
1647 rc = iam_update(oh->ot_handle, bag, (const struct iam_key *)fid1,
1648 (const struct iam_rec *)id, ipd);
1649 osd_ipd_put(env, bag, ipd);
1650 return(rc > 0 ? 0 : rc);
1653 inode = obj->oo_inode;
/* ensure quota is initialized before a potential ownership change */
1654 ll_vfs_dq_init(inode);
1656 rc = osd_quota_transfer(inode, attr);
1660 spin_lock(&obj->oo_guard);
1661 rc = osd_inode_setattr(env, inode, attr);
1662 spin_unlock(&obj->oo_guard);
1665 inode->i_sb->s_op->dirty_inode(inode);
/*
 * Build a (thread-local, fake) child dentry for @name under @obj's inode;
 * thin wrapper over osd_child_dentry_by_inode().
 */
1669 struct dentry *osd_child_dentry_get(const struct lu_env *env,
1670 struct osd_object *obj,
1671 const char *name, const int namelen)
1673 return osd_child_dentry_by_inode(env, obj->oo_inode, name, namelen);
/*
 * Allocate a new ldiskfs inode for @obj inside transaction @th.
 *
 * For directories with parallel-directory-ops enabled, an htree lock head
 * is allocated first.  The inode is created under the allocation-hint
 * parent (or the filesystem root when no hint is given), marked
 * S_NOCMTIME so ldiskfs does not update c/mtime behind Lustre's back, and
 * flagged NOSCRUB since a freshly created object needs no OI scrubbing.
 * On failure the htree lock head (if any) is released again.
 */
1676 static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
1678 struct dt_allocation_hint *hint,
1682 struct osd_device *osd = osd_obj2dev(obj);
1683 struct osd_thandle *oth;
1684 struct dt_object *parent = NULL;
1685 struct inode *inode;
1687 LINVRNT(osd_invariant(obj));
1688 LASSERT(obj->oo_inode == NULL);
1689 LASSERT(obj->oo_hl_head == NULL);
1691 if (S_ISDIR(mode) && ldiskfs_pdo) {
1692 obj->oo_hl_head =ldiskfs_htree_lock_head_alloc(HTREE_HBITS_DEF);
1693 if (obj->oo_hl_head == NULL)
1697 oth = container_of(th, struct osd_thandle, ot_super);
1698 LASSERT(oth->ot_handle->h_transaction != NULL);
1700 if (hint && hint->dah_parent)
1701 parent = hint->dah_parent;
1703 inode = ldiskfs_create_inode(oth->ot_handle,
1704 parent ? osd_dt_obj(parent)->oo_inode :
1705 osd_sb(osd)->s_root->d_inode,
1707 if (!IS_ERR(inode)) {
1708 /* Do not update file c/mtime in ldiskfs.
1709 * NB: don't need any lock because no contention at this
1711 inode->i_flags |= S_NOCMTIME;
1713 /* For new created object, it must be consistent,
1714 * and it is unnecessary to scrub against it. */
1715 ldiskfs_set_inode_state(inode, LDISKFS_STATE_LUSTRE_NOSCRUB);
1716 obj->oo_inode = inode;
/* creation failed: undo the htree lock head allocation */
1719 if (obj->oo_hl_head != NULL) {
1720 ldiskfs_htree_lock_head_free(obj->oo_hl_head);
1721 obj->oo_hl_head = NULL;
1723 result = PTR_ERR(inode);
1725 LINVRNT(osd_invariant(obj));
/*
 * Create a directory object: masks the mode down to type + rwx + sticky
 * bits and delegates inode allocation to osd_mkfile().
 */
1733 static int osd_mkdir(struct osd_thread_info *info, struct osd_object *obj,
1734 struct lu_attr *attr,
1735 struct dt_allocation_hint *hint,
1736 struct dt_object_format *dof,
1740 struct osd_thandle *oth;
1741 __u32 mode = (attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX));
1743 LASSERT(S_ISDIR(attr->la_mode));
1745 oth = container_of(th, struct osd_thandle, ot_super);
1746 LASSERT(oth->ot_handle->h_transaction != NULL);
1747 result = osd_mkfile(info, obj, mode, hint, th);
/*
 * Create an index object: allocate a regular-file inode via osd_mkfile(),
 * then initialize an IAM container in it — variable-size-key (lvar) or
 * fixed-size-key (lfix) format depending on the requested index features.
 */
1752 static int osd_mk_index(struct osd_thread_info *info, struct osd_object *obj,
1753 struct lu_attr *attr,
1754 struct dt_allocation_hint *hint,
1755 struct dt_object_format *dof,
1759 struct osd_thandle *oth;
1760 const struct dt_index_features *feat = dof->u.dof_idx.di_feat;
1762 __u32 mode = (attr->la_mode & (S_IFMT | S_IALLUGO | S_ISVTX));
1764 LASSERT(S_ISREG(attr->la_mode));
1766 oth = container_of(th, struct osd_thandle, ot_super);
1767 LASSERT(oth->ot_handle->h_transaction != NULL);
1769 result = osd_mkfile(info, obj, mode, hint, th);
1771 LASSERT(obj->oo_inode != NULL);
1772 if (feat->dif_flags & DT_IND_VARKEY)
1773 result = iam_lvar_create(obj->oo_inode,
1774 feat->dif_keysize_max,
1776 feat->dif_recsize_max,
1779 result = iam_lfix_create(obj->oo_inode,
1780 feat->dif_keysize_max,
1782 feat->dif_recsize_max,
/*
 * Create a regular-file object; mode is masked to type + access + sticky
 * bits before delegating to osd_mkfile().
 */
1789 static int osd_mkreg(struct osd_thread_info *info, struct osd_object *obj,
1790 struct lu_attr *attr,
1791 struct dt_allocation_hint *hint,
1792 struct dt_object_format *dof,
1795 LASSERT(S_ISREG(attr->la_mode));
1796 return osd_mkfile(info, obj, (attr->la_mode &
1797 (S_IFMT | S_IALLUGO | S_ISVTX)), hint, th);
/*
 * Create a symlink object; same mode masking and delegation to
 * osd_mkfile() as osd_mkreg().
 */
1800 static int osd_mksym(struct osd_thread_info *info, struct osd_object *obj,
1801 struct lu_attr *attr,
1802 struct dt_allocation_hint *hint,
1803 struct dt_object_format *dof,
1806 LASSERT(S_ISLNK(attr->la_mode));
1807 return osd_mkfile(info, obj, (attr->la_mode &
1808 (S_IFMT | S_IALLUGO | S_ISVTX)), hint, th);
/*
 * Create a special-file object (char/block device, FIFO or socket).
 * After osd_mkfile() allocates the inode, init_special_inode() wires up
 * the device-specific file operations and rdev.
 */
1811 static int osd_mknod(struct osd_thread_info *info, struct osd_object *obj,
1812 struct lu_attr *attr,
1813 struct dt_allocation_hint *hint,
1814 struct dt_object_format *dof,
1817 cfs_umode_t mode = attr->la_mode & (S_IFMT | S_IALLUGO | S_ISVTX);
1820 LINVRNT(osd_invariant(obj));
1821 LASSERT(obj->oo_inode == NULL);
1822 LASSERT(S_ISCHR(mode) || S_ISBLK(mode) ||
1823 S_ISFIFO(mode) || S_ISSOCK(mode));
1825 result = osd_mkfile(info, obj, mode, hint, th);
1827 LASSERT(obj->oo_inode != NULL);
1829 * This inode should be marked dirty for i_rdev. Currently
1830 * that is done in the osd_attr_init().
1832 init_special_inode(obj->oo_inode, obj->oo_inode->i_mode,
1835 LINVRNT(osd_invariant(obj));
/* Signature shared by all per-format object creation helpers above
 * (osd_mkfile/osd_mkdir/osd_mk_index/osd_mkreg/osd_mksym/osd_mknod). */
1839 typedef int (*osd_obj_type_f)(struct osd_thread_info *, struct osd_object *,
1841 struct dt_allocation_hint *hint,
1842 struct dt_object_format *dof,
/*
 * Map a dt_format_type to the creation helper that implements it.
 * NOTE(review): the dispatch cases are elided here; only the index
 * branch (osd_mk_index) is visible.
 */
1845 static osd_obj_type_f osd_create_type_f(enum dt_format_type type)
1847 osd_obj_type_f result;
1863 result = osd_mk_index;
/*
 * Initialize an allocation hint for creating @child under @parent:
 * zero the structure and record the parent and the child's mode.
 */
1874 static void osd_ah_init(const struct lu_env *env, struct dt_allocation_hint *ah,
1875 struct dt_object *parent, struct dt_object *child,
1876 cfs_umode_t child_mode)
1880 memset(ah, 0, sizeof(*ah));
1881 ah->dah_parent = parent;
1882 ah->dah_mode = child_mode;
/*
 * Apply the creation attributes to a freshly made inode.
 *
 * Strips fields already established at inode creation (type/mode; rdev
 * unless this is a device node; times that match what the inode already
 * has), transfers quota if the new owner differs, then pushes the
 * remaining attributes with osd_inode_setattr() and dirties the inode.
 * attr->la_valid is restored before returning so the caller's view of
 * the requested attributes is unchanged.
 */
1885 static void osd_attr_init(struct osd_thread_info *info, struct osd_object *obj,
1886 struct lu_attr *attr, struct dt_object_format *dof)
1888 struct inode *inode = obj->oo_inode;
1889 __u64 valid = attr->la_valid;
1892 attr->la_valid &= ~(LA_TYPE | LA_MODE);
1894 if (dof->dof_type != DFT_NODE)
1895 attr->la_valid &= ~LA_RDEV;
1896 if ((valid & LA_ATIME) && (attr->la_atime == LTIME_S(inode->i_atime)))
1897 attr->la_valid &= ~LA_ATIME;
1898 if ((valid & LA_CTIME) && (attr->la_ctime == LTIME_S(inode->i_ctime)))
1899 attr->la_valid &= ~LA_CTIME;
1900 if ((valid & LA_MTIME) && (attr->la_mtime == LTIME_S(inode->i_mtime)))
1901 attr->la_valid &= ~LA_MTIME;
1903 result = osd_quota_transfer(inode, attr);
1907 if (attr->la_valid != 0) {
1908 result = osd_inode_setattr(info->oti_env, inode, attr);
1910 * The osd_inode_setattr() should always succeed here. The
1911 * only error that could be returned is EDQUOT when we are
1912 * trying to change the UID or GID of the inode. However, this
1913 * should not happen since quota enforcement is no longer
1914 * enabled on ldiskfs (lquota takes care of it).
1916 LASSERTF(result == 0, "%d", result);
1917 inode->i_sb->s_op->dirty_inode(inode);
/* restore the caller's requested valid mask */
1920 attr->la_valid = valid;
1924 * Helper function for osd_object_create()
1926 * \retval 0, on success
/*
 * Dispatch to the per-format creation routine, then initialize the new
 * inode's attributes and the osd_object, unlocking the inode if ldiskfs
 * left it in the I_NEW state.
 */
1928 static int __osd_object_create(struct osd_thread_info *info,
1929 struct osd_object *obj, struct lu_attr *attr,
1930 struct dt_allocation_hint *hint,
1931 struct dt_object_format *dof,
1936 result = osd_create_type_f(dof->dof_type)(info, obj, attr, hint, dof,
1939 osd_attr_init(info, obj, attr, dof);
1940 osd_object_init0(obj);
1942 if (obj->oo_inode && (obj->oo_inode->i_state & I_NEW))
1943 unlock_new_inode(obj->oo_inode);
1950 * Helper function for osd_object_create()
1952 * \retval 0, on success
/*
 * Record the fid -> (ino, generation) mapping of the new object in the
 * Object Index so the fid can later be resolved back to the inode.
 */
1954 static int __osd_oi_insert(const struct lu_env *env, struct osd_object *obj,
1955 const struct lu_fid *fid, struct thandle *th)
1957 struct osd_thread_info *info = osd_oti_get(env);
1958 struct osd_inode_id *id = &info->oti_id;
1959 struct osd_device *osd = osd_obj2dev(obj);
1961 LASSERT(obj->oo_inode != NULL);
1963 osd_id_gen(id, obj->oo_inode->i_ino, obj->oo_inode->i_generation);
1964 return osd_oi_insert(info, osd, fid, id, th);
/*
 * Resolve which server holds the sequence of @fid.
 *
 * IDIF fids map directly to an OST index; sequences outside the FLDB
 * are assumed local to this MDT (ss_node_id); everything else is looked
 * up in the server-side FLD.
 */
1967 int osd_fld_lookup(const struct lu_env *env, struct osd_device *osd,
1968 const struct lu_fid *fid, struct lu_seq_range *range)
1970 struct seq_server_site *ss = osd_seq_site(osd);
1973 if (fid_is_idif(fid)) {
/* OST object fid: the OST index is encoded in the fid itself */
1974 range->lsr_flags = LU_SEQ_RANGE_OST;
1975 range->lsr_index = fid_idif_ost_idx(fid);
1979 if (!fid_seq_in_fldb(fid_seq(fid))) {
1980 range->lsr_flags = LU_SEQ_RANGE_MDT;
1982 /* FIXME: If ss is NULL, it suppose not get lsr_index
1984 range->lsr_index = ss->ss_node_id;
1988 LASSERT(ss != NULL);
/* unknown flags: let the FLD lookup fill them in */
1989 range->lsr_flags = -1;
1990 rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), range);
1992 CERROR("%s can not find "DFID": rc = %d\n",
1993 osd_name(osd), PFID(fid), rc);
1999 * Concurrency: no external locking is necessary.
/*
 * Declare journal credits for a future object creation: inode creation,
 * an OI insertion (unless the fid lives directly on an OST), extra blocks
 * for "." and ".." when creating a directory, and one inode of quota for
 * the new owner.  Finishes with a pre-emptive FLD lookup so the insert
 * path later hits the FLD cache instead of sending an RPC while holding
 * the transaction open.
 */
2001 static int osd_declare_object_create(const struct lu_env *env,
2002 struct dt_object *dt,
2003 struct lu_attr *attr,
2004 struct dt_allocation_hint *hint,
2005 struct dt_object_format *dof,
2006 struct thandle *handle)
2008 struct lu_seq_range *range = &osd_oti_get(env)->oti_seq_range;
2009 struct osd_thandle *oh;
2013 LASSERT(handle != NULL);
2015 oh = container_of0(handle, struct osd_thandle, ot_super);
2016 LASSERT(oh->ot_handle == NULL);
2018 osd_trans_declare_op(env, oh, OSD_OT_CREATE,
2019 osd_dto_credits_noquota[DTO_OBJECT_CREATE]);
2020 if (!fid_is_on_ost(osd_oti_get(env), osd_dt_dev(handle->th_dev),
2021 lu_object_fid(&dt->do_lu)))
2022 /* Reuse idle OI block may cause additional one OI block
2024 osd_trans_declare_op(env, oh, OSD_OT_INSERT,
2025 osd_dto_credits_noquota[DTO_INDEX_INSERT] + 1);
2027 /* If this is directory, then we expect . and .. to be inserted as
2028 * well. The one directory block always needs to be created for the
2029 * directory, so we could use DTO_WRITE_BASE here (GDT, block bitmap,
2030 * block), there is no danger of needing a tree for the first block.
2032 if (attr && S_ISDIR(attr->la_mode)) {
2033 osd_trans_declare_op(env, oh, OSD_OT_INSERT,
2034 osd_dto_credits_noquota[DTO_WRITE_BASE]);
2035 osd_trans_declare_op(env, oh, OSD_OT_INSERT, 0);
/* one new inode charged to the creating uid/gid */
2041 rc = osd_declare_inode_qid(env, attr->la_uid, attr->la_gid, 1, oh,
2042 false, false, NULL, false);
2046 /* It does fld look up inside declare, and the result will be
2047 * added to fld cache, so the following fld lookup inside insert
2048 * does not need send RPC anymore, so avoid send rpc with holding
2050 if (fid_is_norm(lu_object_fid(&dt->do_lu)) &&
2051 !fid_is_last_id(lu_object_fid(&dt->do_lu)))
2052 osd_fld_lookup(env, osd_dt_dev(handle->th_dev),
2053 lu_object_fid(&dt->do_lu), range);
/*
 * dt_object_operations::do_object_create implementation: create the
 * backing inode (__osd_object_create) and register the fid in the OI
 * (__osd_oi_insert) within transaction @th.  Accounting (quota) objects
 * are refused — they are created by tune2fs, not from the kernel.
 */
2059 static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
2060 struct lu_attr *attr,
2061 struct dt_allocation_hint *hint,
2062 struct dt_object_format *dof,
2065 const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
2066 struct osd_object *obj = osd_dt_obj(dt);
2067 struct osd_thread_info *info = osd_oti_get(env);
2072 LINVRNT(osd_invariant(obj));
2073 LASSERT(!dt_object_exists(dt) && !dt_object_remote(dt));
2074 LASSERT(osd_write_locked(env, obj));
2075 LASSERT(th != NULL);
2077 if (unlikely(fid_is_acct(fid)))
2078 /* Quota files can't be created from the kernel any more,
2079 * 'tune2fs -O quota' will take care of creating them */
2082 osd_trans_exec_op(env, th, OSD_OT_CREATE);
2083 osd_trans_declare_rb(env, th, OSD_OT_REF_ADD);
2085 result = __osd_object_create(info, obj, attr, hint, dof, th);
2087 result = __osd_oi_insert(env, obj, fid, th);
2089 LASSERT(ergo(result == 0,
2090 dt_object_exists(dt) && !dt_object_remote(dt)));
2092 LASSERT(osd_invariant(obj));
2097 * Called to destroy on-disk representation of the object
2099 * Concurrency: must be locked
/*
 * Declare credits for destroying @dt: inode deletion, OI entry removal
 * (plus slack for OI leaf recycling), release of one quota inode, and
 * the block-quota adjustment for the data about to be truncated.
 */
2101 static int osd_declare_object_destroy(const struct lu_env *env,
2102 struct dt_object *dt,
2105 struct osd_object *obj = osd_dt_obj(dt);
2106 struct inode *inode = obj->oo_inode;
2107 struct osd_thandle *oh;
2111 oh = container_of0(th, struct osd_thandle, ot_super);
2112 LASSERT(oh->ot_handle == NULL);
2115 osd_trans_declare_op(env, oh, OSD_OT_DELETE,
2116 osd_dto_credits_noquota[DTO_OBJECT_DELETE]);
2117 /* Recycle idle OI leaf may cause additional three OI blocks
2119 osd_trans_declare_op(env, oh, OSD_OT_DESTROY,
2120 osd_dto_credits_noquota[DTO_INDEX_DELETE] + 3);
2122 /* one less inode */
2123 rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid, -1, oh,
2124 false, true, NULL, false);
2127 /* data to be truncated */
2128 rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid, 0, oh,
2129 true, true, NULL, false);
/*
 * Destroy the on-disk representation of @dt: for directories also remove
 * the agent inode, then delete the OI mapping and mark the lu_object as
 * HEARD_BANSHEE so it is dropped from the cache.  Accounting objects are
 * refused, like in create.  i_mutex serializes against OI scrub.
 */
2133 static int osd_object_destroy(const struct lu_env *env,
2134 struct dt_object *dt,
2137 const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
2138 struct osd_object *obj = osd_dt_obj(dt);
2139 struct inode *inode = obj->oo_inode;
2140 struct osd_device *osd = osd_obj2dev(obj);
2141 struct osd_thandle *oh;
2145 oh = container_of0(th, struct osd_thandle, ot_super);
2146 LASSERT(oh->ot_handle);
2148 LASSERT(!lu_object_is_dying(dt->do_lu.lo_header));
2150 if (unlikely(fid_is_acct(fid)))
2153 /* Parallel control for OI scrub. For most of cases, there is no
2154 * lock contention. So it will not affect unlink performance. */
2155 mutex_lock(&inode->i_mutex);
2156 if (S_ISDIR(inode->i_mode)) {
2157 LASSERT(osd_inode_unlinked(inode) || inode->i_nlink == 1);
2158 /* it will check/delete the agent inode for every dir
2159 * destory, how to optimize it? unlink performance
2161 result = osd_delete_from_agent(env, osd, obj, oh);
2162 if (result != 0 && result != -ENOENT) {
2163 CERROR("%s: delete agent inode "DFID": rc = %d\n",
2164 osd_name(osd), PFID(fid), result);
/* nlink update (elided here) happens under oo_guard */
2166 spin_lock(&obj->oo_guard);
2168 spin_unlock(&obj->oo_guard);
2169 inode->i_sb->s_op->dirty_inode(inode);
2172 osd_trans_exec_op(env, th, OSD_OT_DESTROY);
2174 result = osd_oi_delete(osd_oti_get(env), osd, fid, th);
2175 mutex_unlock(&inode->i_mutex);
2177 /* XXX: add to ext3 orphan list */
2178 /* rc = ext3_orphan_add(handle_t *handle, struct inode *inode) */
2180 /* not needed in the cache anymore */
2181 set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags);
/*
 * Low-level xattr store: build a thread-local dentry around @inode and
 * call the inode's setxattr method with flags @fl (XATTR_CREATE /
 * XATTR_REPLACE).  Quota is initialized first since xattr blocks may be
 * allocated.
 */
2186 static inline int __osd_xattr_set(struct osd_thread_info *info,
2187 struct inode *inode, const char *name,
2188 const void *buf, int buflen, int fl)
2190 struct dentry *dentry = &info->oti_child_dentry;
2192 ll_vfs_dq_init(inode);
2193 dentry->d_inode = inode;
2194 return inode->i_op->setxattr(dentry, name, buf, buflen, fl);
2198 * Put the fid into lustre_mdt_attrs, and then place the structure
2199 * inode's ea. This fid should not be altered during the life time
2202 * \retval +ve, on success
2203 * \retval -ve, on error
2205 * FIXME: It is good to have/use ldiskfs_xattr_set_handle() here
/*
 * Store @fid in the object's LMA extended attribute (on-disk byte order).
 * The OBD_FAIL checks let tests simulate missing/IGIF fids.  A racing
 * -EEXIST from the xattr create is treated as success.
 */
2207 int osd_ea_fid_set(struct osd_thread_info *info, struct inode *inode,
2208 const struct lu_fid *fid)
2210 struct lustre_mdt_attrs *lma = &info->oti_mdt_attrs;
2213 if (OBD_FAIL_CHECK(OBD_FAIL_FID_INLMA))
2216 if (OBD_FAIL_CHECK(OBD_FAIL_FID_IGIF) && fid_is_client_visible(fid))
2219 lustre_lma_init(lma, fid);
2220 lustre_lma_swab(lma);
2222 rc = __osd_xattr_set(info, inode, XATTR_NAME_LMA, lma, sizeof(*lma),
2224 /* Someone may created the EA by race. */
2225 if (unlikely(rc == -EEXIST))
2231 * ldiskfs supports fid in dirent, it is passed in dentry->d_fsdata.
2232 * lustre 1.8 also uses d_fsdata for passing other info to ldiskfs.
2233 * To have compatilibility with 1.8 ldiskfs driver we need to have
2234 * magic number at start of fid data.
2235 * \ldiskfs_dentry_param is used only to pass fid from osd to ldiskfs.
2238 void osd_get_ldiskfs_dirent_param(struct ldiskfs_dentry_param *param,
2239 const struct dt_rec *fid)
/* non-client-visible fids are not stored in the dirent: clear magic */
2241 if (!fid_is_client_mdt_visible((const struct lu_fid *)fid)) {
2242 param->edp_magic = 0;
2246 param->edp_magic = LDISKFS_LUFID_MAGIC;
/* +1 for the length byte preceding the fid payload */
2247 param->edp_len = sizeof(struct lu_fid) + 1;
2248 fid_cpu_to_be((struct lu_fid *)param->edp_data, (struct lu_fid *)fid);
2252 * Try to read the fid from inode ea into dt_rec.
2254 * \param fid object fid.
2256 * \retval 0 on success
/*
 * Resolve inode number @ino (generation unknown) to an inode and read its
 * fid from the LMA extended attribute via osd_iget_fid().
 */
2258 static int osd_ea_fid_get(const struct lu_env *env, struct osd_object *obj,
2259 __u32 ino, struct lu_fid *fid,
2260 struct osd_inode_id *id)
2262 struct osd_thread_info *info = osd_oti_get(env);
2263 struct inode *inode;
2266 osd_id_gen(id, ino, OSD_OII_NOGEN);
2267 inode = osd_iget_fid(info, osd_obj2dev(obj), id, fid);
2269 RETURN(PTR_ERR(inode));
/*
 * Insert the "." and ".." entries into a new directory, passing the
 * parent fid down to ldiskfs via dentry params.  "." gets no fid
 * (edp_magic cleared); ".." carries @dot_dot_fid.
 */
2275 static int osd_add_dot_dotdot_internal(struct osd_thread_info *info,
2277 struct inode *parent_dir,
2278 const struct dt_rec *dot_fid,
2279 const struct dt_rec *dot_dot_fid,
2280 struct osd_thandle *oth)
2282 struct ldiskfs_dentry_param *dot_ldp;
2283 struct ldiskfs_dentry_param *dot_dot_ldp;
2285 dot_dot_ldp = (struct ldiskfs_dentry_param *)info->oti_ldp2;
2286 osd_get_ldiskfs_dirent_param(dot_dot_ldp, dot_dot_fid);
2288 dot_ldp = (struct ldiskfs_dentry_param *)info->oti_ldp;
2289 dot_ldp->edp_magic = 0;
2290 return ldiskfs_add_dot_dotdot(oth->ot_handle, parent_dir,
2291 dir, dot_ldp, dot_dot_ldp);
2295 * Create an local inode for remote entry
/*
 * Allocate a local "agent" directory inode standing in for an object
 * whose real inode lives on another MDT, and populate its "." / ".."
 * entries pointing back at @pobj and the remote @fid.
 */
2297 static struct inode *osd_create_remote_inode(const struct lu_env *env,
2298 struct osd_device *osd,
2299 struct osd_object *pobj,
2300 const struct lu_fid *fid,
2303 struct osd_thread_info *info = osd_oti_get(env);
2304 struct inode *local;
2305 struct osd_thandle *oh;
2310 oh = container_of(th, struct osd_thandle, ot_super);
2311 LASSERT(oh->ot_handle->h_transaction != NULL);
2313 /* FIXME: Insert index api needs to know the mode of
2314 * the remote object. Just use S_IFDIR for now */
2315 local = ldiskfs_create_inode(oh->ot_handle, pobj->oo_inode, S_IFDIR);
2316 if (IS_ERR(local)) {
2317 CERROR("%s: create local error %d\n", osd_name(osd),
2318 (int)PTR_ERR(local));
2322 rc = osd_add_dot_dotdot_internal(info, local, pobj->oo_inode,
2323 (const struct dt_rec *)lu_object_fid(&pobj->oo_dt.do_lu),
2324 (const struct dt_rec *)fid, oh);
2326 CERROR("%s: "DFID" add dot dotdot error: rc = %d\n",
2327 osd_name(osd), PFID(fid), rc);
2328 RETURN(ERR_PTR(rc));
2335 * Delete local inode for remote entry
/*
 * Tear down the local agent inode (ino @ino, on-disk little-endian) that
 * represented remote object @fid: iget it, drop it (elided here) and mark
 * it dirty inside transaction @oh.
 */
2337 static int osd_delete_remote_inode(const struct lu_env *env,
2338 struct osd_device *osd,
2339 const struct lu_fid *fid,
2340 __u32 ino, struct osd_thandle *oh)
2342 struct osd_thread_info *oti = osd_oti_get(env);
2343 struct osd_inode_id *id = &oti->oti_id;
2344 struct inode *inode;
/* @ino comes from a dirent, stored little-endian on disk */
2347 id->oii_ino = le32_to_cpu(ino);
2348 id->oii_gen = OSD_OII_NOGEN;
2349 inode = osd_iget(oti, osd, id);
2350 if (IS_ERR(inode)) {
2351 CERROR("%s: iget error "DFID" id %u:%u\n", osd_name(osd),
2352 PFID(fid), id->oii_ino, id->oii_gen);
2353 RETURN(PTR_ERR(inode));
2357 mark_inode_dirty(inode);
2358 CDEBUG(D_INODE, "%s: delete remote inode "DFID" %lu\n",
2359 osd_name(osd), PFID(fid), inode->i_ino);
2365 * OSD layer object create function for interoperability mode (b11826).
2366 * This is mostly similar to osd_object_create(). Only difference being, fid is
2367 * inserted into inode ea here.
2369 * \retval 0, on success
2370 * \retval -ve, on error
2372 static int osd_object_ea_create(const struct lu_env *env, struct dt_object *dt,
2373 struct lu_attr *attr,
2374 struct dt_allocation_hint *hint,
2375 struct dt_object_format *dof,
2378 const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
2379 struct osd_object *obj = osd_dt_obj(dt);
2380 struct osd_thread_info *info = osd_oti_get(env);
2385 LASSERT(osd_invariant(obj));
2386 LASSERT(!dt_object_exists(dt) && !dt_object_remote(dt));
2387 LASSERT(osd_write_locked(env, obj));
2388 LASSERT(th != NULL);
2390 if (unlikely(fid_is_acct(fid)))
2391 /* Quota files can't be created from the kernel any more,
2392 * 'tune2fs -O quota' will take care of creating them */
2395 osd_trans_exec_op(env, th, OSD_OT_CREATE);
2396 osd_trans_declare_rb(env, th, OSD_OT_REF_ADD);
2398 result = __osd_object_create(info, obj, attr, hint, dof, th);
/* store the fid in the LMA EA, except for plain OST objects whose
 * fid can be derived from the parent; LAST_ID always gets one */
2399 if ((result == 0) &&
2400 (fid_is_last_id(fid) ||
2401 !fid_is_on_ost(info, osd_dt_dev(th->th_dev), fid)))
2402 result = osd_ea_fid_set(info, obj->oo_inode, fid);
2405 result = __osd_oi_insert(env, obj, fid, th);
2407 LASSERT(ergo(result == 0,
2408 dt_object_exists(dt) && !dt_object_remote(dt)));
2409 LINVRNT(osd_invariant(obj));
/*
 * Declare credits for a future nlink increment — a plain inode update,
 * so base attr-set credits suffice.
 */
2413 static int osd_declare_object_ref_add(const struct lu_env *env,
2414 struct dt_object *dt,
2415 struct thandle *handle)
2417 struct osd_thandle *oh;
2419 /* it's possible that object doesn't exist yet */
2420 LASSERT(handle != NULL);
2422 oh = container_of0(handle, struct osd_thandle, ot_super);
2423 LASSERT(oh->ot_handle == NULL);
2425 osd_trans_declare_op(env, oh, OSD_OT_REF_ADD,
2426 osd_dto_credits_noquota[DTO_ATTR_SET_BASE]);
2432 * Concurrency: @dt is write locked.
2434 static int osd_object_ref_add(const struct lu_env *env,
2435 struct dt_object *dt, struct thandle *th)
2437 struct osd_object *obj = osd_dt_obj(dt);
2438 struct inode *inode = obj->oo_inode;
2440 LINVRNT(osd_invariant(obj));
2441 LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
2442 LASSERT(osd_write_locked(env, obj));
2443 LASSERT(th != NULL);
2445 osd_trans_exec_op(env, th, OSD_OT_REF_ADD);
2448 * DIR_NLINK feature is set for compatibility reasons if:
2449 * 1) nlinks > LDISKFS_LINK_MAX, or
2450 * 2) nlinks == 2, since this indicates i_nlink was previously 1.
2452 * It is easier to always set this flag (rather than check and set),
2453 * since it has less overhead, and the superblock will be dirtied
2454 * at some point. Both e2fsprogs and any Lustre-supported ldiskfs
2455 * do not actually care whether this flag is set or not.
2457 spin_lock(&obj->oo_guard);
2458 /* inc_nlink from 0 may cause WARN_ON */
2459 if(inode->i_nlink == 0)
2460 set_nlink(inode, 1);
2463 if (S_ISDIR(inode->i_mode) && inode->i_nlink > 1) {
2464 if (inode->i_nlink >= LDISKFS_LINK_MAX ||
2465 inode->i_nlink == 2)
2466 set_nlink(inode, 1);
2468 LASSERT(inode->i_nlink <= LDISKFS_LINK_MAX);
2469 spin_unlock(&obj->oo_guard);
2470 inode->i_sb->s_op->dirty_inode(inode);
2471 LINVRNT(osd_invariant(obj));
/*
 * Declare credits for a future nlink decrement — a plain inode update,
 * so base attr-set credits suffice.
 */
2476 static int osd_declare_object_ref_del(const struct lu_env *env,
2477 struct dt_object *dt,
2478 struct thandle *handle)
2480 struct osd_thandle *oh;
2482 LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
2483 LASSERT(handle != NULL);
2485 oh = container_of0(handle, struct osd_thandle, ot_super);
2486 LASSERT(oh->ot_handle == NULL);
2488 osd_trans_declare_op(env, oh, OSD_OT_REF_DEL,
2489 osd_dto_credits_noquota[DTO_ATTR_SET_BASE]);
2495 * Concurrency: @dt is write locked.
2497 static int osd_object_ref_del(const struct lu_env *env, struct dt_object *dt,
2500 struct osd_object *obj = osd_dt_obj(dt);
2501 struct inode *inode = obj->oo_inode;
2503 LINVRNT(osd_invariant(obj));
2504 LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
2505 LASSERT(osd_write_locked(env, obj));
2506 LASSERT(th != NULL);
2508 osd_trans_exec_op(env, th, OSD_OT_REF_DEL);
2510 spin_lock(&obj->oo_guard);
2511 LASSERT(inode->i_nlink > 0);
2513 /* If this is/was a many-subdir directory (nlink > LDISKFS_LINK_MAX)
2514 * then the nlink count is 1. Don't let it be set to 0 or the directory
2515 * inode will be deleted incorrectly. */
2516 if (S_ISDIR(inode->i_mode) && inode->i_nlink == 0)
2517 set_nlink(inode, 1);
2518 spin_unlock(&obj->oo_guard);
2519 inode->i_sb->s_op->dirty_inode(inode);
2520 LINVRNT(osd_invariant(obj));
2526 * Get the 64-bit version for an inode.
/*
 * Read the object version from the ldiskfs inode's i_fs_version field
 * into @ver.
 */
2528 static int osd_object_version_get(const struct lu_env *env,
2529 struct dt_object *dt, dt_obj_version_t *ver)
2531 struct inode *inode = osd_dt_obj(dt)->oo_inode;
2533 CDEBUG(D_INODE, "Get version "LPX64" for inode %lu\n",
2534 LDISKFS_I(inode)->i_fs_version, inode->i_ino);
2535 *ver = LDISKFS_I(inode)->i_fs_version;
2540 * Concurrency: @dt is read locked.
/*
 * dt_object_operations::do_xattr_get implementation.  The pseudo-xattr
 * XATTR_NAME_VERSION is served from the inode version field instead of a
 * real EA; everything else goes through the inode's getxattr method after
 * a META_READ capability check.
 */
2542 static int osd_xattr_get(const struct lu_env *env, struct dt_object *dt,
2543 struct lu_buf *buf, const char *name,
2544 struct lustre_capa *capa)
2546 struct osd_object *obj = osd_dt_obj(dt);
2547 struct inode *inode = obj->oo_inode;
2548 struct osd_thread_info *info = osd_oti_get(env);
2549 struct dentry *dentry = &info->oti_obj_dentry;
2551 /* version get is not real XATTR but uses xattr API */
2552 if (strcmp(name, XATTR_NAME_VERSION) == 0) {
2553 /* for version we are just using xattr API but change inode
2555 LASSERT(buf->lb_len == sizeof(dt_obj_version_t));
2556 osd_object_version_get(env, dt, buf->lb_buf);
2557 return sizeof(dt_obj_version_t);
2560 LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
2561 LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
2563 if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
2566 return __osd_xattr_get(inode, dentry, name, buf->lb_buf, buf->lb_len);
/*
 * Declare credits for a future xattr set.  A version "xattr" only
 * touches the inode (attr-set credits); a real EA needs the larger
 * DTO_XATTR_SET reservation.
 */
2570 static int osd_declare_xattr_set(const struct lu_env *env,
2571 struct dt_object *dt,
2572 const struct lu_buf *buf, const char *name,
2573 int fl, struct thandle *handle)
2575 struct osd_thandle *oh;
2577 LASSERT(handle != NULL);
2579 oh = container_of0(handle, struct osd_thandle, ot_super);
2580 LASSERT(oh->ot_handle == NULL);
2582 osd_trans_declare_op(env, oh, OSD_OT_XATTR_SET,
2583 strcmp(name, XATTR_NAME_VERSION) == 0 ?
2584 osd_dto_credits_noquota[DTO_ATTR_SET_BASE] :
2585 osd_dto_credits_noquota[DTO_XATTR_SET]);
2591 * Set the 64-bit version for object
/* Stores *new_version into the ldiskfs inode and marks the inode dirty
 * via the superblock's dirty_inode hook so the change is journaled. */
2593 static void osd_object_version_set(const struct lu_env *env,
2594 struct dt_object *dt,
2595 dt_obj_version_t *new_version)
2597 struct inode *inode = osd_dt_obj(dt)->oo_inode;
2599 CDEBUG(D_INODE, "Set version "LPX64" (old "LPX64") for inode %lu\n",
2600 *new_version, LDISKFS_I(inode)->i_fs_version, inode->i_ino);
2602 LDISKFS_I(inode)->i_fs_version = *new_version;
2603 /** Version is set after all inode operations are finished,
2604 * so we should mark it dirty here */
2605 inode->i_sb->s_op->dirty_inode(inode);
2609 * Concurrency: @dt is write locked.
/* dt_object_operations::do_xattr_set implementation.  Like the get path,
 * XATTR_NAME_VERSION is intercepted and handled as an inode-version
 * update rather than a real xattr.  LU_XATTR_{REPLACE,CREATE} flags are
 * translated to the kernel XATTR_{REPLACE,CREATE} flags. */
2611 static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
2612 const struct lu_buf *buf, const char *name, int fl,
2613 struct thandle *handle, struct lustre_capa *capa)
2615 struct osd_object *obj = osd_dt_obj(dt);
2616 struct inode *inode = obj->oo_inode;
2617 struct osd_thread_info *info = osd_oti_get(env);
2620 LASSERT(handle != NULL);
2622 /* version set is not real XATTR */
2623 if (strcmp(name, XATTR_NAME_VERSION) == 0) {
2624 /* for version we are just using xattr API but change inode
2626 LASSERT(buf->lb_len == sizeof(dt_obj_version_t));
2627 osd_object_version_set(env, dt, buf->lb_buf);
2628 return sizeof(dt_obj_version_t);
2631 if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
/* Account the declared XATTR_SET credits against this transaction. */
2634 osd_trans_exec_op(env, handle, OSD_OT_XATTR_SET);
2635 if (fl & LU_XATTR_REPLACE)
2636 fs_flags |= XATTR_REPLACE;
2638 if (fl & LU_XATTR_CREATE)
2639 fs_flags |= XATTR_CREATE;
2641 return __osd_xattr_set(info, inode, name, buf->lb_buf, buf->lb_len,
2646 * Concurrency: @dt is read locked.
/* Lists all xattr names of the backing inode into buf via the VFS
 * listxattr inode operation; returns whatever listxattr returns
 * (bytes used, or negative errno). */
2648 static int osd_xattr_list(const struct lu_env *env, struct dt_object *dt,
2649 struct lu_buf *buf, struct lustre_capa *capa)
2651 struct osd_object *obj = osd_dt_obj(dt);
2652 struct inode *inode = obj->oo_inode;
2653 struct osd_thread_info *info = osd_oti_get(env);
2654 struct dentry *dentry = &info->oti_obj_dentry;
2656 LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
2657 LASSERT(inode->i_op != NULL && inode->i_op->listxattr != NULL);
2658 LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
2660 if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
/* The per-thread scratch dentry is re-pointed at this inode for the call. */
2663 dentry->d_inode = inode;
2664 return inode->i_op->listxattr(dentry, buf->lb_buf, buf->lb_len);
/* Declares journal credits for deleting an xattr.  Deletion is charged
 * with the same DTO_XATTR_SET credit class as a set. */
2667 static int osd_declare_xattr_del(const struct lu_env *env,
2668 struct dt_object *dt, const char *name,
2669 struct thandle *handle)
2671 struct osd_thandle *oh;
2673 LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
2674 LASSERT(handle != NULL);
2676 oh = container_of0(handle, struct osd_thandle, ot_super);
/* Must be declared before the journal handle is opened. */
2677 LASSERT(oh->ot_handle == NULL);
2679 osd_trans_declare_op(env, oh, OSD_OT_XATTR_SET,
2680 osd_dto_credits_noquota[DTO_XATTR_SET]);
2686 * Concurrency: @dt is write locked.
/* Removes the named xattr through the VFS removexattr inode operation.
 * Quota is initialized on the inode first since removal may free blocks. */
2688 static int osd_xattr_del(const struct lu_env *env, struct dt_object *dt,
2689 const char *name, struct thandle *handle,
2690 struct lustre_capa *capa)
2692 struct osd_object *obj = osd_dt_obj(dt);
2693 struct inode *inode = obj->oo_inode;
2694 struct osd_thread_info *info = osd_oti_get(env);
2695 struct dentry *dentry = &info->oti_obj_dentry;
2698 LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
2699 LASSERT(inode->i_op != NULL && inode->i_op->removexattr != NULL);
2700 LASSERT(handle != NULL);
2702 if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
2705 osd_trans_exec_op(env, handle, OSD_OT_XATTR_SET);
2707 ll_vfs_dq_init(inode);
2708 dentry->d_inode = inode;
2709 rc = inode->i_op->removexattr(dentry, name);
/* Builds (or looks up a cached) capability for @dt authorizing @opc.
 * The uid/gid are either embedded in plain form (LC_ID_PLAIN) or
 * encrypted with the current capability key (LC_ID_CONVERT), then the
 * capa is HMAC-signed and inserted into the device capa hash.
 * NOTE(review): extract is missing interior lines (ENTRY/RETURN, case
 * labels, braces); treat the visible flow as indicative only. */
2713 static struct obd_capa *osd_capa_get(const struct lu_env *env,
2714 struct dt_object *dt,
2715 struct lustre_capa *old,
2718 struct osd_thread_info *info = osd_oti_get(env);
2719 const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
2720 struct osd_object *obj = osd_dt_obj(dt);
2721 struct osd_device *dev = osd_obj2dev(obj);
2722 struct lustre_capa_key *key = &info->oti_capa_key;
2723 struct lustre_capa *capa = &info->oti_capa;
2724 struct obd_capa *oc;
2725 struct md_capainfo *ci;
/* Capabilities disabled on this device: nothing to hand out. */
2729 if (!dev->od_fl_capa)
2730 RETURN(ERR_PTR(-ENOENT));
2732 LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
2733 LINVRNT(osd_invariant(obj));
2735 /* renewal sanity check */
2736 if (old && osd_object_auth(env, dt, old, opc))
2737 RETURN(ERR_PTR(-EACCES));
2739 ci = md_capainfo(env);
2741 RETURN(ERR_PTR(-ENOENT));
2743 switch (ci->mc_auth) {
/* Plain mode: expose uid/gid directly in the capability. */
2747 capa->lc_uid = obj->oo_inode->i_uid;
2748 capa->lc_gid = obj->oo_inode->i_gid;
2749 capa->lc_flags = LC_ID_PLAIN;
2751 case LC_ID_CONVERT: {
/* Convert mode: mix uid/gid with random salt and encrypt with the
 * capa key so identities are not sent in the clear. */
2754 s[0] = obj->oo_inode->i_uid;
2755 cfs_get_random_bytes(&(s[1]), sizeof(__u32));
2756 s[2] = obj->oo_inode->i_gid;
2757 cfs_get_random_bytes(&(s[3]), sizeof(__u32));
2758 rc = capa_encrypt_id(d, s, key->lk_key, CAPA_HMAC_KEY_MAX_LEN);
2760 RETURN(ERR_PTR(rc));
2762 capa->lc_uid = ((__u64)d[1] << 32) | d[0];
2763 capa->lc_gid = ((__u64)d[3] << 32) | d[2];
2764 capa->lc_flags = LC_ID_CONVERT;
2768 RETURN(ERR_PTR(-EINVAL));
2771 capa->lc_fid = *fid;
/* High byte of lc_flags carries the HMAC algorithm id. */
2773 capa->lc_flags |= dev->od_capa_alg << 24;
2774 capa->lc_timeout = dev->od_capa_timeout;
2775 capa->lc_expiry = 0;
/* Fast path: reuse an unexpired cached capability if present. */
2777 oc = capa_lookup(dev->od_capa_hash, capa, 1);
2779 LASSERT(!capa_is_expired(oc));
2783 spin_lock(&capa_lock);
2784 *key = dev->od_capa_keys[1];
2785 spin_unlock(&capa_lock);
2787 capa->lc_keyid = key->lk_keyid;
2788 capa->lc_expiry = cfs_time_current_sec() + dev->od_capa_timeout;
2790 rc = capa_hmac(capa->lc_hmac, capa, key->lk_key);
2792 DEBUG_CAPA(D_ERROR, capa, "HMAC failed: %d for", rc);
2793 RETURN(ERR_PTR(rc));
2796 oc = capa_add(dev->od_capa_hash, capa);
/* Flushes an object to disk by faking up a struct file over the inode
 * and calling its f_op->fsync under i_mutex.  Uses the per-thread
 * scratch dentry/file from osd_thread_info to avoid allocation. */
2800 static int osd_object_sync(const struct lu_env *env, struct dt_object *dt)
2802 struct osd_object *obj = osd_dt_obj(dt);
2803 struct inode *inode = obj->oo_inode;
2804 struct osd_thread_info *info = osd_oti_get(env);
2805 struct dentry *dentry = &info->oti_obj_dentry;
2806 struct file *file = &info->oti_file;
2811 dentry->d_inode = inode;
2812 file->f_dentry = dentry;
2813 file->f_mapping = inode->i_mapping;
2814 file->f_op = inode->i_fop;
/* fsync requires i_mutex held on this (older) kernel fsync signature. */
2815 mutex_lock(&inode->i_mutex);
2816 rc = file->f_op->fsync(file, dentry, 0);
2817 mutex_unlock(&inode->i_mutex);
/* Exposes the raw backing inode pointer to upper layers via *data. */
2821 static int osd_data_get(const struct lu_env *env, struct dt_object *dt,
2824 struct osd_object *obj = osd_dt_obj(dt);
2827 *data = (void *)obj->oo_inode;
/* Checks whether the IAM container of @o is compatible with the
 * requested index features @feat: key/record sizes must fit within the
 * feature's min/max bounds and unsupported flags must be absent.
 * Returns a boolean-style result (non-zero = compatible). */
2835 static int osd_iam_index_probe(const struct lu_env *env, struct osd_object *o,
2836 const struct dt_index_features *feat)
2838 struct iam_descr *descr;
/* The root object only ever serves as a directory index. */
2840 if (osd_object_is_root(o))
2841 return feat == &dt_directory_features;
2843 LASSERT(o->oo_dir != NULL);
2845 descr = o->oo_dir->od_container.ic_descr;
2846 if (feat == &dt_directory_features) {
2847 if (descr->id_rec_size == sizeof(struct osd_fid_pack))
2853 feat->dif_keysize_min <= descr->id_key_size &&
2854 descr->id_key_size <= feat->dif_keysize_max &&
2855 feat->dif_recsize_min <= descr->id_rec_size &&
2856 descr->id_rec_size <= feat->dif_recsize_max &&
2857 !(feat->dif_flags & (DT_IND_VARKEY |
2858 DT_IND_VARREC | DT_IND_NONUNQ)) &&
2859 ergo(feat->dif_flags & DT_IND_UPDATE,
2860 1 /* XXX check that object (and file system) is
/* Initializes and sets up the IAM container backing @dir.  On success
 * installs the IAM index operations on the object; on failure the
 * container is torn down again. */
2865 static int osd_iam_container_init(const struct lu_env *env,
2866 struct osd_object *obj,
2867 struct osd_directory *dir)
2869 struct iam_container *bag = &dir->od_container;
2872 result = iam_container_init(bag, &dir->od_descr, obj->oo_inode);
2876 result = iam_container_setup(bag);
2878 obj->oo_dt.do_index_ops = &osd_index_iam_ops;
/* Setup failed: release what iam_container_init allocated. */
2880 iam_container_fini(bag);
2887 * Concurrency: no external lockinging is necessary.
/* dt_object_operations::do_index_try: selects the index-op vector for
 * this object based on the requested features — EA (ldiskfs dirent)
 * ops for directories, otable/accounting ops for their special
 * features, or lazily-initialized IAM container ops otherwise.
 * NOTE(review): extract drops lines (allocation of 'dir', returns);
 * treat the visible flow as indicative only. */
2889 static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
2890 const struct dt_index_features *feat)
2894 struct osd_object *obj = osd_dt_obj(dt);
2896 LINVRNT(osd_invariant(obj));
2898 if (osd_object_is_root(obj)) {
2899 dt->do_index_ops = &osd_index_ea_ops;
2901 } else if (feat == &dt_directory_features) {
2902 dt->do_index_ops = &osd_index_ea_ops;
2903 if (obj->oo_inode != NULL && S_ISDIR(obj->oo_inode->i_mode))
2908 } else if (unlikely(feat == &dt_otable_features)) {
2909 dt->do_index_ops = &osd_otable_ops;
2911 } else if (unlikely(feat == &dt_acct_features)) {
2912 dt->do_index_ops = &osd_acct_index_ops;
2915 } else if (!osd_has_index(obj)) {
2916 struct osd_directory *dir;
/* First check under oo_guard whether another thread already
 * attached container data to the object. */
2921 spin_lock(&obj->oo_guard);
2922 if (obj->oo_dir == NULL)
2926 * Concurrent thread allocated container data.
2929 spin_unlock(&obj->oo_guard);
2931 * Now, that we have container data, serialize its
2934 down_write(&obj->oo_ext_idx_sem);
2936 * recheck under lock.
2938 if (!osd_has_index(obj))
2939 result = osd_iam_container_init(env, obj, dir);
2942 up_write(&obj->oo_ext_idx_sem);
/* Verify the (possibly pre-existing) index matches @feat. */
2950 if (result == 0 && skip_iam == 0) {
2951 if (!osd_iam_index_probe(env, obj, feat))
2954 LINVRNT(osd_invariant(obj));
/* Quota-global indexes may need on-the-fly migration of old format. */
2956 if (result == 0 && is_quota_glb_feat(feat) &&
2957 fid_seq(lu_object_fid(&dt->do_lu)) == FID_SEQ_QUOTA_GLB)
2958 result = osd_quota_migration(env, dt, feat);
/* Stub do_attr_get for the otable iterator object; body not visible
 * in this extract (presumably a trivial return). */
2963 static int osd_otable_it_attr_get(const struct lu_env *env,
2964 struct dt_object *dt,
2965 struct lu_attr *attr,
2966 struct lustre_capa *capa)
/* dt_object_operations vector for native (IAM-format) objects.
 * Differs from osd_obj_ea_ops only in do_create (osd_object_create
 * vs osd_object_ea_create). */
2972 static const struct dt_object_operations osd_obj_ops = {
2973 .do_read_lock = osd_object_read_lock,
2974 .do_write_lock = osd_object_write_lock,
2975 .do_read_unlock = osd_object_read_unlock,
2976 .do_write_unlock = osd_object_write_unlock,
2977 .do_write_locked = osd_object_write_locked,
2978 .do_attr_get = osd_attr_get,
2979 .do_declare_attr_set = osd_declare_attr_set,
2980 .do_attr_set = osd_attr_set,
2981 .do_ah_init = osd_ah_init,
2982 .do_declare_create = osd_declare_object_create,
2983 .do_create = osd_object_create,
2984 .do_declare_destroy = osd_declare_object_destroy,
2985 .do_destroy = osd_object_destroy,
2986 .do_index_try = osd_index_try,
2987 .do_declare_ref_add = osd_declare_object_ref_add,
2988 .do_ref_add = osd_object_ref_add,
2989 .do_declare_ref_del = osd_declare_object_ref_del,
2990 .do_ref_del = osd_object_ref_del,
2991 .do_xattr_get = osd_xattr_get,
2992 .do_declare_xattr_set = osd_declare_xattr_set,
2993 .do_xattr_set = osd_xattr_set,
2994 .do_declare_xattr_del = osd_declare_xattr_del,
2995 .do_xattr_del = osd_xattr_del,
2996 .do_xattr_list = osd_xattr_list,
2997 .do_capa_get = osd_capa_get,
2998 .do_object_sync = osd_object_sync,
2999 .do_data_get = osd_data_get,
3003 * dt_object_operations for interoperability mode
3004 * (i.e. to run 2.0 mds on 1.8 disk) (b11826)
/* Identical to osd_obj_ops except objects are created with EA-style
 * (ldiskfs dirent) layout via osd_object_ea_create. */
3006 static const struct dt_object_operations osd_obj_ea_ops = {
3007 .do_read_lock = osd_object_read_lock,
3008 .do_write_lock = osd_object_write_lock,
3009 .do_read_unlock = osd_object_read_unlock,
3010 .do_write_unlock = osd_object_write_unlock,
3011 .do_write_locked = osd_object_write_locked,
3012 .do_attr_get = osd_attr_get,
3013 .do_declare_attr_set = osd_declare_attr_set,
3014 .do_attr_set = osd_attr_set,
3015 .do_ah_init = osd_ah_init,
3016 .do_declare_create = osd_declare_object_create,
3017 .do_create = osd_object_ea_create,
3018 .do_declare_destroy = osd_declare_object_destroy,
3019 .do_destroy = osd_object_destroy,
3020 .do_index_try = osd_index_try,
3021 .do_declare_ref_add = osd_declare_object_ref_add,
3022 .do_ref_add = osd_object_ref_add,
3023 .do_declare_ref_del = osd_declare_object_ref_del,
3024 .do_ref_del = osd_object_ref_del,
3025 .do_xattr_get = osd_xattr_get,
3026 .do_declare_xattr_set = osd_declare_xattr_set,
3027 .do_xattr_set = osd_xattr_set,
3028 .do_declare_xattr_del = osd_declare_xattr_del,
3029 .do_xattr_del = osd_xattr_del,
3030 .do_xattr_list = osd_xattr_list,
3031 .do_capa_get = osd_capa_get,
3032 .do_object_sync = osd_object_sync,
3033 .do_data_get = osd_data_get,
/* Minimal op vector for the OI-table iterator pseudo-object: only
 * attr_get and index_try are meaningful for it. */
3036 static const struct dt_object_operations osd_obj_otable_it_ops = {
3037 .do_attr_get = osd_otable_it_attr_get,
3038 .do_index_try = osd_index_try,
/* Declares journal credits for deleting one (key,value) pair from an
 * IAM index. */
3041 static int osd_index_declare_iam_delete(const struct lu_env *env,
3042 struct dt_object *dt,
3043 const struct dt_key *key,
3044 struct thandle *handle)
3046 struct osd_thandle *oh;
3048 oh = container_of0(handle, struct osd_thandle, ot_super);
/* Declarations precede the journal handle being started. */
3049 LASSERT(oh->ot_handle == NULL);
3051 osd_trans_declare_op(env, oh, OSD_OT_DELETE,
3052 osd_dto_credits_noquota[DTO_INDEX_DELETE]);
3058 * delete a (key, value) pair from index \a dt specified by \a key
3060 * \param dt osd index object
3061 * \param key key for index
3062 * \param rec record reference
3063 * \param handle transaction handler
3066 * \retval -ve failure
/* IAM-format index delete: acquires an iam_path_descr, byte-swaps the
 * key for quota indexes (stored little-endian on disk), and calls
 * iam_delete under the already-started journal handle. */
3069 static int osd_index_iam_delete(const struct lu_env *env, struct dt_object *dt,
3070 const struct dt_key *key,
3071 struct thandle *handle,
3072 struct lustre_capa *capa)
3074 struct osd_thread_info *oti = osd_oti_get(env);
3075 struct osd_object *obj = osd_dt_obj(dt);
3076 struct osd_thandle *oh;
3077 struct iam_path_descr *ipd;
3078 struct iam_container *bag = &obj->oo_dir->od_container;
3083 LINVRNT(osd_invariant(obj));
3084 LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
3085 LASSERT(bag->ic_object == obj->oo_inode);
3086 LASSERT(handle != NULL);
3088 if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
3091 osd_trans_exec_op(env, handle, OSD_OT_DELETE);
3093 ipd = osd_idx_ipd_get(env, bag);
3094 if (unlikely(ipd == NULL))
3097 oh = container_of0(handle, struct osd_thandle, ot_super);
/* Execution phase: the journal handle must already be open. */
3098 LASSERT(oh->ot_handle != NULL);
3099 LASSERT(oh->ot_handle->h_transaction != NULL);
3101 if (fid_is_quota(lu_object_fid(&dt->do_lu))) {
3102 /* swab quota uid/gid provided by caller */
3103 oti->oti_quota_id = cpu_to_le64(*((__u64 *)key));
3104 key = (const struct dt_key *)&oti->oti_quota_id;
3107 rc = iam_delete(oh->ot_handle, bag, (const struct iam_key *)key, ipd);
3108 osd_ipd_put(env, bag, ipd);
3109 LINVRNT(osd_invariant(obj));
/* Declares credits for deleting an EA-format (ldiskfs dirent) index
 * entry, including quota declaration for the directory's owner since
 * the delete may free directory blocks. */
3113 static int osd_index_declare_ea_delete(const struct lu_env *env,
3114 struct dt_object *dt,
3115 const struct dt_key *key,
3116 struct thandle *handle)
3118 struct osd_thandle *oh;
3119 struct inode *inode;
3123 LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
3124 LASSERT(handle != NULL);
3126 oh = container_of0(handle, struct osd_thandle, ot_super);
3127 LASSERT(oh->ot_handle == NULL);
3129 osd_trans_declare_op(env, oh, OSD_OT_DELETE,
3130 osd_dto_credits_noquota[DTO_INDEX_DELETE]);
3132 inode = osd_dt_obj(dt)->oo_inode;
3135 rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid, 0, oh,
3136 true, true, NULL, false);
/* Extracts the packed FID stored after the name in an ldiskfs dirent
 * (LDISKFS_DIRENT_LUFID feature) and unpacks it into @fid. */
3140 static inline int osd_get_fid_from_dentry(struct ldiskfs_dir_entry_2 *de,
3143 struct osd_fid_pack *rec;
3146 if (de->file_type & LDISKFS_DIRENT_LUFID) {
/* FID pack sits immediately after the NUL-terminated entry name. */
3147 rec = (struct osd_fid_pack *) (de->name + de->name_len + 1);
3148 rc = osd_fid_unpack((struct lu_fid *)fid, rec);
/* Returns non-zero if @fid lives on a different MDT than this OSD
 * (determined via an FLDB lookup), 0 if local, negative on lookup
 * failure. */
3153 static int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
3156 struct lu_seq_range *range = &osd_oti_get(env)->oti_seq_range;
3157 struct seq_server_site *ss = osd_seq_site(osd);
3161 /* Those FID seqs, which are not in FLDB, must be local seq */
3162 if (unlikely(!fid_seq_in_fldb(fid_seq(fid)) || ss == NULL))
3165 rc = osd_fld_lookup(env, osd, fid, range);
3167 CERROR("%s: Can not lookup fld for "DFID"\n",
3168 osd_name(osd), PFID(fid));
/* Remote iff the owning index differs from this server's node id. */
3172 RETURN(ss->ss_node_id != range->lsr_index);
3176 * Index delete function for interoperability mode (b11826).
3177 * It will remove the directory entry added by osd_index_ea_insert().
3178 * This entry is needed to maintain name->fid mapping.
3180 * \param key, key i.e. file entry to be deleted
3182 * \retval 0, on success
3183 * \retval -ve, on error
/* Deletes a name entry from an ldiskfs directory under the htree lock
 * (or oo_ext_idx_sem when no htree head exists).  If the entry pointed
 * at a remote-MDT object, the local agent inode is cleaned up too.
 * NOTE(review): extract omits interior lines (error branches, RETURN);
 * visible flow is indicative only. */
3185 static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt,
3186 const struct dt_key *key,
3187 struct thandle *handle,
3188 struct lustre_capa *capa)
3190 struct osd_object *obj = osd_dt_obj(dt);
3191 struct inode *dir = obj->oo_inode;
3192 struct dentry *dentry;
3193 struct osd_thandle *oh;
3194 struct ldiskfs_dir_entry_2 *de = NULL;
3195 struct buffer_head *bh;
3196 struct htree_lock *hlock = NULL;
3197 struct lu_fid *fid = &osd_oti_get(env)->oti_fid;
3198 struct osd_device *osd = osd_dev(dt->do_lu.lo_dev);
3202 LINVRNT(osd_invariant(obj));
3203 LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
3204 LASSERT(handle != NULL);
3206 osd_trans_exec_op(env, handle, OSD_OT_DELETE);
3208 oh = container_of(handle, struct osd_thandle, ot_super);
3209 LASSERT(oh->ot_handle != NULL);
3210 LASSERT(oh->ot_handle->h_transaction != NULL);
3212 if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
3215 ll_vfs_dq_init(dir);
3216 dentry = osd_child_dentry_get(env, obj,
3217 (char *)key, strlen((char *)key));
/* Prefer fine-grained htree locking when available; otherwise fall
 * back to the object-wide external index semaphore. */
3219 if (obj->oo_hl_head != NULL) {
3220 hlock = osd_oti_get(env)->oti_hlock;
3221 ldiskfs_htree_lock(hlock, obj->oo_hl_head,
3222 dir, LDISKFS_HLOCK_DEL);
3224 down_write(&obj->oo_ext_idx_sem);
3227 bh = osd_ldiskfs_find_entry(dir, dentry, &de, hlock);
3229 rc = ldiskfs_delete_entry(oh->ot_handle,
3236 ldiskfs_htree_unlock(hlock);
3238 up_write(&obj->oo_ext_idx_sem);
3243 /* For inode on the remote MDT, .. will point to
3244 * /Agent directory. So do not try to lookup/delete
3245 * remote inode for .. */
3246 if (strcmp((char *)key, dotdot) == 0)
3249 LASSERT(de != NULL);
3250 rc = osd_get_fid_from_dentry(de, (struct dt_rec *)fid);
3251 if (rc == 0 && osd_remote_fid(env, osd, fid)) {
3252 __u32 ino = le32_to_cpu(de->inode);
/* Entry referenced a remote object: drop the local agent inode. */
3254 rc = osd_delete_remote_inode(env, osd, fid, ino, oh);
3256 CERROR("%s: del local inode "DFID": rc = %d\n",
3257 osd_name(osd), PFID(fid), rc);
3264 LASSERT(osd_invariant(obj));
3269 * Lookup index for \a key and copy record to \a rec.
3271 * \param dt osd index object
3272 * \param key key for index
3273 * \param rec record reference
3275 * \retval +ve success : exact mach
3276 * \retval 0 return record with key not greater than \a key
3277 * \retval -ve failure
/* IAM-format index lookup: positions an iterator at @key, copies the
 * record out, and post-processes it — directory records are unpacked
 * from osd_fid_pack form, quota records from their on-disk layout. */
3279 static int osd_index_iam_lookup(const struct lu_env *env, struct dt_object *dt,
3280 struct dt_rec *rec, const struct dt_key *key,
3281 struct lustre_capa *capa)
3283 struct osd_object *obj = osd_dt_obj(dt);
3284 struct iam_path_descr *ipd;
3285 struct iam_container *bag = &obj->oo_dir->od_container;
3286 struct osd_thread_info *oti = osd_oti_get(env);
3287 struct iam_iterator *it = &oti->oti_idx_it;
3288 struct iam_rec *iam_rec;
3293 LASSERT(osd_invariant(obj));
3294 LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
3295 LASSERT(bag->ic_object == obj->oo_inode);
3297 if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
3300 ipd = osd_idx_ipd_get(env, bag);
3304 /* got ipd now we can start iterator. */
3305 iam_it_init(it, bag, 0, ipd);
3307 if (fid_is_quota(lu_object_fid(&dt->do_lu))) {
3308 /* swab quota uid/gid provided by caller */
3309 oti->oti_quota_id = cpu_to_le64(*((__u64 *)key));
3310 key = (const struct dt_key *)&oti->oti_quota_id;
3313 rc = iam_it_get(it, (struct iam_key *)key);
/* Directory records need a bounce buffer since the stored form
 * (osd_fid_pack) differs from the caller-visible lu_fid. */
3315 if (S_ISDIR(obj->oo_inode->i_mode))
3316 iam_rec = (struct iam_rec *)oti->oti_ldp;
3318 iam_rec = (struct iam_rec *) rec;
3320 iam_reccpy(&it->ii_path.ip_leaf, (struct iam_rec *)iam_rec);
3322 if (S_ISDIR(obj->oo_inode->i_mode))
3323 osd_fid_unpack((struct lu_fid *) rec,
3324 (struct osd_fid_pack *)iam_rec);
3325 else if (fid_is_quota(lu_object_fid(&dt->do_lu)))
3326 osd_quota_unpack(obj, rec);
3331 osd_ipd_put(env, bag, ipd);
3333 LINVRNT(osd_invariant(obj));
/* Declares journal credits for inserting one (key,value) pair into an
 * IAM index. */
3338 static int osd_index_declare_iam_insert(const struct lu_env *env,
3339 struct dt_object *dt,
3340 const struct dt_rec *rec,
3341 const struct dt_key *key,
3342 struct thandle *handle)
3344 struct osd_thandle *oh;
3346 LASSERT(handle != NULL);
3348 oh = container_of0(handle, struct osd_thandle, ot_super);
3349 LASSERT(oh->ot_handle == NULL);
3351 osd_trans_declare_op(env, oh, OSD_OT_INSERT,
3352 osd_dto_credits_noquota[DTO_INDEX_INSERT]);
3358 * Inserts (key, value) pair in \a dt index object.
3360 * \param dt osd index object
3361 * \param key key for index
3362 * \param rec record reference
3363 * \param th transaction handler
3366 * \retval -ve failure
/* IAM-format index insert: packs the record to its on-disk form first
 * (FID pack for directories, little-endian quota record for quota
 * indexes) then calls iam_insert under the open journal handle. */
3368 static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt,
3369 const struct dt_rec *rec,
3370 const struct dt_key *key, struct thandle *th,
3371 struct lustre_capa *capa, int ignore_quota)
3373 struct osd_object *obj = osd_dt_obj(dt);
3374 struct iam_path_descr *ipd;
3375 struct osd_thandle *oh;
3376 struct iam_container *bag = &obj->oo_dir->od_container;
3377 struct osd_thread_info *oti = osd_oti_get(env);
3378 struct iam_rec *iam_rec;
3383 LINVRNT(osd_invariant(obj));
3384 LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
3385 LASSERT(bag->ic_object == obj->oo_inode);
3386 LASSERT(th != NULL);
3388 if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
3391 osd_trans_exec_op(env, th, OSD_OT_INSERT);
3393 ipd = osd_idx_ipd_get(env, bag);
3394 if (unlikely(ipd == NULL))
3397 oh = container_of0(th, struct osd_thandle, ot_super);
3398 LASSERT(oh->ot_handle != NULL);
3399 LASSERT(oh->ot_handle->h_transaction != NULL);
3400 if (S_ISDIR(obj->oo_inode->i_mode)) {
/* Directory entry: convert lu_fid into the packed dirent form. */
3401 iam_rec = (struct iam_rec *)oti->oti_ldp;
3402 osd_fid_pack((struct osd_fid_pack *)iam_rec, rec, &oti->oti_fid);
3403 } else if (fid_is_quota(lu_object_fid(&dt->do_lu))) {
3404 /* pack quota uid/gid */
3405 oti->oti_quota_id = cpu_to_le64(*((__u64 *)key));
3406 key = (const struct dt_key *)&oti->oti_quota_id;
3407 /* pack quota record */
3408 rec = osd_quota_pack(obj, rec, &oti->oti_quota_rec);
3409 iam_rec = (struct iam_rec *)rec;
3411 iam_rec = (struct iam_rec *)rec;
3414 rc = iam_insert(oh->ot_handle, bag, (const struct iam_key *)key,
3416 osd_ipd_put(env, bag, ipd);
3417 LINVRNT(osd_invariant(obj));
3422 * Calls ldiskfs_add_entry() to add directory entry
3423 * into the directory. This is required for
3424 * interoperability mode (b11826)
3426 * \retval 0, on success
3427 * \retval -ve, on error
/* Low-level helper: builds an ldiskfs_dentry_param carrying the FID,
 * attaches it to a scratch dentry via d_fsdata, and adds the entry so
 * the FID is stored inline in the dirent. */
3429 static int __osd_ea_add_rec(struct osd_thread_info *info,
3430 struct osd_object *pobj, struct inode *cinode,
3431 const char *name, const struct dt_rec *fid,
3432 struct htree_lock *hlock, struct thandle *th)
3434 struct ldiskfs_dentry_param *ldp;
3435 struct dentry *child;
3436 struct osd_thandle *oth;
3439 oth = container_of(th, struct osd_thandle, ot_super);
3440 LASSERT(oth->ot_handle != NULL);
3441 LASSERT(oth->ot_handle->h_transaction != NULL);
3442 LASSERT(pobj->oo_inode);
3444 ldp = (struct ldiskfs_dentry_param *)info->oti_ldp;
/* Entries directly under the filesystem root carry no FID-in-dirent. */
3445 if (unlikely(pobj->oo_inode ==
3446 osd_sb(osd_obj2dev(pobj))->s_root->d_inode))
3449 osd_get_ldiskfs_dirent_param(ldp, fid);
3450 child = osd_child_dentry_get(info->oti_env, pobj, name, strlen(name));
3451 child->d_fsdata = (void *)ldp;
3452 ll_vfs_dq_init(pobj->oo_inode);
3453 rc = osd_ldiskfs_add_entry(oth->ot_handle, child, cinode, hlock);
3459 * Calls ldiskfs_add_dot_dotdot() to add dot and dotdot entries
3460 * into the directory.Also sets flags into osd object to
3461 * indicate dot and dotdot are created. This is required for
3462 * interoperability mode (b11826)
3464 * \param dir   directory for dot and dotdot fixup.
3465 * \param obj   child object for linking
3467 * \retval 0, on success
3468 * \retval -ve, on error
/* Idempotency is tracked via oo_compat_dot_created /
 * oo_compat_dotdot_created; ".." must not precede ".". */
3470 static int osd_add_dot_dotdot(struct osd_thread_info *info,
3471 struct osd_object *dir,
3472 struct inode *parent_dir, const char *name,
3473 const struct dt_rec *dot_fid,
3474 const struct dt_rec *dot_dot_fid,
3477 struct inode *inode = dir->oo_inode;
3478 struct osd_thandle *oth;
3481 oth = container_of(th, struct osd_thandle, ot_super);
3482 LASSERT(oth->ot_handle->h_transaction != NULL);
3483 LASSERT(S_ISDIR(dir->oo_inode->i_mode));
3485 if (strcmp(name, dot) == 0) {
3486 if (dir->oo_compat_dot_created) {
/* "." must point at the directory itself. */
3489 LASSERT(inode == parent_dir);
3490 dir->oo_compat_dot_created = 1;
3493 } else if (strcmp(name, dotdot) == 0) {
3494 if (!dir->oo_compat_dot_created)
3496 /* in case of rename, dotdot is already created */
3497 if (dir->oo_compat_dotdot_created) {
3498 return __osd_ea_add_rec(info, dir, parent_dir, name,
3499 dot_dot_fid, NULL, th);
3502 result = osd_add_dot_dotdot_internal(info, dir->oo_inode,
3503 parent_dir, dot_fid,
3506 dir->oo_compat_dotdot_created = 1;
3514 * It will call the appropriate osd_add* function and return the
3515 * value, return by respective functions.
/* Dispatcher for adding a directory entry: "." and ".." go through
 * osd_add_dot_dotdot(); everything else through __osd_ea_add_rec().
 * Serializes against concurrent dir modification with the htree lock
 * or, lacking one, the object-wide oo_ext_idx_sem. */
3517 static int osd_ea_add_rec(const struct lu_env *env, struct osd_object *pobj,
3518 struct inode *cinode, const char *name,
3519 const struct dt_rec *fid, struct thandle *th)
3521 struct osd_thread_info *info = osd_oti_get(env);
3522 struct htree_lock *hlock;
3525 hlock = pobj->oo_hl_head != NULL ? info->oti_hlock : NULL;
/* Detect "." and ".." without a strcmp per name. */
3527 if (name[0] == '.' && (name[1] == '\0' || (name[1] == '.' &&
3529 if (hlock != NULL) {
3530 ldiskfs_htree_lock(hlock, pobj->oo_hl_head,
3533 down_write(&pobj->oo_ext_idx_sem);
3535 rc = osd_add_dot_dotdot(info, pobj, cinode, name,
3536 (struct dt_rec *)lu_object_fid(&pobj->oo_dt.do_lu),
3539 if (hlock != NULL) {
3540 ldiskfs_htree_lock(hlock, pobj->oo_hl_head,
3541 pobj->oo_inode, LDISKFS_HLOCK_ADD);
3543 down_write(&pobj->oo_ext_idx_sem);
/* Fault-injection hook: corrupt the stored FID to exercise scrub. */
3546 if (OBD_FAIL_CHECK(OBD_FAIL_FID_INDIR)) {
3547 struct lu_fid *tfid = &info->oti_fid;
3549 *tfid = *(const struct lu_fid *)fid;
3551 rc = __osd_ea_add_rec(info, pobj, cinode, name,
3552 (const struct dt_rec *)tfid,
3555 rc = __osd_ea_add_rec(info, pobj, cinode, name, fid,
3560 ldiskfs_htree_unlock(hlock);
3562 up_write(&pobj->oo_ext_idx_sem);
/* Verifies that the name->FID->inode mapping just resolved agrees with
 * the OI table; on mismatch feeds the entry to a running OI scrub or
 * kicks off a new scrub (at most once per call via 'once'). */
3568 osd_consistency_check(struct osd_thread_info *oti, struct osd_device *dev,
3569 struct osd_idmap_cache *oic)
3571 struct osd_scrub *scrub = &dev->od_scrub;
3572 struct lu_fid *fid = &oic->oic_fid;
3573 struct osd_inode_id *id = &oti->oti_id;
/* Only normal and IGIF FIDs are tracked by the OI table. */
3578 if (!fid_is_norm(fid) && !fid_is_igif(fid))
3582 rc = osd_oi_lookup(oti, dev, fid, id, true);
3583 if (rc != 0 && rc != -ENOENT)
/* OI agrees with the dirent mapping: nothing to repair. */
3586 if (rc == 0 && osd_id_eq(id, &oic->oic_lid))
3589 if (thread_is_running(&scrub->os_thread)) {
3590 rc = osd_oii_insert(dev, oic, rc == -ENOENT);
3591 /* There is race condition between osd_oi_lookup and OI scrub.
3592 * The OI scrub finished just after osd_oi_lookup() failure.
3593 * Under such case, it is unnecessary to trigger OI scrub again,
3594 * but try to call osd_oi_lookup() again. */
3595 if (unlikely(rc == -EAGAIN))
3601 if (!dev->od_noscrub && ++once == 1) {
3602 CDEBUG(D_LFSCK, "Trigger OI scrub by RPC for "DFID"\n",
3604 rc = osd_scrub_start(dev);
3605 LCONSOLE_ERROR("%.16s: trigger OI scrub by RPC for "DFID
3607 LDISKFS_SB(osd_sb(dev))->s_es->s_volume_name,
/* Fault-injection path (OBD_FAIL_FID_LOOKUP): resolves the FID by
 * reading the LMA xattr directly from the inode instead of trusting
 * the FID-in-dirent, populating the idmap cache entry. */
3616 static int osd_fail_fid_lookup(struct osd_thread_info *oti,
3617 struct osd_device *dev,
3618 struct osd_idmap_cache *oic,
3619 struct lu_fid *fid, __u32 ino)
3621 struct lustre_mdt_attrs *lma = &oti->oti_mdt_attrs;
3622 struct inode *inode;
3625 osd_id_gen(&oic->oic_lid, ino, OSD_OII_NOGEN);
3626 inode = osd_iget(oti, dev, &oic->oic_lid);
3627 if (IS_ERR(inode)) {
/* Keep the cache entry invalid so later lookups do not use it. */
3628 fid_zero(&oic->oic_fid);
3629 return PTR_ERR(inode);
3632 rc = osd_get_lma(oti, inode, &oti->oti_obj_dentry, lma);
3635 fid_zero(&oic->oic_fid);
3637 *fid = oic->oic_fid = lma->lma_self_fid;
3642 * Calls ->lookup() to find dentry. From dentry get inode and
3643 * read inode's ea to get fid. This is required for interoperability
3646 * \retval 0, on success
3647 * \retval -ve, on error
/* EA-format name lookup: finds the dirent under htree/external lock,
 * tries the FID embedded in the dirent first, falls back to reading
 * the inode's LMA xattr, and may trigger an OI consistency check when
 * scrub state suggests the mapping could be stale.
 * NOTE(review): extract omits interior lines; flow is indicative. */
3649 static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
3650 struct dt_rec *rec, const struct dt_key *key)
3652 struct inode *dir = obj->oo_inode;
3653 struct dentry *dentry;
3654 struct ldiskfs_dir_entry_2 *de;
3655 struct buffer_head *bh;
3656 struct lu_fid *fid = (struct lu_fid *) rec;
3657 struct htree_lock *hlock = NULL;
3662 LASSERT(dir->i_op != NULL && dir->i_op->lookup != NULL);
3664 dentry = osd_child_dentry_get(env, obj,
3665 (char *)key, strlen((char *)key));
3667 if (obj->oo_hl_head != NULL) {
3668 hlock = osd_oti_get(env)->oti_hlock;
3669 ldiskfs_htree_lock(hlock, obj->oo_hl_head,
3670 dir, LDISKFS_HLOCK_LOOKUP);
3672 down_read(&obj->oo_ext_idx_sem);
3675 bh = osd_ldiskfs_find_entry(dir, dentry, &de, hlock);
3677 struct osd_thread_info *oti = osd_oti_get(env);
3678 struct osd_inode_id *id = &oti->oti_id;
3679 struct osd_idmap_cache *oic = &oti->oti_cache;
3680 struct osd_device *dev = osd_obj2dev(obj);
3681 struct osd_scrub *scrub = &dev->od_scrub;
3682 struct scrub_file *sf = &scrub->os_file;
3684 ino = le32_to_cpu(de->inode);
3685 if (OBD_FAIL_CHECK(OBD_FAIL_FID_LOOKUP)) {
3687 rc = osd_fail_fid_lookup(oti, dev, oic, fid, ino);
/* Fast path: FID stored inline in the dirent. */
3691 rc = osd_get_fid_from_dentry(de, rec);
3693 /* done with de, release bh */
/* Slow path: read the FID from the inode's LMA xattr. */
3696 rc = osd_ea_fid_get(env, obj, ino, fid, id);
3698 osd_id_gen(id, ino, OSD_OII_NOGEN);
3699 if (rc != 0 || osd_remote_fid(env, dev, fid)) {
3700 fid_zero(&oic->oic_fid);
3705 oic->oic_fid = *fid;
/* Mapping may be stale if scrub has not yet passed this inode
 * and the device is flagged inconsistent/upgrading. */
3706 if ((scrub->os_pos_current <= ino) &&
3707 ((sf->sf_flags & SF_INCONSISTENT) ||
3708 (sf->sf_flags & SF_UPGRADE && fid_is_igif(fid)) ||
3709 ldiskfs_test_bit(osd_oi_fid2idx(dev, fid),
3711 osd_consistency_check(oti, dev, oic);
3720 ldiskfs_htree_unlock(hlock);
3722 up_read(&obj->oo_ext_idx_sem);
3727 * Find the osd object for given fid.
3729 * \param fid need to find the osd object having this fid
3731 * \retval osd_object on success
3732 * \retval -ve on error
/* Looks the object up with this OSD device itself as topdev (the real
 * topdev may not exist yet during MGS profile setup); the object must
 * already be cached for this to be safe — see comment below.  Returned
 * object holds a reference; release with osd_object_put(). */
3734 struct osd_object *osd_object_find(const struct lu_env *env,
3735 struct dt_object *dt,
3736 const struct lu_fid *fid)
3738 struct lu_device *ludev = dt->do_lu.lo_dev;
3739 struct osd_object *child = NULL;
3740 struct lu_object *luch;
3741 struct lu_object *lo;
3744 * at this point topdev might not exist yet
3745 * (i.e. MGS is preparing profiles). so we can
3746 * not rely on topdev and instead lookup with
3747 * our device passed as topdev. this can't work
3748 * if the object isn't cached yet (as osd doesn't
3749 * allocate lu_header). IOW, the object must be
3750 * in the cache, otherwise lu_object_alloc() crashes
3753 luch = lu_object_find_at(env, ludev, fid, NULL);
3754 if (!IS_ERR(luch)) {
3755 if (lu_object_exists(luch)) {
3756 lo = lu_object_locate(luch->lo_header, ludev->ld_type);
3758 child = osd_obj(lo);
3760 LU_OBJECT_DEBUG(D_ERROR, env, luch,
3761 "lu_object can't be located"
3762 DFID"\n", PFID(fid));
/* Drop the reference on every failure path before returning. */
3764 if (child == NULL) {
3765 lu_object_put(env, luch);
3766 CERROR("Unable to get osd_object\n");
3767 child = ERR_PTR(-ENOENT);
3770 LU_OBJECT_DEBUG(D_ERROR, env, luch,
3771 "lu_object does not exists "DFID"\n",
3773 lu_object_put(env, luch);
3774 child = ERR_PTR(-ENOENT);
/* lu_object_find_at itself failed: propagate its ERR_PTR. */
3777 child = (void *)luch;
3783 * Put the osd object once done with it.
3785 * \param obj osd object that needs to be put
/* Releases the reference taken by osd_object_find(). */
3787 static inline void osd_object_put(const struct lu_env *env,
3788 struct osd_object *obj)
3790 lu_object_put(env, &obj->oo_dt.do_lu);
/* Declares credits for inserting an EA-format directory entry: the
 * insert itself, owner quota, and — when the target FID turns out to
 * be remote — extra credits for creating/linking the local agent inode. */
3793 static int osd_index_declare_ea_insert(const struct lu_env *env,
3794 struct dt_object *dt,
3795 const struct dt_rec *rec,
3796 const struct dt_key *key,
3797 struct thandle *handle)
3799 struct osd_thandle *oh;
3800 struct osd_device *osd = osd_dev(dt->do_lu.lo_dev);
3801 struct lu_fid *fid = (struct lu_fid *)rec;
3805 LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
3806 LASSERT(handle != NULL);
3808 oh = container_of0(handle, struct osd_thandle, ot_super);
3809 LASSERT(oh->ot_handle == NULL);
3811 osd_trans_declare_op(env, oh, OSD_OT_INSERT,
3812 osd_dto_credits_noquota[DTO_INDEX_INSERT]);
3814 if (osd_dt_obj(dt)->oo_inode == NULL) {
3815 const char *name = (const char *)key;
3816 /* Object is not being created yet. Only happens when
3817 * 1. declare directory create
3818 * 2. declare insert .
3819 * 3. declare insert ..
3821 LASSERT(strcmp(name, dotdot) == 0 || strcmp(name, dot) == 0);
3823 struct inode *inode = osd_dt_obj(dt)->oo_inode;
3825 /* We ignore block quota on meta pool (MDTs), so needn't
3826 * calculate how many blocks will be consumed by this index
3828 rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid, 0,
3829 oh, true, true, NULL, false);
3835 rc = osd_remote_fid(env, osd, fid);
/* Remote target: also declare agent-inode create plus two index
 * inserts (entry itself and agent linkage). */
3841 osd_trans_declare_op(env, oh, OSD_OT_CREATE,
3842 osd_dto_credits_noquota[DTO_OBJECT_CREATE]);
3843 osd_trans_declare_op(env, oh, OSD_OT_INSERT,
3844 osd_dto_credits_noquota[DTO_INDEX_INSERT] + 1);
3845 osd_trans_declare_op(env, oh, OSD_OT_INSERT,
3846 osd_dto_credits_noquota[DTO_INDEX_INSERT] + 1);
3852 * Index add function for interoperability mode (b11826).
3853 * It will add the directory entry. This entry is needed to
3854 * maintain name->fid mapping.
3856 * \param key it is key i.e. file entry to be inserted
3857 * \param rec it is value of given key i.e. fid
3859 * \retval 0, on success
3860 * \retval -ve, on error
3862 static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt,
3863 const struct dt_rec *rec,
3864 const struct dt_key *key, struct thandle *th,
3865 struct lustre_capa *capa, int ignore_quota)
3867 struct osd_object *obj = osd_dt_obj(dt);
3868 struct osd_device *osd = osd_dev(dt->do_lu.lo_dev);
3869 struct lu_fid *fid = (struct lu_fid *) rec;
3870 const char *name = (const char *)key;
3871 struct osd_thread_info *oti = osd_oti_get(env);
3872 struct osd_inode_id *id = &oti->oti_id;
3873 struct inode *child_inode = NULL;
3874 struct osd_object *child = NULL;
3878 LASSERT(osd_invariant(obj));
3879 LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
3880 LASSERT(th != NULL);
3882 osd_trans_exec_op(env, th, OSD_OT_INSERT);
3884 if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
3887 LASSERTF(fid_is_sane(fid), "fid"DFID" is insane!", PFID(fid));
3889 rc = osd_remote_fid(env, osd, fid);
3891 CERROR("%s: Can not find object "DFID" rc %d\n",
3892 osd_name(osd), PFID(fid), rc);
3897 /* Insert remote entry */
3898 if (strcmp(name, dotdot) == 0 && strlen(name) == 2) {
3899 struct osd_mdobj_map *omm = osd->od_mdt_map;
3900 struct osd_thandle *oh;
3902 /* If parent on remote MDT, we need put this object
3904 oh = container_of(th, typeof(*oh), ot_super);
3905 rc = osd_add_to_agent(env, osd, obj, oh);
3907 CERROR("%s: add agent "DFID" error: rc = %d\n",
3909 PFID(lu_object_fid(&dt->do_lu)), rc);
3913 child_inode = igrab(omm->omm_agent_dentry->d_inode);
3915 child_inode = osd_create_remote_inode(env, osd, obj,
3917 if (IS_ERR(child_inode))
3918 RETURN(PTR_ERR(child_inode));
3921 /* Insert local entry */
3922 child = osd_object_find(env, dt, fid);
3923 if (IS_ERR(child)) {
3924 CERROR("%s: Can not find object "DFID"%u:%u: rc = %d\n",
3925 osd_name(osd), PFID(fid),
3926 id->oii_ino, id->oii_gen,
3927 (int)PTR_ERR(child_inode));
3928 RETURN(PTR_ERR(child_inode));
3930 child_inode = igrab(child->oo_inode);
3933 rc = osd_ea_add_rec(env, obj, child_inode, name, rec, th);
3937 osd_object_put(env, child);
3938 LASSERT(osd_invariant(obj));
3943 * Initialize osd Iterator for given osd index object.
3945 * \param dt osd index object
/* Initialize an IAM iterator on \a dt. Takes an iam_path_descr from the
 * per-env pool and binds the iterator to the object's iam container.
 * Returns an ERR_PTR on auth failure or descriptor exhaustion. */
3948 static struct dt_it *osd_it_iam_init(const struct lu_env *env,
3949 struct dt_object *dt,
3951 struct lustre_capa *capa)
3953 struct osd_it_iam *it;
3954 struct osd_thread_info *oti = osd_oti_get(env);
3955 struct osd_object *obj = osd_dt_obj(dt);
3956 struct lu_object *lo = &dt->do_lu;
3957 struct iam_path_descr *ipd;
3958 struct iam_container *bag = &obj->oo_dir->od_container;
3960 LASSERT(lu_object_exists(lo));
3962 if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
3963 return ERR_PTR(-EACCES);
3966 ipd = osd_it_ipd_get(env, bag);
3967 if (likely(ipd != NULL)) {
/* NOTE(review): elided lines presumably allocate/initialize `it` and take
 * an object reference released in osd_it_iam_fini() -- confirm. */
3971 iam_it_init(&it->oi_it, bag, IAM_IT_MOVE, ipd);
3972 return (struct dt_it *)it;
3974 return ERR_PTR(-ENOMEM);
3978 * free given Iterator.
/* Finalize an IAM iterator: tear down the iam cursor, return the path
 * descriptor to the pool, and drop the object reference taken at init. */
3981 static void osd_it_iam_fini(const struct lu_env *env, struct dt_it *di)
3983 struct osd_it_iam *it = (struct osd_it_iam *)di;
3984 struct osd_object *obj = it->oi_obj;
3986 iam_it_fini(&it->oi_it);
3987 osd_ipd_put(env, &obj->oo_dir->od_container, it->oi_ipd);
3988 lu_object_put(env, &obj->oo_dt.do_lu);
3992 * Move Iterator to record specified by \a key
3994 * \param di osd iterator
3995 * \param key key for index
3997 * \retval +ve di points to record with least key not larger than key
3998 * \retval 0 di points to exact matched key
3999 * \retval -ve failure
/* Position the iterator at \a key. Quota index keys (uid/gid) are stored
 * little-endian on disk, so they are swabbed into a per-thread buffer
 * before the lookup. */
4002 static int osd_it_iam_get(const struct lu_env *env,
4003 struct dt_it *di, const struct dt_key *key)
4005 struct osd_thread_info *oti = osd_oti_get(env);
4006 struct osd_it_iam *it = (struct osd_it_iam *)di;
4008 if (fid_is_quota(lu_object_fid(&it->oi_obj->oo_dt.do_lu))) {
4009 /* swab quota uid/gid */
4010 oti->oti_quota_id = cpu_to_le64(*((__u64 *)key));
4011 key = (struct dt_key *)&oti->oti_quota_id;
4014 return iam_it_get(&it->oi_it, (const struct iam_key *)key);
4020 * \param di osd iterator
/* Release the iterator's current position (counterpart of ->get()). */
4022 static void osd_it_iam_put(const struct lu_env *env, struct dt_it *di)
4024 struct osd_it_iam *it = (struct osd_it_iam *)di;
4026 iam_it_put(&it->oi_it);
4030 * Move iterator by one record
4032 * \param di osd iterator
4034 * \retval +1 end of container reached
4036 * \retval -ve failure
/* Advance the iterator by one record; delegates to iam_it_next()
 * (per the header comment: +1 on end of container, -ve on failure). */
4039 static int osd_it_iam_next(const struct lu_env *env, struct dt_it *di)
4041 struct osd_it_iam *it = (struct osd_it_iam *)di;
4043 return iam_it_next(&it->oi_it);
4047 * Return pointer to the key under iterator.
/* Return the key under the iterator. For quota indexes the on-disk
 * little-endian id is swabbed to CPU order in a per-thread buffer, so the
 * returned pointer may reference oti_quota_id rather than the leaf. */
4050 static struct dt_key *osd_it_iam_key(const struct lu_env *env,
4051 const struct dt_it *di)
4053 struct osd_thread_info *oti = osd_oti_get(env);
4054 struct osd_it_iam *it = (struct osd_it_iam *)di;
4055 struct osd_object *obj = it->oi_obj;
4058 key = (struct dt_key *)iam_it_key_get(&it->oi_it);
4060 if (!IS_ERR(key) && fid_is_quota(lu_object_fid(&obj->oo_dt.do_lu))) {
4061 /* swab quota uid/gid */
4062 oti->oti_quota_id = le64_to_cpu(*((__u64 *)key));
4063 key = (struct dt_key *)&oti->oti_quota_id;
4070 * Return size of key under iterator (in bytes)
/* Size in bytes of the key under the iterator. */
4073 static int osd_it_iam_key_size(const struct lu_env *env, const struct dt_it *di)
4075 struct osd_it_iam *it = (struct osd_it_iam *)di;
4077 return iam_it_key_size(&it->oi_it);
/* Append optional lustre attributes (currently the file type) after the
 * name in \a ent, aligning to sizeof(struct luda_type); finally swab
 * lde_attrs to little-endian wire order. */
4081 osd_it_append_attrs(struct lu_dirent *ent, int len, __u16 type)
4083 /* check if file type is required */
4084 if (ent->lde_attrs & LUDA_TYPE) {
4085 int align = sizeof(struct luda_type) - 1;
4086 struct luda_type *lt;
/* Round name length up so luda_type lands on an aligned boundary. */
4088 len = (len + align) & ~align;
4089 lt = (struct luda_type *)(ent->lde_name + len);
4090 lt->lt_type = cpu_to_le16(CFS_DTTOIF(type));
4093 ent->lde_attrs = cpu_to_le32(ent->lde_attrs);
4097 * build lu direct from backend fs dirent.
/* Build a wire-format lu_dirent from a backend dirent: fid, hash/offset,
 * record length, name and optional type attribute, all in little-endian. */
4101 osd_it_pack_dirent(struct lu_dirent *ent, struct lu_fid *fid, __u64 offset,
4102 char *name, __u16 namelen, __u16 type, __u32 attr)
4104 ent->lde_attrs = attr | LUDA_FID;
4105 fid_cpu_to_le(&ent->lde_fid, fid);
4107 ent->lde_hash = cpu_to_le64(offset);
4108 ent->lde_reclen = cpu_to_le16(lu_dirent_calc_size(namelen, attr));
/* Copy exactly namelen bytes; lde_name is not NUL-terminated here,
 * lde_namelen carries the length. */
4110 strncpy(ent->lde_name, name, namelen);
4111 ent->lde_namelen = cpu_to_le16(namelen);
4113 /* append lustre attributes */
4114 osd_it_append_attrs(ent, namelen, type);
4118 * Return pointer to the record under iterator.
/* Return the record under the iterator into \a dtrec. Three cases:
 * directory index (unpack fid and build a lu_dirent), quota index (copy
 * raw record then unpack), and any other index (raw record copy). */
4120 static int osd_it_iam_rec(const struct lu_env *env,
4121 const struct dt_it *di,
4122 struct dt_rec *dtrec, __u32 attr)
4124 struct osd_it_iam *it = (struct osd_it_iam *)di;
4125 struct osd_thread_info *info = osd_oti_get(env);
4128 if (S_ISDIR(it->oi_obj->oo_inode->i_mode)) {
4129 const struct osd_fid_pack *rec;
4130 struct lu_fid *fid = &info->oti_fid;
4131 struct lu_dirent *lde = (struct lu_dirent *)dtrec;
4137 name = (char *)iam_it_key_get(&it->oi_it);
4139 RETURN(PTR_ERR(name));
4141 namelen = iam_it_key_size(&it->oi_it);
4143 rec = (const struct osd_fid_pack *)iam_it_rec_get(&it->oi_it);
4145 RETURN(PTR_ERR(rec));
4147 rc = osd_fid_unpack(fid, rec);
4151 hash = iam_it_store(&it->oi_it);
4153 /* IAM does not store object type in IAM index (dir) */
4154 osd_it_pack_dirent(lde, fid, hash, name, namelen,
4156 } else if (fid_is_quota(lu_object_fid(&it->oi_obj->oo_dt.do_lu))) {
4157 iam_reccpy(&it->oi_it.ii_path.ip_leaf,
4158 (struct iam_rec *)dtrec);
/* Convert on-disk quota record into the dt layer's format. */
4159 osd_quota_unpack(it->oi_obj, dtrec);
4161 iam_reccpy(&it->oi_it.ii_path.ip_leaf,
4162 (struct iam_rec *)dtrec);
4169 * Returns cookie for current Iterator position.
/* Return an opaque cookie for the current iterator position, suitable for
 * a later ->load(). */
4171 static __u64 osd_it_iam_store(const struct lu_env *env, const struct dt_it *di)
4173 struct osd_it_iam *it = (struct osd_it_iam *)di;
4175 return iam_it_store(&it->oi_it);
4179 * Restore iterator from cookie.
4181 * \param di osd iterator
4182 * \param hash Iterator location cookie
4184 * \retval +ve di points to record with least key not larger than key.
4185 * \retval 0 di points to exact matched key
4186 * \retval -ve failure
/* Restore the iterator from a cookie previously returned by ->store(). */
4189 static int osd_it_iam_load(const struct lu_env *env,
4190 const struct dt_it *di, __u64 hash)
4192 struct osd_it_iam *it = (struct osd_it_iam *)di;
4194 return iam_it_load(&it->oi_it, hash);
/* dt_index_operations vector for native IAM indexes (lookup/insert/delete
 * plus the osd_it_iam_* iterator implementation above). */
4197 static const struct dt_index_operations osd_index_iam_ops = {
4198 .dio_lookup = osd_index_iam_lookup,
4199 .dio_declare_insert = osd_index_declare_iam_insert,
4200 .dio_insert = osd_index_iam_insert,
4201 .dio_declare_delete = osd_index_declare_iam_delete,
4202 .dio_delete = osd_index_iam_delete,
4204 .init = osd_it_iam_init,
4205 .fini = osd_it_iam_fini,
4206 .get = osd_it_iam_get,
4207 .put = osd_it_iam_put,
4208 .next = osd_it_iam_next,
4209 .key = osd_it_iam_key,
4210 .key_size = osd_it_iam_key_size,
4211 .rec = osd_it_iam_rec,
4212 .store = osd_it_iam_store,
4213 .load = osd_it_iam_load
4219 * Creates or initializes iterator context.
4221 * \retval struct osd_it_ea, iterator structure on success
/* Initialize the EA (ldiskfs-readdir based) iterator: set up a fake dentry
 * and struct file over the directory inode so the backend ->readdir() can
 * be driven, and reset the in-memory dirent buffer state. The iterator
 * lives in per-thread storage (oti_it_ea), not on the heap. */
4224 static struct dt_it *osd_it_ea_init(const struct lu_env *env,
4225 struct dt_object *dt,
4227 struct lustre_capa *capa)
4229 struct osd_object *obj = osd_dt_obj(dt);
4230 struct osd_thread_info *info = osd_oti_get(env);
4231 struct osd_it_ea *it = &info->oti_it_ea;
4232 struct lu_object *lo = &dt->do_lu;
4233 struct dentry *obj_dentry = &info->oti_it_dentry;
4235 LASSERT(lu_object_exists(lo));
4237 obj_dentry->d_inode = obj->oo_inode;
4238 obj_dentry->d_sb = osd_sb(osd_obj2dev(obj));
4239 obj_dentry->d_name.hash = 0;
4241 it->oie_rd_dirent = 0;
4242 it->oie_it_dirent = 0;
4243 it->oie_dirent = NULL;
4244 it->oie_buf = info->oti_it_ea_buf;
4246 it->oie_file.f_pos = 0;
4247 it->oie_file.f_dentry = obj_dentry;
/* Hash width requested by the caller selects 32- vs 64-bit readdir
 * cookies from ldiskfs. */
4248 if (attr & LUDA_64BITHASH)
4249 it->oie_file.f_mode |= FMODE_64BITHASH;
4251 it->oie_file.f_mode |= FMODE_32BITHASH;
4252 it->oie_file.f_mapping = obj->oo_inode->i_mapping;
4253 it->oie_file.f_op = obj->oo_inode->i_fop;
4254 it->oie_file.private_data = NULL;
4256 RETURN((struct dt_it *) it);
4260 * Destroy or finishes iterator context.
4262 * \param di iterator structure to be destroyed
/* Finalize the EA iterator: let the filesystem release its readdir state
 * on the fake file, then drop the object reference. */
4264 static void osd_it_ea_fini(const struct lu_env *env, struct dt_it *di)
4266 struct osd_it_ea *it = (struct osd_it_ea *)di;
4267 struct osd_object *obj = it->oie_obj;
4268 struct inode *inode = obj->oo_inode;
4271 it->oie_file.f_op->release(inode, &it->oie_file);
4272 lu_object_put(env, &obj->oo_dt.do_lu);
4277 * It position the iterator at given key, so that next lookup continues from
4278 * that key Or it is similar to dio_it->load() but based on a key,
4279 * rather than file position.
4281 * As a special convention, osd_it_ea_get(env, di, "") has to rewind iterator
4284 * TODO: Presently return +1 considering it is only used by mdd_dir_is_empty().
/* Rewind the iterator. Only the empty key is supported (see the LASSERT):
 * by convention get(env, di, "") resets to the start of the directory. */
4286 static int osd_it_ea_get(const struct lu_env *env,
4287 struct dt_it *di, const struct dt_key *key)
4289 struct osd_it_ea *it = (struct osd_it_ea *)di;
4292 LASSERT(((const char *)key)[0] == '\0');
4293 it->oie_file.f_pos = 0;
4294 it->oie_rd_dirent = 0;
4295 it->oie_it_dirent = 0;
4296 it->oie_dirent = NULL;
/* Release current position; body not shown in this listing -- presumably a
 * no-op since the EA iterator keeps no per-get state (confirm). */
4304 static void osd_it_ea_put(const struct lu_env *env, struct dt_it *di)
4309 * It is called internally by ->readdir(). It fills the
4310 * iterator's in-memory data structure with required
4311 * information i.e. name, namelen, rec_size etc.
4313 * \param buf in which information to be filled in.
4314 * \param name name of the file in given dir
4316 * \retval 0 on success
4317 * \retval 1 on buffer full
/* filldir callback invoked by ldiskfs ->readdir(): copy one backend dirent
 * (name, ino, offset, type, and FID-in-dirent if present) into the
 * iterator's in-memory buffer. Returns non-zero to stop readdir when the
 * buffer is full (per header comment). \a buf is really the osd_it_ea. */
4319 static int osd_ldiskfs_filldir(char *buf, const char *name, int namelen,
4320 loff_t offset, __u64 ino,
4323 struct osd_it_ea *it = (struct osd_it_ea *)buf;
4324 struct osd_object *obj = it->oie_obj;
4325 struct osd_it_ea_dirent *ent = it->oie_dirent;
4326 struct lu_fid *fid = &ent->oied_fid;
4327 struct osd_fid_pack *rec;
4330 /* this should never happen */
4331 if (unlikely(namelen == 0 || namelen > LDISKFS_NAME_LEN)) {
4332 CERROR("ldiskfs return invalid namelen %d\n", namelen);
/* Stop if the next entry would not fit in the iterator buffer. */
4336 if ((void *) ent - it->oie_buf + sizeof(*ent) + namelen >
4340 /* "." is just the object itself. */
4341 if (namelen == 1 && name[0] == '.') {
4342 *fid = obj->oo_dt.do_lu.lo_header->loh_fid;
4343 } else if (d_type & LDISKFS_DIRENT_LUFID) {
/* Packed fid is stored right after the NUL-terminated name. */
4344 rec = (struct osd_fid_pack*) (name + namelen + 1);
4345 if (osd_fid_unpack(fid, rec) != 0)
/* Strip the internal "has fid" flag before exposing d_type. */
4350 d_type &= ~LDISKFS_DIRENT_LUFID;
4352 /* NOT export local root. */
4353 if (unlikely(osd_sb(osd_obj2dev(obj))->s_root->d_inode->i_ino == ino)) {
4354 ino = obj->oo_inode->i_ino;
4355 *fid = obj->oo_dt.do_lu.lo_header->loh_fid;
4358 ent->oied_ino = ino;
4359 ent->oied_off = offset;
4360 ent->oied_namelen = namelen;
4361 ent->oied_type = d_type;
4363 memcpy(ent->oied_name, name, namelen);
4365 it->oie_rd_dirent++;
/* Advance the write cursor, rounding entries to keep them aligned. */
4366 it->oie_dirent = (void *) ent + cfs_size_round(sizeof(*ent) + namelen);
4371 * Calls ->readdir() to load a directory entry at a time
4372 * and stored it in iterator's in-memory data structure.
4374 * \param di iterator's in memory structure
4376 * \retval 0 on success
4377 * \retval -ve on error
/* Refill the iterator buffer by driving the backend ->readdir() with
 * osd_ldiskfs_filldir() under the directory's htree (PDO) lock or, if the
 * object has no htree lock head, the whole-directory rwsem. */
4379 static int osd_ldiskfs_it_fill(const struct lu_env *env,
4380 const struct dt_it *di)
4382 struct osd_it_ea *it = (struct osd_it_ea *)di;
4383 struct osd_object *obj = it->oie_obj;
4384 struct inode *inode = obj->oo_inode;
4385 struct htree_lock *hlock = NULL;
4389 it->oie_dirent = it->oie_buf;
4390 it->oie_rd_dirent = 0;
4392 if (obj->oo_hl_head != NULL) {
4393 hlock = osd_oti_get(env)->oti_hlock;
4394 ldiskfs_htree_lock(hlock, obj->oo_hl_head,
4395 inode, LDISKFS_HLOCK_READDIR);
4397 down_read(&obj->oo_ext_idx_sem);
4400 result = inode->i_fop->readdir(&it->oie_file, it,
4401 (filldir_t) osd_ldiskfs_filldir);
4404 ldiskfs_htree_unlock(hlock);
4406 up_read(&obj->oo_ext_idx_sem);
/* Nothing was read: readdir hit EOF (or elided lines handle error). */
4408 if (it->oie_rd_dirent == 0) {
/* Otherwise rewind the read cursor to the first buffered entry. */
4411 it->oie_dirent = it->oie_buf;
4412 it->oie_it_dirent = 1;
4419 * It calls osd_ldiskfs_it_fill() which will use ->readdir()
4420 * to load a directory entry at a time and store it in the
4421 * iterator's in-memory data structure.
4423 * \param di iterator's in memory structure
4425 * \retval +ve iterator reached to end
4426 * \retval 0 iterator not reached to end
4427 * \retval -ve on error
/* Advance to the next entry: step through the buffered dirents first, and
 * refill via osd_ldiskfs_it_fill() once the buffer is exhausted, unless
 * f_pos already reached the htree EOF cookie. */
4429 static int osd_it_ea_next(const struct lu_env *env, struct dt_it *di)
4431 struct osd_it_ea *it = (struct osd_it_ea *)di;
4436 if (it->oie_it_dirent < it->oie_rd_dirent) {
4438 (void *) it->oie_dirent +
4439 cfs_size_round(sizeof(struct osd_it_ea_dirent) +
4440 it->oie_dirent->oied_namelen);
4441 it->oie_it_dirent++;
4444 if (it->oie_file.f_pos == ldiskfs_get_htree_eof(&it->oie_file))
4447 rc = osd_ldiskfs_it_fill(env, di);
4454 * Returns the key at current position from iterator's in memory structure.
4456 * \param di iterator's in memory structure
4458 * \retval key i.e. struct dt_key on success
/* Key (file name) of the current buffered dirent. */
4460 static struct dt_key *osd_it_ea_key(const struct lu_env *env,
4461 const struct dt_it *di)
4463 struct osd_it_ea *it = (struct osd_it_ea *)di;
4465 return (struct dt_key *)it->oie_dirent->oied_name;
4469 * Returns the key's size at current position from iterator's in memory structure.
4471 * \param di iterator's in memory structure
4473 * \retval key_size i.e. struct dt_key on success
/* Length of the current entry's name in bytes. */
4475 static int osd_it_ea_key_size(const struct lu_env *env, const struct dt_it *di)
4477 struct osd_it_ea *it = (struct osd_it_ea *)di;
4479 return it->oie_dirent->oied_namelen;
/* Overwrite the packed FID stored after the name in an existing ldiskfs
 * dirent (LDISKFS_DIRENT_LUFID must already be set) under journal handle
 * \a jh; caller holds the buffer/PDO locks. */
4483 osd_dirent_update(handle_t *jh, struct super_block *sb,
4484 struct osd_it_ea_dirent *ent, struct lu_fid *fid,
4485 struct buffer_head *bh, struct ldiskfs_dir_entry_2 *de)
4487 struct osd_fid_pack *rec;
4491 LASSERT(de->file_type & LDISKFS_DIRENT_LUFID);
4492 LASSERT(de->rec_len >= de->name_len + sizeof(struct osd_fid_pack));
/* Journal protocol: get write access to the block before modifying it. */
4494 rc = ldiskfs_journal_get_write_access(jh, bh);
4496 CERROR("%.16s: fail to write access for update dirent: "
4497 "name = %.*s, rc = %d\n",
4498 LDISKFS_SB(sb)->s_es->s_volume_name,
4499 ent->oied_namelen, ent->oied_name, rc);
/* Packed fid lives right after the NUL-terminated name; stored BE. */
4503 rec = (struct osd_fid_pack *)(de->name + de->name_len + 1);
4504 fid_cpu_to_be((struct lu_fid *)rec->fp_area, fid);
4505 rc = ldiskfs_journal_dirty_metadata(jh, bh);
4507 CERROR("%.16s: fail to dirty metadata for update dirent: "
4508 "name = %.*s, rc = %d\n",
4509 LDISKFS_SB(sb)->s_es->s_volume_name,
4510 ent->oied_namelen, ent->oied_name, rc);
/* True if a dirent with on-disk record length \a reclen has room for the
 * name plus a trailing NUL and a packed FID (osd_fid_pack). */
4516 osd_dirent_has_space(__u16 reclen, __u16 namelen, unsigned blocksize)
4518 if (ldiskfs_rec_len_from_disk(reclen, blocksize) >=
4519 __LDISKFS_DIR_REC_LEN(namelen + 1 + sizeof(struct osd_fid_pack)))
/* Add a FID-in-dirent to an existing name entry. If the current record is
 * wide enough, the packed FID is appended in place; otherwise the entry is
 * deleted and re-inserted with the FID via ldiskfs dirdata. Requires the
 * DIRDATA incompat feature; caller holds journal handle and PDO locks. */
4526 osd_dirent_reinsert(const struct lu_env *env, handle_t *jh,
4527 struct inode *dir, struct inode *inode,
4528 struct osd_it_ea_dirent *ent, struct lu_fid *fid,
4529 struct buffer_head *bh, struct ldiskfs_dir_entry_2 *de,
4530 struct htree_lock *hlock)
4532 struct dentry *dentry;
4533 struct osd_fid_pack *rec;
4534 struct ldiskfs_dentry_param *ldp;
/* Without the dirdata feature there is nowhere to store the FID. */
4538 if (!LDISKFS_HAS_INCOMPAT_FEATURE(inode->i_sb,
4539 LDISKFS_FEATURE_INCOMPAT_DIRDATA))
4542 /* There is enough space to hold the FID-in-dirent. */
4543 if (osd_dirent_has_space(de->rec_len, ent->oied_namelen,
4544 dir->i_sb->s_blocksize)) {
4545 rc = ldiskfs_journal_get_write_access(jh, bh);
4547 CERROR("%.16s: fail to write access for reinsert "
4548 "dirent: name = %.*s, rc = %d\n",
4549 LDISKFS_SB(inode->i_sb)->s_es->s_volume_name,
4550 ent->oied_namelen, ent->oied_name, rc);
/* In-place append: NUL-terminate name, then write the packed FID. */
4554 de->name[de->name_len] = 0;
4555 rec = (struct osd_fid_pack *)(de->name + de->name_len + 1);
4556 rec->fp_len = sizeof(struct lu_fid) + 1;
4557 fid_cpu_to_be((struct lu_fid *)rec->fp_area, fid);
4558 de->file_type |= LDISKFS_DIRENT_LUFID;
4560 rc = ldiskfs_journal_dirty_metadata(jh, bh);
4562 CERROR("%.16s: fail to dirty metadata for reinsert "
4563 "dirent: name = %.*s, rc = %d\n",
4564 LDISKFS_SB(inode->i_sb)->s_es->s_volume_name,
4565 ent->oied_namelen, ent->oied_name, rc);
/* Not enough room: remove the old entry and insert a fresh one. */
4570 rc = ldiskfs_delete_entry(jh, dir, de, bh);
4572 CERROR("%.16s: fail to delete entry for reinsert dirent: "
4573 "name = %.*s, rc = %d\n",
4574 LDISKFS_SB(inode->i_sb)->s_es->s_volume_name,
4575 ent->oied_namelen, ent->oied_name, rc);
4579 dentry = osd_child_dentry_by_inode(env, dir, ent->oied_name,
4581 ldp = (struct ldiskfs_dentry_param *)osd_oti_get(env)->oti_ldp;
4582 osd_get_ldiskfs_dirent_param(ldp, (const struct dt_rec *)fid);
4583 dentry->d_fsdata = (void *)ldp;
4584 ll_vfs_dq_init(dir);
4585 rc = osd_ldiskfs_add_entry(jh, dentry, inode, hlock);
4586 /* It is too bad, we cannot reinsert the name entry back.
4587 * That means we lose it! */
4589 CERROR("%.16s: fail to insert entry for reinsert dirent: "
4590 "name = %.*s, rc = %d\n",
4591 LDISKFS_SB(inode->i_sb)->s_es->s_volume_name,
4592 ent->oied_namelen, ent->oied_name, rc);
/* Verify the FID-in-dirent of the iterator's current entry against the
 * target inode's FID-in-LMA and repair inconsistencies (update, append, or
 * upgrade), honoring LUDA_VERIFY_DRYRUN. Lock ordering is delicate: the
 * journal handle must be started BEFORE the PDO lock (see comment below),
 * so when a repair is first needed the function releases the locks, turns
 * on od_dirent_journal and (in elided lines) presumably retries -- TODO
 * confirm against the full source. Sets LUDA_IGNORE / LUDA_REPAIR /
 * LUDA_UPGRADE in \a *attr for the caller. */
4598 osd_dirent_check_repair(const struct lu_env *env, struct osd_object *obj,
4599 struct osd_it_ea *it, struct lu_fid *fid,
4600 struct osd_inode_id *id, __u32 *attr)
4602 struct osd_thread_info *info = osd_oti_get(env);
4603 struct lustre_mdt_attrs *lma = &info->oti_mdt_attrs;
4604 struct osd_device *dev = osd_obj2dev(obj);
4605 struct super_block *sb = osd_sb(dev);
4606 const char *devname =
4607 LDISKFS_SB(sb)->s_es->s_volume_name;
4608 struct osd_it_ea_dirent *ent = it->oie_dirent;
4609 struct inode *dir = obj->oo_inode;
4610 struct htree_lock *hlock = NULL;
4611 struct buffer_head *bh = NULL;
4612 handle_t *jh = NULL;
4613 struct ldiskfs_dir_entry_2 *de;
4614 struct dentry *dentry;
4615 struct inode *inode;
4619 bool is_dotdot = false;
4622 if (ent->oied_name[0] == '.') {
4623 /* Skip dot entry, even if it has stale FID-in-dirent, because
4624 * we do not use such FID-in-dirent anymore, it is harmless. */
4625 if (ent->oied_namelen == 1)
4628 if (ent->oied_namelen == 2 && ent->oied_name[1] == '.')
4632 dentry = osd_child_dentry_get(env, obj, ent->oied_name,
4635 /* We need to ensure that the name entry is still valid.
4636 * Because it may be removed or renamed by other already.
4638 * The unlink or rename operation will start journal before PDO lock,
4639 * so to avoid deadlock, here we need to start journal handle before
4640 * related PDO lock also. But because we do not know whether there
4641 * will be something to be repaired before PDO lock, we just start
4642 * journal without conditions.
4644 * We may need to remove the name entry firstly, then insert back.
4645 * One credit is for user quota file update.
4646 * One credit is for group quota file update.
4647 * Two credits are for dirty inode. */
4648 credits = osd_dto_credits_noquota[DTO_INDEX_DELETE] +
4649 osd_dto_credits_noquota[DTO_INDEX_INSERT] + 1 + 1 + 2;
/* Only pay the journal-start cost once a repair is known to be needed. */
4652 if (dev->od_dirent_journal) {
4653 jh = ldiskfs_journal_start_sb(sb, credits)
4656 CERROR("%.16s: fail to start trans for dirent "
4657 "check_repair: credits %d, name %.*s, rc %d\n",
4658 devname, credits, ent->oied_namelen,
4659 ent->oied_name, rc);
4664 if (obj->oo_hl_head != NULL) {
4665 hlock = osd_oti_get(env)->oti_hlock;
4666 ldiskfs_htree_lock(hlock, obj->oo_hl_head, dir,
4669 down_write(&obj->oo_ext_idx_sem);
4672 bh = osd_ldiskfs_find_entry(dir, dentry, &de, hlock);
4673 /* For dotdot entry, if there is not enough space to hold FID-in-dirent,
4674 * just keep it there. It only happens when the device upgraded from 1.8
4675 * or restored from MDT file-level backup. For the whole directory, only
4676 * dotdot entry has no FID-in-dirent and needs to get FID from LMA when
4677 * readdir, it will not affect the performance much. */
4678 if ((bh == NULL) || (le32_to_cpu(de->inode) != ent->oied_ino) ||
4679 (is_dotdot && !osd_dirent_has_space(de->rec_len,
4681 sb->s_blocksize))) {
4682 *attr |= LUDA_IGNORE;
4683 GOTO(out_journal, rc = 0);
4686 osd_id_gen(id, ent->oied_ino, OSD_OII_NOGEN);
4687 inode = osd_iget(info, dev, id);
4688 if (IS_ERR(inode)) {
4689 rc = PTR_ERR(inode);
4690 if (rc == -ENOENT || rc == -ESTALE) {
4691 *attr |= LUDA_IGNORE;
4695 GOTO(out_journal, rc);
/* Compare the entry's FID with the inode's own FID stored in LMA. */
4698 rc = osd_get_lma(info, inode, &info->oti_obj_dentry, lma);
4700 if (fid_is_sane(fid)) {
4701 /* FID-in-dirent is valid. */
4702 if (lu_fid_eq(fid, &lma->lma_self_fid))
4703 GOTO(out_inode, rc = 0);
4705 /* Do not repair under dryrun mode. */
4706 if (*attr & LUDA_VERIFY_DRYRUN) {
4707 *attr |= LUDA_REPAIR;
4708 GOTO(out_inode, rc = 0);
/* Repair needed but no journal yet: drop locks, enable journaling. */
4711 if (!dev->od_dirent_journal) {
4715 ldiskfs_htree_unlock(hlock);
4717 up_write(&obj->oo_ext_idx_sem);
4718 dev->od_dirent_journal = 1;
4722 *fid = lma->lma_self_fid;
4724 /* Update the FID-in-dirent. */
4725 rc = osd_dirent_update(jh, sb, ent, fid, bh, de);
4727 *attr |= LUDA_REPAIR;
4729 /* Do not repair under dryrun mode. */
4730 if (*attr & LUDA_VERIFY_DRYRUN) {
4731 *attr |= LUDA_REPAIR;
4732 GOTO(out_inode, rc = 0);
4735 if (!dev->od_dirent_journal) {
4739 ldiskfs_htree_unlock(hlock);
4741 up_write(&obj->oo_ext_idx_sem);
4742 dev->od_dirent_journal = 1;
4746 *fid = lma->lma_self_fid;
4748 /* Append the FID-in-dirent. */
4749 rc = osd_dirent_reinsert(env, jh, dir, inode, ent,
4750 fid, bh, de, hlock);
4752 *attr |= LUDA_REPAIR;
4754 } else if (rc == -ENODATA) {
4755 /* Do not repair under dryrun mode. */
4756 if (*attr & LUDA_VERIFY_DRYRUN) {
4757 if (fid_is_sane(fid))
4758 *attr |= LUDA_REPAIR;
4760 *attr |= LUDA_UPGRADE;
4761 GOTO(out_inode, rc = 0);
4764 if (!dev->od_dirent_journal) {
4768 ldiskfs_htree_unlock(hlock);
4770 up_write(&obj->oo_ext_idx_sem);
4771 dev->od_dirent_journal = 1;
4776 if (unlikely(fid_is_sane(fid))) {
4777 /* FID-in-dirent exists, but FID-in-LMA is lost.
4778 * Trust the FID-in-dirent, and add FID-in-LMA. */
4779 rc = osd_ea_fid_set(info, inode, fid);
4781 *attr |= LUDA_REPAIR;
4783 lu_igif_build(fid, inode->i_ino, inode->i_generation);
4784 /* It is probably IGIF object. Only append the
4785 * FID-in-dirent. OI scrub will process FID-in-LMA. */
4786 rc = osd_dirent_reinsert(env, jh, dir, inode, ent,
4787 fid, bh, de, hlock);
4789 *attr |= LUDA_UPGRADE;
4793 GOTO(out_inode, rc);
4801 ldiskfs_htree_unlock(hlock);
4803 up_write(&obj->oo_ext_idx_sem);
4805 ldiskfs_journal_stop(jh);
/* Nothing was dirtied: drop back to the cheap no-journal mode. */
4806 if (rc >= 0 && !dirty)
4807 dev->od_dirent_journal = 0;
4812 * Returns the value at current position from iterator's in memory structure.
4814 * \param di struct osd_it_ea, iterator's in memory structure
4815 * \param attr attr requested for dirent.
4816 * \param lde lustre dirent
4818 * \retval 0 no error and \param lde has correct lustre dirent.
4819 * \retval -ve on error
/* Produce the lu_dirent for the current entry. With LUDA_VERIFY the entry
 * is checked/repaired via osd_dirent_check_repair(); otherwise a missing
 * FID is resolved through osd_ea_fid_get(). Non-ignored entries feed the
 * OI consistency cache and may trigger an OI scrub consistency check. */
4821 static inline int osd_it_ea_rec(const struct lu_env *env,
4822 const struct dt_it *di,
4823 struct dt_rec *dtrec, __u32 attr)
4825 struct osd_it_ea *it = (struct osd_it_ea *)di;
4826 struct osd_object *obj = it->oie_obj;
4827 struct osd_device *dev = osd_obj2dev(obj);
4828 struct osd_scrub *scrub = &dev->od_scrub;
4829 struct scrub_file *sf = &scrub->os_file;
4830 struct osd_thread_info *oti = osd_oti_get(env);
4831 struct osd_inode_id *id = &oti->oti_id;
4832 struct osd_idmap_cache *oic = &oti->oti_cache;
4833 struct lu_fid *fid = &it->oie_dirent->oied_fid;
4834 struct lu_dirent *lde = (struct lu_dirent *)dtrec;
4835 __u32 ino = it->oie_dirent->oied_ino;
4839 if (attr & LUDA_VERIFY) {
/* Never verify/repair the backend filesystem root itself. */
4841 if (unlikely(ino == osd_sb(dev)->s_root->d_inode->i_ino)) {
4842 attr |= LUDA_IGNORE;
4847 rc = osd_dirent_check_repair(env, obj, it, fid, id, &attr);
4849 attr &= ~LU_DIRENT_ATTRS_MASK;
4850 if (!fid_is_sane(fid)) {
4851 if (OBD_FAIL_CHECK(OBD_FAIL_FID_LOOKUP))
4854 rc = osd_ea_fid_get(env, obj, ino, fid, id);
4856 osd_id_gen(id, ino, OSD_OII_NOGEN);
4864 osd_it_pack_dirent(lde, fid, it->oie_dirent->oied_off,
4865 it->oie_dirent->oied_name,
4866 it->oie_dirent->oied_namelen,
4867 it->oie_dirent->oied_type, attr);
/* Remote fids are not cached locally for OI consistency checking. */
4869 if (osd_remote_fid(env, dev, fid))
4872 if (likely(!(attr & LUDA_IGNORE))) {
4874 oic->oic_fid = *fid;
/* Kick a consistency check if scrub state suggests this fid may be
 * inconsistent or in need of IGIF upgrade. */
4877 if (!(attr & LUDA_VERIFY) &&
4878 (scrub->os_pos_current <= ino) &&
4879 ((sf->sf_flags & SF_INCONSISTENT) ||
4880 (sf->sf_flags & SF_UPGRADE && fid_is_igif(fid)) ||
4881 ldiskfs_test_bit(osd_oi_fid2idx(dev, fid), sf->sf_oi_bitmap)))
4882 osd_consistency_check(oti, dev, oic);
4888 * Returns a cookie for current position of the iterator head, so that
4889 * user can use this cookie to load/start the iterator next time.
4891 * \param di iterator's in memory structure
4893 * \retval cookie for current position, on success
/* Cookie (readdir offset) of the current entry, for a later ->load(). */
4895 static __u64 osd_it_ea_store(const struct lu_env *env, const struct dt_it *di)
4897 struct osd_it_ea *it = (struct osd_it_ea *)di;
4899 return it->oie_dirent->oied_off;
4903 * It calls osd_ldiskfs_it_fill() which will use ->readdir()
4904 * to load a directory entry at a time and store it
4905 * in the iterator's in-memory data structure.
4907 * \param di struct osd_it_ea, iterator's in memory structure
4909 * \retval +ve on success
4910 * \retval -ve on error
/* Restore the iterator to \a hash (a cookie from ->store()) by seeking
 * the fake file and refilling the dirent buffer. */
4912 static int osd_it_ea_load(const struct lu_env *env,
4913 const struct dt_it *di, __u64 hash)
4915 struct osd_it_ea *it = (struct osd_it_ea *)di;
4919 it->oie_file.f_pos = hash;
4921 rc = osd_ldiskfs_it_fill(env, di);
4929 * Index lookup function for interoperability mode (b11826).
4931 * \param key, key i.e. file name to be searched
4933 * \retval +ve, on success
4934 * \retval -ve, on error
/* Look up file name \a key in directory \a dt and return its fid in
 * \a rec (interoperability mode, b11826); directories only. */
4936 static int osd_index_ea_lookup(const struct lu_env *env, struct dt_object *dt,
4937 struct dt_rec *rec, const struct dt_key *key,
4938 struct lustre_capa *capa)
4940 struct osd_object *obj = osd_dt_obj(dt);
4945 LASSERT(S_ISDIR(obj->oo_inode->i_mode));
4946 LINVRNT(osd_invariant(obj));
4948 if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
4951 rc = osd_ea_lookup_rec(env, obj, rec, key);
4958 * Index and Iterator operations for interoperability
4959 * mode (i.e. to run 2.0 mds on 1.8 disk) (b11826)
/* dt_index_operations vector for ldiskfs-native (EA) directories, i.e.
 * interoperability mode; pairs the osd_index_ea_* index ops with the
 * osd_it_ea_* readdir-based iterator above. */
4961 static const struct dt_index_operations osd_index_ea_ops = {
4962 .dio_lookup = osd_index_ea_lookup,
4963 .dio_declare_insert = osd_index_declare_ea_insert,
4964 .dio_insert = osd_index_ea_insert,
4965 .dio_declare_delete = osd_index_declare_ea_delete,
4966 .dio_delete = osd_index_ea_delete,
4968 .init = osd_it_ea_init,
4969 .fini = osd_it_ea_fini,
4970 .get = osd_it_ea_get,
4971 .put = osd_it_ea_put,
4972 .next = osd_it_ea_next,
4973 .key = osd_it_ea_key,
4974 .key_size = osd_it_ea_key_size,
4975 .rec = osd_it_ea_rec,
4976 .store = osd_it_ea_store,
4977 .load = osd_it_ea_load
/* lu_context_key init: allocate the per-thread osd_thread_info, its EA
 * iterator buffer, and an htree lock; unwind on any allocation failure
 * (the elided label presumably frees what was allocated -- confirm). */
4981 static void *osd_key_init(const struct lu_context *ctx,
4982 struct lu_context_key *key)
4984 struct osd_thread_info *info;
4986 OBD_ALLOC_PTR(info);
4988 return ERR_PTR(-ENOMEM);
4990 OBD_ALLOC(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
4991 if (info->oti_it_ea_buf == NULL)
4994 info->oti_env = container_of(ctx, struct lu_env, le_ctx);
4996 info->oti_hlock = ldiskfs_htree_lock_alloc();
4997 if (info->oti_hlock == NULL)
5003 OBD_FREE(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
5006 return ERR_PTR(-ENOMEM);
/* lu_context_key fini: free everything osd_key_init() allocated. */
5009 static void osd_key_fini(const struct lu_context *ctx,
5010 struct lu_context_key *key, void* data)
5012 struct osd_thread_info *info = data;
5014 if (info->oti_hlock != NULL)
5015 ldiskfs_htree_lock_free(info->oti_hlock);
5016 OBD_FREE(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
/* lu_context_key exit hook: sanity-check that the thread holds no osd
 * locks and has no open transactions when leaving the context. */
5020 static void osd_key_exit(const struct lu_context *ctx,
5021 struct lu_context_key *key, void *data)
5023 struct osd_thread_info *info = data;
5025 LASSERT(info->oti_r_locks == 0);
5026 LASSERT(info->oti_w_locks == 0);
5027 LASSERT(info->oti_txns == 0);
5030 /* type constructor/destructor: osd_type_init, osd_type_fini */
5031 LU_TYPE_INIT_FINI(osd, &osd_key);
/* Per-thread context key: ties osd_thread_info allocation/teardown to
 * lu_context lifetime for DT/MD/MG threads. */
5033 struct lu_context_key osd_key = {
5034 .lct_tags = LCT_DT_THREAD | LCT_MD_THREAD | LCT_MG_THREAD | LCT_LOCAL,
5035 .lct_init = osd_key_init,
5036 .lct_fini = osd_key_fini,
5037 .lct_exit = osd_key_exit
5041 static int osd_device_init(const struct lu_env *env, struct lu_device *d,
5042 const char *name, struct lu_device *next)
5044 struct osd_device *osd = osd_dev(d);
5046 strncpy(osd->od_svname, name, MAX_OBD_NAME);
5047 return osd_procfs_init(osd, name);
/* Shut down the osd device: stop OI scrub, release fsfilt ops, and tear
 * down the quota slave (qsd) instance if one was set up. */
5050 static int osd_shutdown(const struct lu_env *env, struct osd_device *o)
5054 osd_scrub_cleanup(env, o);
5057 fsfilt_put_ops(o->od_fsops);
5061 /* shutdown quota slave instance associated with the device */
5062 if (o->od_quota_slave != NULL) {
5063 qsd_fini(env, o->od_quota_slave);
5064 o->od_quota_slave = NULL;
/* Mount the backing ldiskfs filesystem for this osd device from lustre_cfg
 * parameters (name, device, flags string "s_flags:lmd_flags", extra mount
 * options), build the option string in a scratch page, and sanity-check
 * the result (writable, journal present). */
5070 static int osd_mount(const struct lu_env *env,
5071 struct osd_device *o, struct lustre_cfg *cfg)
5073 const char *name = lustre_cfg_string(cfg, 0);
5074 const char *dev = lustre_cfg_string(cfg, 1);
5076 unsigned long page, s_flags, lmd_flags = 0;
5077 struct page *__page;
5078 struct file_system_type *type;
5079 char *options = NULL;
/* Already mounted: nothing to do (idempotent). */
5084 if (o->od_mnt != NULL)
5087 if (strlen(dev) >= sizeof(o->od_mntdev))
5089 strcpy(o->od_mntdev, dev);
5091 o->od_fsops = fsfilt_get_ops(mt_str(LDD_MT_LDISKFS));
5092 if (o->od_fsops == NULL) {
5093 CERROR("Can't find fsfilt_ldiskfs\n");
/* Scratch page used as the mount-options string buffer. */
5097 OBD_PAGE_ALLOC(__page, CFS_ALLOC_STD);
5099 GOTO(out, rc = -ENOMEM);
/* cfg[2] is "s_flags[:lmd_flags]". */
5101 str = lustre_cfg_string(cfg, 2);
5102 s_flags = simple_strtoul(str, NULL, 0);
5103 str = strstr(str, ":");
5105 lmd_flags = simple_strtoul(str + 1, NULL, 0);
5106 opts = lustre_cfg_string(cfg, 3);
5107 page = (unsigned long)cfs_page_address(__page);
5108 options = (char *)page;
5111 strcat(options, "user_xattr,acl");
5113 strcat(options, opts);
5115 /* Glom up mount options */
5116 if (*options != '\0')
5117 strcat(options, ",");
5118 strlcat(options, "no_mbcache", CFS_PAGE_SIZE);
5120 type = get_fs_type("ldiskfs");
5122 CERROR("%s: cannot find ldiskfs module\n", name);
5123 GOTO(out, rc = -ENODEV);
5126 o->od_mnt = vfs_kern_mount(type, s_flags, dev, options);
/* get_fs_type() took a module reference; drop it now that the mount
 * (which pins the fs type itself) has been attempted. */
5127 cfs_module_put(type->owner);
5129 if (IS_ERR(o->od_mnt)) {
5130 rc = PTR_ERR(o->od_mnt);
5131 CERROR("%s: can't mount %s: %d\n", name, dev, rc);
/* Refuse devices force-marked read-only by lvfs. */
5136 if (lvfs_check_rdonly(o->od_mnt->mnt_sb->s_bdev)) {
5137 CERROR("%s: underlying device %s is marked as read-only. "
5138 "Setup failed\n", name, dev);
5141 GOTO(out, rc = -EROFS);
/* A journal is mandatory for Lustre's transaction semantics. */
5144 if (!LDISKFS_HAS_COMPAT_FEATURE(o->od_mnt->mnt_sb,
5145 LDISKFS_FEATURE_COMPAT_HAS_JOURNAL)) {
5146 CERROR("%s: device %s is mounted w/o journal\n", name, dev);
5149 GOTO(out, rc = -EINVAL);
/* The filesystem root is never mapped in the OI tables. */
5152 ldiskfs_set_inode_state(osd_sb(o)->s_root->d_inode,
5153 LDISKFS_STATE_LUSTRE_NO_OI);
5154 if (lmd_flags & LMD_FLG_NOSCRUB)
5159 OBD_PAGE_FREE(__page);
5161 fsfilt_put_ops(o->od_fsops);
/*
 * osd_device_fini() - ldo device-fini hook: tear down a mounted OSD.
 *
 * Shuts down the device (quota slave etc.), releases the object map,
 * drops cached dentries, syncs the filesystem, removes the procfs
 * entries (a failure there is returned as ERR_PTR) and finally unmounts
 * the backing filesystem.  Returns NULL-ish lu_device on success
 * (return statement elided from this excerpt).
 */
5166 static struct lu_device *osd_device_fini(const struct lu_env *env,
5167 struct lu_device *d)
5172 rc = osd_shutdown(env, osd_dev(d));
5174 osd_obj_map_fini(osd_dev(d));
     /* release cached dentries before unmounting, then flush to disk */
5176 shrink_dcache_sb(osd_sb(osd_dev(d)));
5177 osd_sync(env, lu2dt_dev(d));
5179 rc = osd_procfs_fini(osd_dev(d));
5181 CERROR("proc fini error %d \n", rc);
5182 RETURN (ERR_PTR(rc));
     /* finally drop the vfsmount taken in osd_mount() */
5185 if (osd_dev(d)->od_mnt) {
5186 mntput(osd_dev(d)->od_mnt);
5187 osd_dev(d)->od_mnt = NULL;
/*
 * osd_device_init0() - one-time initialization of a freshly allocated
 * osd_device from its lustre_cfg.
 *
 * Order of setup (each step unwound in reverse on failure via the
 * out_* labels at the bottom): ops tables and locks, capability hash,
 * cache defaults, backend mount (osd_mount), OI scrub, object map,
 * lu_site, procfs, and finally the quota slave (qsd_init).
 *
 * NOTE(review): numbered paste with elided lines; the GOTO targets /
 * labels between the visible statements are not all shown.
 */
5193 static int osd_device_init0(const struct lu_env *env,
5194 struct osd_device *o,
5195 struct lustre_cfg *cfg)
5197 struct lu_device *l = osd2lu_dev(o);
5198 struct osd_thread_info *info;
5201 /* if the module was re-loaded, env can loose its keys */
5202 rc = lu_env_refill((struct lu_env *) env);
5205 info = osd_oti_get(env);
     /* wire up the lu/dt operation tables */
5208 l->ld_ops = &osd_lu_ops;
5209 o->od_dt_dev.dd_ops = &osd_dt_ops;
5211 spin_lock_init(&o->od_osfs_lock);
5212 mutex_init(&o->od_otable_mutex);
     /* force the first statfs to refresh: age set far in the past */
5213 o->od_osfs_age = cfs_time_shift_64(-1000);
5215 o->od_capa_hash = init_capa_hash();
5216 if (o->od_capa_hash == NULL)
5217 GOTO(out, rc = -ENOMEM);
     /* caching defaults; tunable later through procfs */
5219 o->od_read_cache = 1;
5220 o->od_writethrough_cache = 1;
5221 o->od_readcache_max_filesize = OSD_MAX_CACHE_SIZE;
5223 rc = osd_mount(env, o, cfg);
5227 CFS_INIT_LIST_HEAD(&o->od_ios_list);
5228 /* setup scrub, including OI files initialization */
5229 rc = osd_scrub_setup(env, o);
     /* service name from cfg slot 4; NOTE(review): strncpy with size-1
      * relies on od_svname being pre-zeroed for NUL termination —
      * presumably true since the device is freshly allocated; verify */
5233 strncpy(o->od_svname, lustre_cfg_string(cfg, 4),
5234 sizeof(o->od_svname) - 1);
5236 rc = osd_obj_map_init(o);
5238 GOTO(out_scrub, rc);
5240 rc = lu_site_init(&o->od_site, l);
5242 GOTO(out_compat, rc);
5243 o->od_site.ls_bottom_dev = l;
5245 rc = lu_site_init_finish(&o->od_site);
5249 rc = osd_procfs_init(o, o->od_svname);
5251 CERROR("%s: can't initialize procfs: rc = %d\n",
     /* sanity: the site must be linked into the global site list */
5256 LASSERT(l->ld_site->ls_linkage.next && l->ld_site->ls_linkage.prev);
5258 /* initialize quota slave instance */
5259 o->od_quota_slave = qsd_init(env, o->od_svname, &o->od_dt_dev,
5261 if (IS_ERR(o->od_quota_slave)) {
5262 rc = PTR_ERR(o->od_quota_slave);
5263 o->od_quota_slave = NULL;
5264 GOTO(out_procfs, rc);
     /* error unwinding: reverse order of the setup steps above
      * (labels elided from this excerpt) */
5271 lu_site_fini(&o->od_site);
5273 osd_obj_map_fini(o);
5275 osd_scrub_cleanup(env, o);
5277 osd_oi_fini(info, o);
5278 osd_shutdown(env, o);
5282 cleanup_capa_hash(o->od_capa_hash);
/*
 * osd_device_alloc() - ldto hook: allocate and initialize a new
 * osd_device for device type @t from config @cfg.
 *
 * Allocation itself (OBD_ALLOC, elided from this excerpt) is followed
 * by dt_device_init(), an env refill (contexts may have been revived),
 * and osd_device_init0(); on init0 failure the dt device is finalized
 * again.  Returns the lu_device on success or ERR_PTR(rc).
 */
5287 static struct lu_device *osd_device_alloc(const struct lu_env *env,
5288 struct lu_device_type *t,
5289 struct lustre_cfg *cfg)
5291 struct osd_device *o;
5296 return ERR_PTR(-ENOMEM);
5298 rc = dt_device_init(&o->od_dt_dev, t);
5300 /* Because the ctx might be revived in dt_device_init,
5301 * refill the env here */
5302 lu_env_refill((struct lu_env *)env);
5303 rc = osd_device_init0(env, o, cfg);
5305 dt_device_fini(&o->od_dt_dev);
     /* free the half-constructed device on any failure (free elided) */
5308 if (unlikely(rc != 0))
5311 return rc == 0 ? osd2lu_dev(o) : ERR_PTR(rc);
/*
 * osd_device_free() - ldto hook: release an osd_device.
 *
 * Drains the lu_site (temporarily making this device the site's top
 * device so purge can drop all references), reports any objects still
 * leaked in the site hash, then finalizes site and dt device.  The
 * OBD_FREE of the structure itself is elided from this excerpt.
 */
5314 static struct lu_device *osd_device_free(const struct lu_env *env,
5315 struct lu_device *d)
5317 struct osd_device *o = osd_dev(d);
5320 cleanup_capa_hash(o->od_capa_hash);
5321 /* XXX: make osd top device in order to release reference */
5322 d->ld_site->ls_top_dev = d;
5323 lu_site_purge(env, d->ld_site, -1);
     /* anything still hashed here is a leaked object: dump it */
5324 if (!cfs_hash_is_empty(d->ld_site->ls_obj_hash)) {
5325 LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_ERROR, NULL);
5326 lu_site_print(env, d->ld_site, &msgdata, lu_cdebug_printer);
5328 lu_site_fini(&o->od_site);
5329 dt_device_fini(&o->od_dt_dev);
/*
 * osd_process_config() - ldo hook: handle a config command for this
 * device.  The case labels are elided from this excerpt; presumably
 * LCFG_SETUP drives osd_mount() and LCFG_CLEANUP unlinks the device
 * from the site and shuts it down — confirm against the full file.
 */
5334 static int osd_process_config(const struct lu_env *env,
5335 struct lu_device *d, struct lustre_cfg *cfg)
5337 struct osd_device *o = osd_dev(d);
5341 switch(cfg->lcfg_command) {
5343 err = osd_mount(env, o, cfg);
5346 lu_dev_del_linkage(d->ld_site, d);
5347 err = osd_shutdown(env, o);
/*
 * osd_recovery_complete() - ldo hook invoked once recovery finishes.
 * If a quota slave instance exists, start it so it begins processing
 * new requests; without one there is nothing to do.
 */
5356 static int osd_recovery_complete(const struct lu_env *env,
5357 struct lu_device *d)
5359 struct osd_device *osd = osd_dev(d);
5363 if (osd->od_quota_slave == NULL)
5366 /* start qsd instance on recovery completion, this notifies the quota
5367 * slave code that we are about to process new requests now */
5368 rc = qsd_start(env, osd->od_quota_slave);
5373 * we use exports to track all osd users
     /*
      * osd_obd_connect() - o_connect hook: create an export for a new
      * OSD user and bump the connection count (od_connects, incremented
      * under od_osfs_lock; the increment line is elided from this
      * excerpt).  @data/@localdata are unused here.
      */
5375 static int osd_obd_connect(const struct lu_env *env, struct obd_export **exp,
5376 struct obd_device *obd, struct obd_uuid *cluuid,
5377 struct obd_connect_data *data, void *localdata)
5379 struct osd_device *osd = osd_dev(obd->obd_lu_dev);
5380 struct lustre_handle conn;
5384 CDEBUG(D_CONFIG, "connect #%d\n", osd->od_connects);
5386 rc = class_connect(&conn, obd, cluuid);
5390 *exp = class_conn2export(&conn);
     /* connection counter is protected by od_osfs_lock */
5392 spin_lock(&osd->od_osfs_lock);
5394 spin_unlock(&osd->od_osfs_lock);
5400 * once last export (we don't count self-export) disappeared
5401 * osd can be released
     /*
      * osd_obd_disconnect() - o_disconnect hook: drop one export; when
      * the connection count reaches zero (decrement elided from this
      * excerpt) schedule the device for manual cleanup.
      */
5403 static int osd_obd_disconnect(struct obd_export *exp)
5405 struct obd_device *obd = exp->exp_obd;
5406 struct osd_device *osd = osd_dev(obd->obd_lu_dev);
5407 int rc, release = 0;
5410 /* Only disconnect the underlying layers on the final disconnect. */
5411 spin_lock(&osd->od_osfs_lock);
5413 if (osd->od_connects == 0)
5415 spin_unlock(&osd->od_osfs_lock);
5417 rc = class_disconnect(exp); /* bz 9811 */
     /* last user gone: tear the device down */
5419 if (rc == 0 && release)
5420 class_manual_cleanup(obd);
/*
 * osd_prepare() - ldo hook run before the device starts serving.
 * When stacked under an MD device, create the legacy local objects
 * via llo_local_objects_setup(); then prepare the quota slave's
 * on-disk objects if a qsd instance exists.
 */
5424 static int osd_prepare(const struct lu_env *env, struct lu_device *pdev,
5425 struct lu_device *dev)
5427 struct osd_device *osd = osd_dev(dev);
5431 if (dev->ld_site && lu_device_is_md(dev->ld_site->ls_top_dev)) {
5432 /* MDT/MDD still use old infrastructure to create
5434 result = llo_local_objects_setup(env, lu2md_dev(pdev),
5440 if (osd->od_quota_slave != NULL)
5441 /* set up quota slave objects */
5442 result = qsd_prepare(env, osd->od_quota_slave);
/* lu_object lifecycle callbacks for OSD objects */
5447 static const struct lu_object_operations osd_lu_obj_ops = {
5448 .loo_object_init = osd_object_init,
5449 .loo_object_delete = osd_object_delete,
5450 .loo_object_release = osd_object_release,
5451 .loo_object_free = osd_object_free,
5452 .loo_object_print = osd_object_print,
5453 .loo_object_invariant = osd_object_invariant
/* device-level operations; non-static: referenced from osd_device_init0() */
5456 const struct lu_device_operations osd_lu_ops = {
5457 .ldo_object_alloc = osd_object_alloc,
5458 .ldo_process_config = osd_process_config,
5459 .ldo_recovery_complete = osd_recovery_complete,
5460 .ldo_prepare = osd_prepare,
/* device-type lifecycle callbacks (alloc/free/init/fini and type start/stop) */
5463 static const struct lu_device_type_operations osd_device_type_ops = {
5464 .ldto_init = osd_type_init,
5465 .ldto_fini = osd_type_fini,
5467 .ldto_start = osd_type_start,
5468 .ldto_stop = osd_type_stop,
5470 .ldto_device_alloc = osd_device_alloc,
5471 .ldto_device_free = osd_device_free,
5473 .ldto_device_init = osd_device_init,
5474 .ldto_device_fini = osd_device_fini
/* the osd-ldiskfs device type registered with the class subsystem */
5477 struct lu_device_type osd_device_type = {
5478 .ldt_tags = LU_DEVICE_DT,
5479 .ldt_name = LUSTRE_OSD_LDISKFS_NAME,
5480 .ldt_ops = &osd_device_type_ops,
5481 .ldt_ctx_tags = LCT_LOCAL,
5485 * lprocfs legacy support.
     /* minimal obd_ops: only connect/disconnect are needed for user tracking */
5487 static struct obd_ops osd_obd_device_ops = {
5488 .o_owner = THIS_MODULE,
5489 .o_connect = osd_obd_connect,
5490 .o_disconnect = osd_obd_disconnect
/*
 * osd_mod_init() - module entry point: register the osd-ldiskfs obd
 * type (with its procfs variables) with the class subsystem.
 */
5493 static int __init osd_mod_init(void)
5495 struct lprocfs_static_vars lvars;
5498 lprocfs_osd_init_vars(&lvars);
5499 return class_register_type(&osd_obd_device_ops, NULL, lvars.module_vars,
5500 LUSTRE_OSD_LDISKFS_NAME, &osd_device_type);
/* osd_mod_exit() - module exit point: unregister the osd-ldiskfs type */
5503 static void __exit osd_mod_exit(void)
5505 class_unregister_type(LUSTRE_OSD_LDISKFS_NAME);
/* module metadata and registration (cfs_module wraps module_init/exit) */
5508 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
5509 MODULE_DESCRIPTION("Lustre Object Storage Device ("LUSTRE_OSD_LDISKFS_NAME")");
5510 MODULE_LICENSE("GPL");
5512 cfs_module(osd, "0.1.0", osd_mod_init, osd_mod_exit);