4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/osd/osd_handler.c
38 * Top-level entry points into osd module
40 * Author: Nikita Danilov <nikita@clusterfs.com>
41 * Pravin Shelar <pravin.shelar@sun.com> : Added fid in dirent
44 #define DEBUG_SUBSYSTEM S_MDS
46 #include <linux/module.h>
48 /* LUSTRE_VERSION_CODE */
49 #include <lustre_ver.h>
50 /* prerequisite for linux/xattr.h */
51 #include <linux/types.h>
52 /* prerequisite for linux/xattr.h */
54 /* XATTR_{REPLACE,CREATE} */
55 #include <linux/xattr.h>
60 * struct OBD_{ALLOC,FREE}*()
63 #include <obd_support.h>
64 /* struct ptlrpc_thread */
65 #include <lustre_net.h>
66 #include <lustre_fid.h>
68 #include "osd_internal.h"
70 /* llo_* api support */
71 #include <md_object.h>
72 #include <lustre_quota.h>
/* Module parameter: enable ldiskfs parallel directory operations (pdo).
 * NOTE(review): the backing "ldiskfs_pdo" variable definition is not visible
 * in this extract — presumably declared above; confirm. */
75 CFS_MODULE_PARM(ldiskfs_pdo, "i", int, 0644,
76 "ldiskfs with parallel directory operations");
/* Canonical directory-entry names used when creating/iterating directories. */
78 static const char dot[] = ".";
79 static const char dotdot[] = "..";
80 static const char remote_obj_dir[] = "REM_OBJ_DIR";
/* Forward declarations of the operation vectors defined later in this file. */
82 static const struct lu_object_operations osd_lu_obj_ops;
83 static const struct dt_object_operations osd_obj_ops;
84 static const struct dt_object_operations osd_obj_ea_ops;
85 static const struct dt_object_operations osd_obj_otable_it_ops;
86 static const struct dt_index_operations osd_index_iam_ops;
87 static const struct dt_index_operations osd_index_ea_ops;
89 #ifdef OSD_TRACK_DECLARES
/* Map each declared operation type to the operation that would undo it on
 * rollback; OSD_OT_MAX marks operations with no rollback counterpart.
 * NOTE(review): the closing "};" of this array (and matching #endif) is not
 * visible in this extract. */
90 int osd_trans_declare_op2rb[] = {
91 [OSD_OT_ATTR_SET] = OSD_OT_ATTR_SET,
92 [OSD_OT_PUNCH] = OSD_OT_MAX,
93 [OSD_OT_XATTR_SET] = OSD_OT_XATTR_SET,
94 [OSD_OT_CREATE] = OSD_OT_DESTROY,
95 [OSD_OT_DESTROY] = OSD_OT_CREATE,
96 [OSD_OT_REF_ADD] = OSD_OT_REF_DEL,
97 [OSD_OT_REF_DEL] = OSD_OT_REF_ADD,
98 [OSD_OT_WRITE] = OSD_OT_WRITE,
99 [OSD_OT_INSERT] = OSD_OT_DELETE,
100 [OSD_OT_DELETE] = OSD_OT_INSERT,
101 [OSD_OT_QUOTA] = OSD_OT_MAX,
105 static int osd_has_index(const struct osd_object *obj)
107 return obj->oo_dt.do_index_ops != NULL;
/* Object invariant, exercised via LINVRNT() checks throughout this file. */
static int osd_object_invariant(const struct lu_object *l)
{
	struct osd_object *obj = osd_obj(l);

	return osd_invariant(obj);
}
116 * Concurrency: doesn't matter
118 static int osd_read_locked(const struct lu_env *env, struct osd_object *o)
120 return osd_oti_get(env)->oti_r_locks > 0;
124 * Concurrency: doesn't matter
126 static int osd_write_locked(const struct lu_env *env, struct osd_object *o)
128 struct osd_thread_info *oti = osd_oti_get(env);
129 return oti->oti_w_locks > 0 && o->oo_owner == env;
133 * Concurrency: doesn't access mutable data
135 static int osd_root_get(const struct lu_env *env,
136 struct dt_device *dev, struct lu_fid *f)
138 lu_local_obj_fid(f, OSD_FS_ROOT_OID);
143 * OSD object methods.
147 * Concurrency: no concurrent access is possible that early in object
/* Allocate and initialize a new osd_object wrapping a lu_object.
 * NOTE(review): the allocation (OBD_ALLOC_PTR), NULL check and return
 * statements are elided from this extract — this view shows only the
 * initialization of the freshly allocated "mo". */
150 static struct lu_object *osd_object_alloc(const struct lu_env *env,
151 const struct lu_object_header *hdr,
154 struct osd_object *mo;
160 l = &mo->oo_dt.do_lu;
161 dt_object_init(&mo->oo_dt, NULL, d);
/* EA (extended-attribute) based object ops are the default. */
162 mo->oo_dt.do_ops = &osd_obj_ea_ops;
163 l->lo_ops = &osd_lu_obj_ops;
164 init_rwsem(&mo->oo_sem);
165 init_rwsem(&mo->oo_ext_idx_sem);
166 spin_lock_init(&mo->oo_guard);
173 static inline int __osd_xattr_get(struct inode *inode, struct dentry *dentry,
174 const char *name, void *buf, int len)
176 dentry->d_inode = inode;
177 return inode->i_op->getxattr(dentry, name, buf, len);
/* Read and validate the Lustre metadata attributes (LMA) xattr of @inode
 * into @lma: fetch XATTR_NAME_LMA, retry with the old (smaller) LMA size on
 * a size mismatch, reject unsupported incompat flags, then byte-swap.
 * NOTE(review): several lines (rc checks, returns, closing braces) are
 * elided from this extract. */
180 int osd_get_lma(struct osd_thread_info *info, struct inode *inode,
181 struct dentry *dentry, struct lustre_mdt_attrs *lma)
185 rc = __osd_xattr_get(inode, dentry, XATTR_NAME_LMA, (void *)lma,
188 /* try with old lma size */
189 rc = inode->i_op->getxattr(dentry, XATTR_NAME_LMA,
190 info->oti_mdt_attrs_old,
193 memcpy(lma, info->oti_mdt_attrs_old, sizeof(*lma));
196 /* Check LMA compatibility */
197 if (lma->lma_incompat & ~cpu_to_le32(LMA_INCOMPAT_SUPP)) {
198 CWARN("%.16s: unsupported incompat LMA feature(s) "
200 LDISKFS_SB(inode->i_sb)->s_es->s_volume_name,
201 inode->i_ino, le32_to_cpu(lma->lma_incompat) &
/* On-disk LMA is little-endian; convert to host order. */
205 lustre_lma_swab(lma);
208 } else if (rc == 0) {
216 * retrieve object from backend ext fs.
/* Get the inode identified by @id from the backing ldiskfs filesystem,
 * rejecting stale (generation mismatch, nlink == 0) and bad inodes.
 * Returns the inode or an ERR_PTR.
 * NOTE(review): the leading "if (IS_ERR(inode))" and several iput()/return
 * lines are elided from this extract. */
218 struct inode *osd_iget(struct osd_thread_info *info, struct osd_device *dev,
219 struct osd_inode_id *id)
221 struct inode *inode = NULL;
223 inode = ldiskfs_iget(osd_sb(dev), id->oii_ino);
225 CDEBUG(D_INODE, "no inode: ino = %u, rc = %ld\n",
226 id->oii_ino, PTR_ERR(inode));
/* Caller asked for a specific generation: reject a recycled inode. */
227 } else if (id->oii_gen != OSD_OII_NOGEN &&
228 inode->i_generation != id->oii_gen) {
229 CDEBUG(D_INODE, "unmatched inode: ino = %u, gen0 = %u, "
231 id->oii_ino, id->oii_gen, inode->i_generation);
233 inode = ERR_PTR(-ESTALE);
234 } else if (inode->i_nlink == 0) {
235 /* due to parallel readdir and unlink,
236 * we can have dead inode here. */
237 CDEBUG(D_INODE, "stale inode: ino = %u\n", id->oii_ino);
238 make_bad_inode(inode);
240 inode = ERR_PTR(-ESTALE);
241 } else if (is_bad_inode(inode)) {
242 CWARN("%.16s: bad inode: ino = %u\n",
243 LDISKFS_SB(osd_sb(dev))->s_es->s_volume_name, id->oii_ino);
245 inode = ERR_PTR(-ENOENT);
/* Success path: backfill the generation for NOGEN lookups. */
247 if (id->oii_gen == OSD_OII_NOGEN)
248 osd_id_gen(id, inode->i_ino, inode->i_generation);
250 /* Do not update file c/mtime in ldiskfs.
251 * NB: we don't have any lock to protect this because we don't
252 * have reference on osd_object now, but contention with
253 * another lookup + attr_set can't happen in the tiny window
254 * between if (...) and set S_NOCMTIME. */
255 if (!(inode->i_flags & S_NOCMTIME))
256 inode->i_flags |= S_NOCMTIME;
/* Get the inode for @id and report its FID in @fid: taken from the LMA
 * xattr when present, otherwise derived (root FID for the root inode, an
 * IGIF built from ino/generation for other pre-LMA inodes).
 * NOTE(review): rc/ERR_PTR handling lines are elided from this extract. */
261 static struct inode *
262 osd_iget_fid(struct osd_thread_info *info, struct osd_device *dev,
263 struct osd_inode_id *id, struct lu_fid *fid)
265 struct lustre_mdt_attrs *lma = &info->oti_mdt_attrs;
269 inode = osd_iget(info, dev, id);
273 rc = osd_get_lma(info, inode, &info->oti_obj_dentry, lma);
275 *fid = lma->lma_self_fid;
276 } else if (rc == -ENODATA) {
277 if (unlikely(inode == osd_sb(dev)->s_root->d_inode))
278 lu_local_obj_fid(fid, OSD_FS_ROOT_OID);
280 lu_igif_build(fid, inode->i_ino, inode->i_generation);
/* Like osd_iget(), but additionally verify that the inode's LMA self-FID
 * matches the expected @fid; a mismatch yields -EREMCHG so the caller can
 * trigger OI scrub.
 * NOTE(review): rc handling and iput() lines are elided from this extract. */
288 static struct inode *
289 osd_iget_verify(struct osd_thread_info *info, struct osd_device *dev,
290 struct osd_inode_id *id, const struct lu_fid *fid)
292 struct lustre_mdt_attrs *lma = &info->oti_mdt_attrs;
296 inode = osd_iget(info, dev, id);
300 rc = osd_get_lma(info, inode, &info->oti_obj_dentry, lma);
309 if (!lu_fid_eq(fid, &lma->lma_self_fid)) {
310 CDEBUG(D_LFSCK, "inconsistent obj: "DFID", %lu, "DFID"\n",
311 PFID(&lma->lma_self_fid), inode->i_ino, PFID(fid));
313 return ERR_PTR(-EREMCHG);
/* Resolve @fid to an inode and attach it to @obj.  Lookup order:
 *   1. per-thread idmap cache, 2. OI scrub pending list, 3. OI files.
 * On an inconsistent mapping (-EREMCHG) an OI scrub is triggered.  For
 * directories with pdo enabled an htree lock head is also allocated.
 * NOTE(review): many lines (ENTRY/GOTO targets, iput() on error paths,
 * closing braces) are elided from this extract. */
319 static int osd_fid_lookup(const struct lu_env *env, struct osd_object *obj,
320 const struct lu_fid *fid,
321 const struct lu_object_conf *conf)
323 struct osd_thread_info *info;
324 struct lu_device *ldev = obj->oo_dt.do_lu.lo_dev;
325 struct osd_device *dev;
326 struct osd_idmap_cache *oic;
327 struct osd_inode_id *id;
329 struct osd_scrub *scrub;
330 struct scrub_file *sf;
335 LINVRNT(osd_invariant(obj));
336 LASSERT(obj->oo_inode == NULL);
337 LASSERTF(fid_is_sane(fid) || fid_is_idif(fid), DFID, PFID(fid));
340 scrub = &dev->od_scrub;
341 sf = &scrub->os_file;
342 info = osd_oti_get(env);
344 oic = &info->oti_cache;
346 if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT))
349 /* Search order: 1. per-thread cache. */
350 if (lu_fid_eq(fid, &oic->oic_fid)) {
356 if (!cfs_list_empty(&scrub->os_inconsistent_items)) {
357 /* Search order: 2. OI scrub pending list. */
358 result = osd_oii_lookup(dev, fid, id);
363 if (sf->sf_flags & SF_INCONSISTENT)
367 * Objects are created as locking anchors or place holders for objects
368 * yet to be created. No need to osd_oi_lookup() at here because FID
369 * should never be re-used, if it's really a duplicate FID from
370 * unexpected reason, we should be able to detect it later by calling
371 * do_create->osd_oi_insert()
373 if (conf != NULL && conf->loc_flags & LOC_F_NEW)
374 GOTO(out, result = 0);
376 /* Search order: 3. OI files. */
377 result = osd_oi_lookup(info, dev, fid, id, true);
378 if (result == -ENOENT) {
379 if (!fid_is_norm(fid) || fid_is_on_ost(info, dev, fid) ||
380 !ldiskfs_test_bit(osd_oi_fid2idx(dev,fid),
382 GOTO(out, result = 0);
/* Verify the LMA FID only when the mapping might be inconsistent. */
392 inode = osd_iget(info, dev, id);
394 inode = osd_iget_verify(info, dev, id, fid);
396 result = PTR_ERR(inode);
397 if (result == -ENOENT || result == -ESTALE) {
398 fid_zero(&oic->oic_fid);
400 } else if (result == -EREMCHG) {
/* FID/inode mismatch: kick OI scrub (or report in-progress). */
403 if (thread_is_running(&scrub->os_thread)) {
404 result = -EINPROGRESS;
405 } else if (!dev->od_noscrub) {
406 result = osd_scrub_start(dev);
407 LCONSOLE_ERROR("%.16s: trigger OI scrub by RPC "
408 "for "DFID", rc = %d [1]\n",
409 LDISKFS_SB(osd_sb(dev))->s_es->\
410 s_volume_name,PFID(fid), result);
411 if (result == 0 || result == -EALREADY)
412 result = -EINPROGRESS;
421 obj->oo_inode = inode;
422 LASSERT(obj->oo_inode->i_sb == osd_sb(dev));
424 obj->oo_compat_dot_created = 1;
425 obj->oo_compat_dotdot_created = 1;
427 if (!S_ISDIR(inode->i_mode) || !ldiskfs_pdo) /* done */
428 GOTO(out, result = 0);
430 LASSERT(obj->oo_hl_head == NULL);
431 obj->oo_hl_head = ldiskfs_htree_lock_head_alloc(HTREE_HBITS_DEF);
432 if (obj->oo_hl_head == NULL) {
433 obj->oo_inode = NULL;
435 GOTO(out, result = -ENOMEM);
437 GOTO(out, result = 0);
440 LINVRNT(osd_invariant(obj));
445 * Concurrency: shouldn't matter.
/* Finish initialization of an object that has a backing inode: install body
 * ops and publish existence + file type in the header attributes. */
447 static void osd_object_init0(struct osd_object *obj)
449 LASSERT(obj->oo_inode != NULL);
450 obj->oo_dt.do_body_ops = &osd_body_ops;
451 obj->oo_dt.do_lu.lo_header->loh_attr |=
452 (LOHA_EXISTS | (obj->oo_inode->i_mode & S_IFMT));
456 * Concurrency: no concurrent access is possible that early in object
/* lu_object_operations::loo_object_init: look up the FID and, when the
 * object exists, complete initialization via osd_object_init0().  The
 * otable-iterator FID gets its dedicated ops vector instead.
 * NOTE(review): RETURN statements and closing braces are elided from this
 * extract. */
459 static int osd_object_init(const struct lu_env *env, struct lu_object *l,
460 const struct lu_object_conf *conf)
462 struct osd_object *obj = osd_obj(l);
465 LINVRNT(osd_invariant(obj));
467 if (fid_is_otable_it(&l->lo_header->loh_fid)) {
468 obj->oo_dt.do_ops = &osd_obj_otable_it_ops;
469 l->lo_header->loh_attr |= LOHA_EXISTS;
473 result = osd_fid_lookup(env, obj, lu_object_fid(l), conf);
474 obj->oo_dt.do_body_ops = &osd_body_ops_new;
475 if (result == 0 && obj->oo_inode != NULL)
476 osd_object_init0(obj);
478 LINVRNT(osd_invariant(obj));
483 * Concurrency: no concurrent access is possible that late in object
/* lu_object_operations::loo_object_free: release the dt object and any
 * htree lock head.
 * NOTE(review): the lu_object_fini()/OBD_FREE_PTR() lines are elided from
 * this extract. */
486 static void osd_object_free(const struct lu_env *env, struct lu_object *l)
488 struct osd_object *obj = osd_obj(l);
490 LINVRNT(osd_invariant(obj));
492 dt_object_fini(&obj->oo_dt);
493 if (obj->oo_hl_head != NULL)
494 ldiskfs_htree_lock_head_free(obj->oo_hl_head);
499 * Concurrency: no concurrent access is possible that late in object
/* Tear down the IAM directory container attached to @o (if any) and free
 * the directory descriptor. */
502 static void osd_index_fini(struct osd_object *o)
504 struct iam_container *bag;
506 if (o->oo_dir != NULL) {
507 bag = &o->oo_dir->od_container;
508 if (o->oo_inode != NULL) {
/* Only finalize the container if it is still bound to our inode. */
509 if (bag->ic_object == o->oo_inode)
510 iam_container_fini(bag);
512 OBD_FREE_PTR(o->oo_dir);
518 * Concurrency: no concurrent access is possible that late in object
519 * life-cycle (for all existing callers, that is. New callers have to provide
520 * their own locking.)
522 static int osd_inode_unlinked(const struct inode *inode)
524 return inode->i_nlink == 0;
/* Journal credit estimates for OI / inode deletion.
 * NOTE(review): the enclosing "enum { ... };" wrapper lines are elided from
 * this extract. */
528 OSD_TXN_OI_DELETE_CREDITS = 20,
529 OSD_TXN_INODE_DELETE_CREDITS = 20
/* Optional transaction-handle timing statistics.  When OSD_THANDLE_STATS is
 * off, the helpers compile to no-ops and OSD_CHECK_SLOW_TH just runs expr. */
536 #if OSD_THANDLE_STATS
538 * Set time when the handle is allocated
540 static void osd_th_alloced(struct osd_thandle *oth)
542 oth->oth_alloced = cfs_time_current();
546 * Set time when the handle started
548 static void osd_th_started(struct osd_thandle *oth)
550 oth->oth_started = cfs_time_current();
554 * Helper function to convert time interval to microseconds packed in
557 static long interval_to_usec(cfs_time_t start, cfs_time_t end)
561 cfs_duration_usec(cfs_time_sub(end, start), &val);
562 return val.tv_sec * 1000000 + val.tv_usec;
566 * Check whether we deal with this handle for too long.
568 static void __osd_th_check_slow(void *oth, struct osd_device *dev,
569 cfs_time_t alloced, cfs_time_t started,
572 cfs_time_t now = cfs_time_current();
574 LASSERT(dev != NULL);
/* Record per-phase durations in the lprocfs histogram counters. */
576 lprocfs_counter_add(dev->od_stats, LPROC_OSD_THANDLE_STARTING,
577 interval_to_usec(alloced, started));
578 lprocfs_counter_add(dev->od_stats, LPROC_OSD_THANDLE_OPEN,
579 interval_to_usec(started, closed));
580 lprocfs_counter_add(dev->od_stats, LPROC_OSD_THANDLE_CLOSING,
581 interval_to_usec(closed, now));
/* Complain (with stack dump) if a handle lived longer than 30 seconds. */
583 if (cfs_time_before(cfs_time_add(alloced, cfs_time_seconds(30)), now)) {
584 CWARN("transaction handle %p was open for too long: "
586 "alloced "CFS_TIME_T" ,"
587 "started "CFS_TIME_T" ,"
588 "closed "CFS_TIME_T"\n",
589 oth, now, alloced, started, closed);
590 libcfs_debug_dumpstack(NULL);
594 #define OSD_CHECK_SLOW_TH(oth, dev, expr) \
596 cfs_time_t __closed = cfs_time_current(); \
597 cfs_time_t __alloced = oth->oth_alloced; \
598 cfs_time_t __started = oth->oth_started; \
601 __osd_th_check_slow(oth, dev, __alloced, __started, __closed); \
604 #else /* OSD_THANDLE_STATS */
606 #define osd_th_alloced(h) do {} while(0)
607 #define osd_th_started(h) do {} while(0)
608 #define OSD_CHECK_SLOW_TH(oth, dev, expr) expr
610 #endif /* OSD_THANDLE_STATS */
613 * Concurrency: doesn't access mutable data.
615 static int osd_param_is_not_sane(const struct osd_device *dev,
616 const struct thandle *th)
618 struct osd_thandle *oh = container_of(th, typeof(*oh), ot_super);
620 return oh->ot_credits > osd_journal(dev)->j_max_transaction_buffers;
624 * Concurrency: shouldn't matter.
/* Journal commit callback: run dt transaction-commit hooks, invoke all
 * registered per-transaction callbacks, then drop the device reference and
 * tear down the transaction lu_context.
 * NOTE(review): the error check guarding the CERROR, thandle_put/free and
 * closing braces are elided from this extract. */
626 static void osd_trans_commit_cb(struct super_block *sb,
627 struct ldiskfs_journal_cb_entry *jcb, int error)
629 struct osd_thandle *oh = container_of0(jcb, struct osd_thandle, ot_jcb);
630 struct thandle *th = &oh->ot_super;
631 struct lu_device *lud = &th->th_dev->dd_lu_dev;
632 struct dt_txn_commit_cb *dcb, *tmp;
/* The journal handle must already have been stopped. */
634 LASSERT(oh->ot_handle == NULL);
637 CERROR("transaction @0x%p commit error: %d\n", th, error);
639 dt_txn_hook_commit(th);
641 /* call per-transaction callbacks if any */
642 cfs_list_for_each_entry_safe(dcb, tmp, &oh->ot_dcb_list, dcb_linkage) {
643 LASSERTF(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC,
644 "commit callback entry: magic=%x name='%s'\n",
645 dcb->dcb_magic, dcb->dcb_name);
646 cfs_list_del_init(&dcb->dcb_linkage);
647 dcb->dcb_func(NULL, th, dcb, error);
650 lu_ref_del_at(&lud->ld_reference, oh->ot_dev_link, "osd-tx", th);
654 lu_context_exit(&th->th_ctx);
655 lu_context_fini(&th->th_ctx);
/* dt_device_operations::dt_trans_create: allocate an osd_thandle and reset
 * the per-thread declare-tracking state for the new transaction.
 * NOTE(review): the NULL check after OBD_ALLOC_GFP, th initialization lines
 * and RETURN are elided from this extract. */
659 static struct thandle *osd_trans_create(const struct lu_env *env,
662 struct osd_thread_info *oti = osd_oti_get(env);
663 struct osd_iobuf *iobuf = &oti->oti_iobuf;
664 struct osd_thandle *oh;
/* no pending IO in this thread should be left from prev. request */
669 LASSERT(cfs_atomic_read(&iobuf->dr_numreqs) == 0);
671 th = ERR_PTR(-ENOMEM);
672 OBD_ALLOC_GFP(oh, sizeof *oh, CFS_ALLOC_IO);
674 oh->ot_quota_trans = &oti->oti_quota_trans;
675 memset(oh->ot_quota_trans, 0, sizeof(*oh->ot_quota_trans));
679 th->th_tags = LCT_TX_HANDLE;
681 oti->oti_dev = osd_dt_dev(d);
682 CFS_INIT_LIST_HEAD(&oh->ot_dcb_list);
/* NOTE(review): these memsets use OSD_OT_MAX as a byte count, which is only
 * correct if the element type of oti_declare_ops* is one byte wide — confirm
 * against the osd_thread_info declaration (sizeof(array) would be safer). */
685 memset(oti->oti_declare_ops, 0, OSD_OT_MAX);
686 memset(oti->oti_declare_ops_rb, 0, OSD_OT_MAX);
687 memset(oti->oti_declare_ops_cred, 0, OSD_OT_MAX);
688 oti->oti_rollback = false;
694 * Concurrency: shouldn't matter.
/* dt_device_operations::dt_trans_start: run start hooks, sanity-check the
 * declared credits (warning and clamping when they exceed what one journal
 * transaction can hold), start the ldiskfs journal handle and enter the
 * transaction lu_context.
 * FIX(review): the " insert/delete" CWARN below printed OSD_OT_DESTROY
 * counters under the "delete" label (DESTROY is already reported by the
 * first CWARN); it now prints OSD_OT_DELETE to match its format string.
 * NOTE(review): several lines (rc checks, GOTO/RETURN, closing braces) are
 * elided from this extract. */
696 int osd_trans_start(const struct lu_env *env, struct dt_device *d,
699 struct osd_thread_info *oti = osd_oti_get(env);
700 struct osd_device *dev = osd_dt_dev(d);
702 struct osd_thandle *oh;
707 LASSERT(current->journal_info == NULL);
709 oh = container_of0(th, struct osd_thandle, ot_super);
711 LASSERT(oh->ot_handle == NULL);
713 rc = dt_txn_hook_start(env, d, th);
717 if (unlikely(osd_param_is_not_sane(dev, th))) {
718 static unsigned long last_printed;
719 static int last_credits;
721 CWARN("%.16s: too many transaction credits (%d > %d)\n",
722 LDISKFS_SB(osd_sb(dev))->s_es->s_volume_name,
724 osd_journal(dev)->j_max_transaction_buffers);
725 #ifdef OSD_TRACK_DECLARES
726 CWARN(" create: %u/%u, delete: %u/%u, destroy: %u/%u\n",
727 oti->oti_declare_ops[OSD_OT_CREATE],
728 oti->oti_declare_ops_cred[OSD_OT_CREATE],
729 oti->oti_declare_ops[OSD_OT_DELETE],
730 oti->oti_declare_ops_cred[OSD_OT_DELETE],
731 oti->oti_declare_ops[OSD_OT_DESTROY],
732 oti->oti_declare_ops_cred[OSD_OT_DESTROY]);
733 CWARN(" attr_set: %u/%u, xattr_set: %u/%u\n",
734 oti->oti_declare_ops[OSD_OT_ATTR_SET],
735 oti->oti_declare_ops_cred[OSD_OT_ATTR_SET],
736 oti->oti_declare_ops[OSD_OT_XATTR_SET],
737 oti->oti_declare_ops_cred[OSD_OT_XATTR_SET]);
738 CWARN(" write: %u/%u, punch: %u/%u, quota %u/%u\n",
739 oti->oti_declare_ops[OSD_OT_WRITE],
740 oti->oti_declare_ops_cred[OSD_OT_WRITE],
741 oti->oti_declare_ops[OSD_OT_PUNCH],
742 oti->oti_declare_ops_cred[OSD_OT_PUNCH],
743 oti->oti_declare_ops[OSD_OT_QUOTA],
744 oti->oti_declare_ops_cred[OSD_OT_QUOTA]);
745 CWARN(" insert: %u/%u, delete: %u/%u\n",
746 oti->oti_declare_ops[OSD_OT_INSERT],
747 oti->oti_declare_ops_cred[OSD_OT_INSERT],
748 oti->oti_declare_ops[OSD_OT_DELETE],
749 oti->oti_declare_ops_cred[OSD_OT_DELETE]);
750 CWARN(" ref_add: %u/%u, ref_del: %u/%u\n",
751 oti->oti_declare_ops[OSD_OT_REF_ADD],
752 oti->oti_declare_ops_cred[OSD_OT_REF_ADD],
753 oti->oti_declare_ops[OSD_OT_REF_DEL],
754 oti->oti_declare_ops_cred[OSD_OT_REF_DEL]);
/* Rate-limit stack dumps to once a minute, and only when credits change. */
756 if (last_credits != oh->ot_credits &&
757 time_after(jiffies, last_printed + 60 * HZ)) {
758 libcfs_debug_dumpstack(NULL);
759 last_credits = oh->ot_credits;
760 last_printed = jiffies;
763 /* XXX Limit the credits to 'max_transaction_buffers', and
764 * let the underlying filesystem to catch the error if
765 * we really need so many credits.
767 * This should be removed when we can calculate the
768 * credits precisely. */
769 oh->ot_credits = osd_journal(dev)->j_max_transaction_buffers;
773 * XXX temporary stuff. Some abstraction layer should
776 jh = ldiskfs_journal_start_sb(osd_sb(dev), oh->ot_credits);
780 LASSERT(oti->oti_txns == 0);
781 lu_context_init(&th->th_ctx, th->th_tags);
782 lu_context_enter(&th->th_ctx);
784 lu_device_get(&d->dd_lu_dev);
785 oh->ot_dev_link = lu_ref_add(&d->dd_lu_dev.ld_reference,
797 * Concurrency: shouldn't matter.
/* dt_device_operations::dt_trans_stop: end the quota-slave operation,
 * register the commit callback, run stop hooks, stop the journal handle and
 * wait for any direct IO submitted by this thread to complete.
 * NOTE(review): rc checks, RETURN and closing braces are elided from this
 * extract. */
799 static int osd_trans_stop(const struct lu_env *env, struct thandle *th)
802 struct osd_thandle *oh;
803 struct osd_thread_info *oti = osd_oti_get(env);
804 struct osd_iobuf *iobuf = &oti->oti_iobuf;
805 struct qsd_instance *qsd = oti->oti_dev->od_quota_slave;
808 oh = container_of0(th, struct osd_thandle, ot_super);
811 /* inform the quota slave device that the transaction is
813 qsd_op_end(env, qsd, oh->ot_quota_trans);
814 oh->ot_quota_trans = NULL;
816 if (oh->ot_handle != NULL) {
817 handle_t *hdl = oh->ot_handle;
820 * add commit callback
821 * notice we don't do this in osd_trans_start()
822 * as underlying transaction can change during truncate
824 ldiskfs_journal_callback_add(hdl, osd_trans_commit_cb,
827 LASSERT(oti->oti_txns == 1);
829 rc = dt_txn_hook_stop(env, th);
831 CERROR("Failure in transaction hook: %d\n", rc);
833 /* hook functions might modify th_sync */
834 hdl->h_sync = th->th_sync;
836 oh->ot_handle = NULL;
837 OSD_CHECK_SLOW_TH(oh, oti->oti_dev,
838 rc = ldiskfs_journal_stop(hdl));
840 CERROR("Failure to stop transaction: %d\n", rc);
845 /* as we want IO to journal and data IO be concurrent, we don't block
846 * awaiting data IO completion in osd_do_bio(), instead we wait here
847 * once transaction is submitted to the journal. all regular requests
848 * don't do direct IO (except read/write), thus this wait_event becomes
851 * IMPORTANT: we have to wait till any IO submitted by the thread is
852 * completed otherwise iobuf may be corrupted by different request
854 cfs_wait_event(iobuf->dr_wait,
855 cfs_atomic_read(&iobuf->dr_numreqs) == 0);
857 rc = iobuf->dr_error;
862 static int osd_trans_cb_add(struct thandle *th, struct dt_txn_commit_cb *dcb)
864 struct osd_thandle *oh = container_of0(th, struct osd_thandle,
867 LASSERT(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC);
868 LASSERT(&dcb->dcb_func != NULL);
869 cfs_list_add(&dcb->dcb_linkage, &oh->ot_dcb_list);
875 * Called just before object is freed. Releases all resources except for
876 * object itself (that is released by osd_object_free()).
878 * Concurrency: no concurrent access is possible that late in object
/* NOTE(review): the unlinked-check / OI-removal path, iput() and closing
 * braces are elided from this extract. */
881 static void osd_object_delete(const struct lu_env *env, struct lu_object *l)
883 struct osd_object *obj = osd_obj(l);
884 struct inode *inode = obj->oo_inode;
886 LINVRNT(osd_invariant(obj));
889 * If object is unlinked remove fid->ino mapping from object index.
/* Capture the owners before dropping the inode so quota can be adjusted. */
894 struct qsd_instance *qsd = osd_obj2dev(obj)->od_quota_slave;
895 qid_t uid = inode->i_uid;
896 qid_t gid = inode->i_gid;
899 obj->oo_inode = NULL;
902 struct osd_thread_info *info = osd_oti_get(env);
903 struct lquota_id_info *qi = &info->oti_qi;
905 /* Release granted quota to master if necessary */
906 qi->lqi_id.qid_uid = uid;
907 qsd_op_adjust(env, qsd, &qi->lqi_id, USRQUOTA);
909 qi->lqi_id.qid_uid = gid;
910 qsd_op_adjust(env, qsd, &qi->lqi_id, GRPQUOTA);
916 * Concurrency: ->loo_object_release() is called under site spin-lock.
/* NOTE(review): the remainder of this function's signature and its body are
 * elided from this extract. */
918 static void osd_object_release(const struct lu_env *env,
924 * Concurrency: shouldn't matter.
/* lu_object_operations::loo_object_print: emit a one-line description of
 * the object (inode pointer, ino/generation, index container type) via the
 * supplied printer.
 * NOTE(review): the declaration of "d" and some braces are elided from this
 * extract. */
926 static int osd_object_print(const struct lu_env *env, void *cookie,
927 lu_printer_t p, const struct lu_object *l)
929 struct osd_object *o = osd_obj(l);
932 if (o->oo_dir != NULL)
933 d = o->oo_dir->od_container.ic_descr;
936 return (*p)(env, cookie,
937 LUSTRE_OSD_LDISKFS_NAME"-object@%p(i:%p:%lu/%u)[%s]",
939 o->oo_inode ? o->oo_inode->i_ino : 0UL,
940 o->oo_inode ? o->oo_inode->i_generation : 0,
941 d ? d->id_ops->id_name : "plain");
945 * Concurrency: shouldn't matter.
/* dt_device_operations::dt_statfs: return cached filesystem statistics,
 * refreshing the cache (under od_osfs_lock) when it is older than one
 * second.
 * NOTE(review): the OBD_ALLOC for the env==NULL path, RETURNs and closing
 * braces are elided from this extract. */
947 int osd_statfs(const struct lu_env *env, struct dt_device *d,
948 struct obd_statfs *sfs)
950 struct osd_device *osd = osd_dt_dev(d);
951 struct super_block *sb = osd_sb(osd);
952 struct kstatfs *ksfs;
955 if (unlikely(osd->od_mnt == NULL))
958 /* osd_lproc.c call this without env, allocate ksfs for that case */
959 if (unlikely(env == NULL)) {
964 ksfs = &osd_oti_get(env)->oti_ksfs;
967 spin_lock(&osd->od_osfs_lock);
/* Refresh the cached statfs when it is more than a second old. */
969 if (cfs_time_before_64(osd->od_osfs_age, cfs_time_shift_64(-1))) {
970 result = sb->s_op->statfs(sb->s_root, ksfs);
971 if (likely(result == 0)) { /* N.B. statfs can't really fail */
972 osd->od_osfs_age = cfs_time_current_64();
973 statfs_pack(&osd->od_statfs, ksfs);
974 if (sb->s_flags & MS_RDONLY)
975 sfs->os_state = OS_STATE_READONLY;
979 if (likely(result == 0))
980 *sfs = osd->od_statfs;
981 spin_unlock(&osd->od_osfs_lock);
983 if (unlikely(env == NULL))
990 * Estimate space needed for file creations. We assume the largest filename
991 * which is 2^64 - 1, hence a filename of 20 chars.
992 * This is 28 bytes per object which is 28MB for 1M objects ... not so bad.
/* NOTE(review): the #else / #endif lines of this conditional are elided
 * from this extract. */
994 #ifdef __LDISKFS_DIR_REC_LEN
995 #define PER_OBJ_USAGE __LDISKFS_DIR_REC_LEN(20)
997 #define PER_OBJ_USAGE LDISKFS_DIR_REC_LEN(20)
1001 * Concurrency: doesn't access mutable data.
/* dt_device_operations::dt_conf_get: report device limits and capabilities
 * (name length, link count, block size, grant accounting parameters, mount
 * options, max EA size) taken from the backing ldiskfs superblock.
 * NOTE(review): the #else branch / #endif around the max_ea_size setting are
 * elided from this extract. */
1003 static void osd_conf_get(const struct lu_env *env,
1004 const struct dt_device *dev,
1005 struct dt_device_param *param)
1007 struct super_block *sb = osd_sb(osd_dt_dev(dev));
1010 * XXX should be taken from not-yet-existing fs abstraction layer.
1012 param->ddp_mnt = osd_dt_dev(dev)->od_mnt;
1013 param->ddp_max_name_len = LDISKFS_NAME_LEN;
1014 param->ddp_max_nlink = LDISKFS_LINK_MAX;
1015 param->ddp_block_shift = sb->s_blocksize_bits;
1016 param->ddp_mount_type = LDD_MT_LDISKFS;
1017 param->ddp_maxbytes = sb->s_maxbytes;
1018 /* Overhead estimate should be fairly accurate, so we really take a tiny
1019 * error margin which also avoids fragmenting the filesystem too much */
1020 param->ddp_grant_reserved = 2; /* end up to be 1.9% after conversion */
1021 /* inode are statically allocated, so per-inode space consumption
1022 * is the space consumed by the directory entry */
1023 param->ddp_inodespace = PER_OBJ_USAGE;
1024 /* per-fragment overhead to be used by the client code */
1025 param->ddp_grant_frag = 6 * LDISKFS_BLOCK_SIZE(sb);
1026 param->ddp_mntopts = 0;
1027 if (test_opt(sb, XATTR_USER))
1028 param->ddp_mntopts |= MNTOPT_USERXATTR;
1029 if (test_opt(sb, POSIX_ACL))
1030 param->ddp_mntopts |= MNTOPT_ACL;
1032 #if defined(LDISKFS_FEATURE_INCOMPAT_EA_INODE)
1033 if (LDISKFS_HAS_INCOMPAT_FEATURE(sb, LDISKFS_FEATURE_INCOMPAT_EA_INODE))
1034 param->ddp_max_ea_size = LDISKFS_XATTR_MAX_LARGE_EA_SIZE;
1037 param->ddp_max_ea_size = sb->s_blocksize;
1042 * Concurrency: shouldn't matter.
1044 static int osd_sync(const struct lu_env *env, struct dt_device *d)
1046 CDEBUG(D_HA, "syncing OSD %s\n", LUSTRE_OSD_LDISKFS_NAME);
1047 return ldiskfs_force_commit(osd_sb(osd_dt_dev(d)));
1051 * Start commit for OSD device.
1053 * An implementation of dt_commit_async method for OSD device.
/* Asynchronously starts underlying fs sync and thereby a transaction */
1057 * \param env environment
1058 * \param d dt device
1060 * \see dt_device_operations
/* NOTE(review): the opening brace and ENTRY line are elided from this
 * extract. */
1062 static int osd_commit_async(const struct lu_env *env,
1063 struct dt_device *d)
1065 struct super_block *s = osd_sb(osd_dt_dev(d));
1068 CDEBUG(D_HA, "async commit OSD %s\n", LUSTRE_OSD_LDISKFS_NAME);
/* sync_fs(s, 0): wait == 0, i.e. start the sync without blocking on it. */
1069 RETURN(s->s_op->sync_fs(s, 0));
1073 * Concurrency: shouldn't matter.
/* dt_device_operations::dt_ro: force the backing block device (and its
 * journal device) read-only.
 * NOTE(review): rc declaration and RETURN are elided from this extract. */
1076 static int osd_ro(const struct lu_env *env, struct dt_device *d)
1078 struct super_block *sb = osd_sb(osd_dt_dev(d));
1082 CERROR("*** setting %s read-only ***\n", osd_dt_dev(d)->od_svname);
1084 rc = __lvfs_set_rdonly(sb->s_bdev, LDISKFS_SB(sb)->journal_bdev);
1089 * Concurrency: serialization provided by callers.
/* dt_device_operations::dt_init_capa_ctxt: record the capability mode,
 * timeout, HMAC algorithm and key table on the device.
 * NOTE(review): the RETURN(0) line is elided from this extract. */
1091 static int osd_init_capa_ctxt(const struct lu_env *env, struct dt_device *d,
1092 int mode, unsigned long timeout, __u32 alg,
1093 struct lustre_capa_key *keys)
1095 struct osd_device *dev = osd_dt_dev(d);
1098 dev->od_fl_capa = mode;
1099 dev->od_capa_timeout = timeout;
1100 dev->od_capa_alg = alg;
1101 dev->od_capa_keys = keys;
1106 * Note: we do not count into QUOTA here.
1107 * If we mount with --data_journal we may need more.
/* Per-operation journal credit estimates (excluding quota). */
1109 const int osd_dto_credits_noquota[DTO_NR] = {
1112 * INDEX_EXTRA_TRANS_BLOCKS(8) +
1113 * SINGLEDATA_TRANS_BLOCKS(8)
1114 * XXX Note: maybe iam need more, since iam have more level than
1117 [DTO_INDEX_INSERT] = 16,
1118 [DTO_INDEX_DELETE] = 16,
1122 [DTO_INDEX_UPDATE] = 16,
/* Create an object. The same as create object in EXT3. */
1125 * DATA_TRANS_BLOCKS(14) +
1126 * INDEX_EXTRA_BLOCKS(8) +
1127 * 3(inode bits, groups, GDT)
1129 [DTO_OBJECT_CREATE] = 25,
1131 * XXX: real credits to be fixed
1133 [DTO_OBJECT_DELETE] = 25,
1135 * Attr set credits (inode)
1137 [DTO_ATTR_SET_BASE] = 1,
1139 * Xattr set. The same as xattr of EXT3.
1140 * DATA_TRANS_BLOCKS(14)
/* XXX Note: in original MDS implementation INDEX_EXTRA_TRANS_BLOCKS */
1142 * are also counted in. Do not know why?
1144 [DTO_XATTR_SET] = 14,
1147 * credits for inode change during write.
1149 [DTO_WRITE_BASE] = 3,
1151 * credits for single block write.
1153 [DTO_WRITE_BLOCK] = 14,
1155 * Attr set credits for chown.
1156 * This is extra credits for setattr, and it is null without quota
1158 [DTO_ATTR_SET_CHOWN]= 0
/* dt_device operation vector for the ldiskfs OSD. */
1161 static const struct dt_device_operations osd_dt_ops = {
1162 .dt_root_get = osd_root_get,
1163 .dt_statfs = osd_statfs,
1164 .dt_trans_create = osd_trans_create,
1165 .dt_trans_start = osd_trans_start,
1166 .dt_trans_stop = osd_trans_stop,
1167 .dt_trans_cb_add = osd_trans_cb_add,
1168 .dt_conf_get = osd_conf_get,
1169 .dt_sync = osd_sync,
1171 .dt_commit_async = osd_commit_async,
1172 .dt_init_capa_ctxt = osd_init_capa_ctxt,
/* dt_object_operations::do_read_lock: take the object rwsem for reading
 * with the given lockdep @role.
 * NOTE(review): the oti_r_locks increment and closing brace are elided from
 * this extract. */
1175 static void osd_object_read_lock(const struct lu_env *env,
1176 struct dt_object *dt, unsigned role)
1178 struct osd_object *obj = osd_dt_obj(dt);
1179 struct osd_thread_info *oti = osd_oti_get(env);
1181 LINVRNT(osd_invariant(obj));
/* A thread must not read-lock an object it already write-owns. */
1183 LASSERT(obj->oo_owner != env);
1184 down_read_nested(&obj->oo_sem, role);
1186 LASSERT(obj->oo_owner == NULL);
/* dt_object_operations::do_write_lock: take the object rwsem for writing
 * and record this env as the owner.
 * NOTE(review): the oti_w_locks increment and closing brace are elided from
 * this extract. */
1190 static void osd_object_write_lock(const struct lu_env *env,
1191 struct dt_object *dt, unsigned role)
1193 struct osd_object *obj = osd_dt_obj(dt);
1194 struct osd_thread_info *oti = osd_oti_get(env);
1196 LINVRNT(osd_invariant(obj));
/* Write locks do not nest on the same object within one env. */
1198 LASSERT(obj->oo_owner != env);
1199 down_write_nested(&obj->oo_sem, role);
1201 LASSERT(obj->oo_owner == NULL);
1202 obj->oo_owner = env;
/* dt_object_operations::do_read_unlock: release a read lock taken by
 * osd_object_read_lock().
 * NOTE(review): the oti_r_locks decrement is elided from this extract. */
1206 static void osd_object_read_unlock(const struct lu_env *env,
1207 struct dt_object *dt)
1209 struct osd_object *obj = osd_dt_obj(dt);
1210 struct osd_thread_info *oti = osd_oti_get(env);
1212 LINVRNT(osd_invariant(obj));
1214 LASSERT(oti->oti_r_locks > 0);
1216 up_read(&obj->oo_sem);
/* dt_object_operations::do_write_unlock: clear ownership and release a
 * write lock taken by osd_object_write_lock().
 * NOTE(review): the oti_w_locks decrement is elided from this extract. */
1219 static void osd_object_write_unlock(const struct lu_env *env,
1220 struct dt_object *dt)
1222 struct osd_object *obj = osd_dt_obj(dt);
1223 struct osd_thread_info *oti = osd_oti_get(env);
1225 LINVRNT(osd_invariant(obj));
/* Only the owning env may drop the write lock. */
1227 LASSERT(obj->oo_owner == env);
1228 LASSERT(oti->oti_w_locks > 0);
1230 obj->oo_owner = NULL;
1231 up_write(&obj->oo_sem);
1234 static int osd_object_write_locked(const struct lu_env *env,
1235 struct dt_object *dt)
1237 struct osd_object *obj = osd_dt_obj(dt);
1239 LINVRNT(osd_invariant(obj));
1241 return obj->oo_owner == env;
/* Validate a client-supplied capability: consult the capa cache first, then
 * check expiry, find the matching key, recompute the HMAC and compare; a
 * valid capability is added to the cache.
 * NOTE(review): rc/return paths, capa_put() calls and closing braces are
 * elided from this extract. */
1244 static int capa_is_sane(const struct lu_env *env,
1245 struct osd_device *dev,
1246 struct lustre_capa *capa,
1247 struct lustre_capa_key *keys)
1249 struct osd_thread_info *oti = osd_oti_get(env);
1250 struct lustre_capa *tcapa = &oti->oti_capa;
1251 struct obd_capa *oc;
/* Fast path: a previously validated copy may be cached. */
1255 oc = capa_lookup(dev->od_capa_hash, capa, 0);
1257 if (capa_is_expired(oc)) {
1258 DEBUG_CAPA(D_ERROR, capa, "expired");
1265 if (capa_is_expired_sec(capa)) {
1266 DEBUG_CAPA(D_ERROR, capa, "expired");
/* Find the signing key matching the capability's key id. */
1270 spin_lock(&capa_lock);
1271 for (i = 0; i < 2; i++) {
1272 if (keys[i].lk_keyid == capa->lc_keyid) {
1273 oti->oti_capa_key = keys[i];
1277 spin_unlock(&capa_lock);
1280 DEBUG_CAPA(D_ERROR, capa, "no matched capa key");
1284 rc = capa_hmac(tcapa->lc_hmac, capa, oti->oti_capa_key.lk_key);
1288 if (memcmp(tcapa->lc_hmac, capa->lc_hmac, sizeof(capa->lc_hmac))) {
1289 DEBUG_CAPA(D_ERROR, capa, "HMAC mismatch");
1293 oc = capa_add(dev->od_capa_hash, capa);
/* Authorize operation @opc on @dt using capability @capa: short-circuit
 * when capabilities are disabled or bypassed, then verify FID match,
 * supported opcodes and capability sanity.
 * NOTE(review): RETURN statements and some braces are elided from this
 * extract. */
1299 int osd_object_auth(const struct lu_env *env, struct dt_object *dt,
1300 struct lustre_capa *capa, __u64 opc)
1302 const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
1303 struct osd_device *dev = osd_dev(dt->do_lu.lo_dev);
1304 struct md_capainfo *ci;
1307 if (!dev->od_fl_capa)
1310 if (capa == BYPASS_CAPA)
1313 ci = md_capainfo(env);
1317 if (ci->mc_auth == LC_ID_NONE)
1321 CERROR("no capability is provided for fid "DFID"\n", PFID(fid));
1325 if (!lu_fid_eq(fid, &capa->lc_fid)) {
1326 DEBUG_CAPA(D_ERROR, capa, "fid "DFID" mismatch with",
1331 if (!capa_opc_supported(capa, opc)) {
1332 DEBUG_CAPA(D_ERROR, capa, "opc "LPX64" not supported by", opc);
1336 if ((rc = capa_is_sane(env, dev, capa, dev->od_capa_keys))) {
1337 DEBUG_CAPA(D_ERROR, capa, "insane (rc %d)", rc);
/* Convert @seconds into the per-thread scratch timespec, truncated to the
 * backing filesystem's time granularity.
 * NOTE(review): the tv_nsec initialization and return statement are elided
 * from this extract. */
1344 static struct timespec *osd_inode_time(const struct lu_env *env,
1345 struct inode *inode, __u64 seconds)
1347 struct osd_thread_info *oti = osd_oti_get(env);
1348 struct timespec *t = &oti->oti_time;
1350 t->tv_sec = seconds;
1352 *t = timespec_trunc(*t, inode->i_sb->s_time_gran);
/*
 * Copy the standard attributes of @inode into @attr and mark all of
 * them valid via la_valid.  Pure field-by-field translation from the
 * VFS inode to the Lustre lu_attr representation; no locking is done
 * here (callers hold oo_guard, per the visible call sites).
 */
1357 static void osd_inode_getattr(const struct lu_env *env,
1358 struct inode *inode, struct lu_attr *attr)
1360 attr->la_valid |= LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
1361 LA_SIZE | LA_BLOCKS | LA_UID | LA_GID |
1362 LA_FLAGS | LA_NLINK | LA_RDEV | LA_BLKSIZE |
1365 attr->la_atime = LTIME_S(inode->i_atime);
1366 attr->la_mtime = LTIME_S(inode->i_mtime);
1367 attr->la_ctime = LTIME_S(inode->i_ctime);
1368 attr->la_mode = inode->i_mode;
1369 attr->la_size = i_size_read(inode);
1370 attr->la_blocks = inode->i_blocks;
1371 attr->la_uid = inode->i_uid;
1372 attr->la_gid = inode->i_gid;
/* flags come from the ldiskfs-private inode, not the generic i_flags */
1373 attr->la_flags = LDISKFS_I(inode)->i_flags;
1374 attr->la_nlink = inode->i_nlink;
1375 attr->la_rdev = inode->i_rdev;
/* block size expressed from i_blkbits so both forms are available */
1376 attr->la_blksize = 1 << inode->i_blkbits;
1377 attr->la_blkbits = inode->i_blkbits;
/*
 * dt_object_operations::do_attr_get implementation: authorize the read
 * via the capability, then copy inode attributes into @attr under the
 * object's oo_guard spinlock to get a consistent snapshot.
 */
1380 static int osd_attr_get(const struct lu_env *env,
1381 struct dt_object *dt,
1382 struct lu_attr *attr,
1383 struct lustre_capa *capa)
1385 struct osd_object *obj = osd_dt_obj(dt);
1387 LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
1388 LINVRNT(osd_invariant(obj));
/* reading metadata requires CAPA_OPC_META_READ permission */
1390 if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
1393 spin_lock(&obj->oo_guard);
1394 osd_inode_getattr(env, obj->oo_inode, attr);
1395 spin_unlock(&obj->oo_guard);
/*
 * Declare (reserve credits for) a future attribute-set operation.
 *
 * Besides the base DTO_ATTR_SET_BASE journal credits, a UID or GID
 * change requires quota bookkeeping for four id/space combinations per
 * changed id type: +1 inode for the new owner, -1 inode for the old,
 * +blocks for the new owner, -blocks for the old.  osd_declare_qid()
 * both reserves journal credits for the quota files and triggers quota
 * space adjustment when the operation commits.
 * NOTE(review): return paths after the -EDQUOT/-EINPROGRESS checks are
 * elided in this view.
 */
1399 static int osd_declare_attr_set(const struct lu_env *env,
1400 struct dt_object *dt,
1401 const struct lu_attr *attr,
1402 struct thandle *handle)
1404 struct osd_thandle *oh;
1405 struct osd_object *obj;
1406 struct osd_thread_info *info = osd_oti_get(env);
1407 struct lquota_id_info *qi = &info->oti_qi;
1413 LASSERT(dt != NULL);
1414 LASSERT(handle != NULL);
1416 obj = osd_dt_obj(dt);
1417 LASSERT(osd_invariant(obj));
1419 oh = container_of0(handle, struct osd_thandle, ot_super);
1420 LASSERT(oh->ot_handle == NULL);
1422 osd_trans_declare_op(env, oh, OSD_OT_ATTR_SET,
1423 osd_dto_credits_noquota[DTO_ATTR_SET_BASE]);
/* no attributes or no backing inode: base credits are enough */
1425 if (attr == NULL || obj->oo_inode == NULL)
/* current space usage in quota blocks, for the ownership transfer */
1428 bspace = obj->oo_inode->i_blocks;
1429 bspace <<= obj->oo_inode->i_sb->s_blocksize_bits;
1430 bspace = toqb(bspace);
1432 /* Changing ownership is always preformed by super user, it should not
1435 * We still need to call the osd_declare_qid() to calculate the journal
1436 * credits for updating quota accounting files and to trigger quota
1437 * space adjustment once the operation is completed.*/
1438 if ((attr->la_valid & LA_UID) != 0 &&
1439 attr->la_uid != obj->oo_inode->i_uid) {
1440 qi->lqi_type = USRQUOTA;
1442 /* inode accounting */
1443 qi->lqi_is_blk = false;
1445 /* one more inode for the new owner ... */
1446 qi->lqi_id.qid_uid = attr->la_uid;
/* uid 0 (root) is treated as already-allocated space */
1448 allocated = (attr->la_uid == 0) ? true : false;
1449 rc = osd_declare_qid(env, oh, qi, allocated, NULL);
1450 if (rc == -EDQUOT || rc == -EINPROGRESS)
1455 /* and one less inode for the current uid */
1456 qi->lqi_id.qid_uid = obj->oo_inode->i_uid;
1458 rc = osd_declare_qid(env, oh, qi, true, NULL);
1459 if (rc == -EDQUOT || rc == -EINPROGRESS)
1464 /* block accounting */
1465 qi->lqi_is_blk = true;
1467 /* more blocks for the new owner ... */
1468 qi->lqi_id.qid_uid = attr->la_uid;
1469 qi->lqi_space = bspace;
1470 allocated = (attr->la_uid == 0) ? true : false;
1471 rc = osd_declare_qid(env, oh, qi, allocated, NULL);
1472 if (rc == -EDQUOT || rc == -EINPROGRESS)
1477 /* and finally less blocks for the current owner */
1478 qi->lqi_id.qid_uid = obj->oo_inode->i_uid;
1479 qi->lqi_space = -bspace;
1480 rc = osd_declare_qid(env, oh, qi, true, NULL);
1481 if (rc == -EDQUOT || rc == -EINPROGRESS)
/* same four declarations, mirrored for a group-ownership change */
1487 if (attr->la_valid & LA_GID &&
1488 attr->la_gid != obj->oo_inode->i_gid) {
1489 qi->lqi_type = GRPQUOTA;
1491 /* inode accounting */
1492 qi->lqi_is_blk = false;
1494 /* one more inode for the new group owner ... */
1495 qi->lqi_id.qid_gid = attr->la_gid;
1497 allocated = (attr->la_gid == 0) ? true : false;
1498 rc = osd_declare_qid(env, oh, qi, allocated, NULL);
1499 if (rc == -EDQUOT || rc == -EINPROGRESS)
1504 /* and one less inode for the current gid */
1505 qi->lqi_id.qid_gid = obj->oo_inode->i_gid;
1507 rc = osd_declare_qid(env, oh, qi, true, NULL);
1508 if (rc == -EDQUOT || rc == -EINPROGRESS)
1513 /* block accounting */
1514 qi->lqi_is_blk = true;
1516 /* more blocks for the new owner ... */
1517 qi->lqi_id.qid_gid = attr->la_gid;
1518 qi->lqi_space = bspace;
1519 allocated = (attr->la_gid == 0) ? true : false;
1520 rc = osd_declare_qid(env, oh, qi, allocated, NULL);
1521 if (rc == -EDQUOT || rc == -EINPROGRESS)
1526 /* and finally less blocks for the current owner */
1527 qi->lqi_id.qid_gid = obj->oo_inode->i_gid;
1528 qi->lqi_space = -bspace;
1529 rc = osd_declare_qid(env, oh, qi, true, NULL);
1530 if (rc == -EDQUOT || rc == -EINPROGRESS)
/*
 * Apply the fields flagged in @attr->la_valid to @inode.  Timestamps
 * go through osd_inode_time() for granularity truncation; size updates
 * both the ldiskfs on-disk size (i_disksize) and the in-core i_size;
 * the file-type bits of i_mode are preserved when the mode changes.
 * NOTE(review): some "if (bits & ...)" guards (e.g. for mode/uid/gid/
 * rdev) are elided in this view, so only the assignments are visible.
 */
1539 static int osd_inode_setattr(const struct lu_env *env,
1540 struct inode *inode, const struct lu_attr *attr)
1544 bits = attr->la_valid;
1546 if (bits & LA_ATIME)
1547 inode->i_atime = *osd_inode_time(env, inode, attr->la_atime);
1548 if (bits & LA_CTIME)
1549 inode->i_ctime = *osd_inode_time(env, inode, attr->la_ctime);
1550 if (bits & LA_MTIME)
1551 inode->i_mtime = *osd_inode_time(env, inode, attr->la_mtime);
1552 if (bits & LA_SIZE) {
/* keep ldiskfs' notion of the on-disk size in sync with i_size */
1553 LDISKFS_I(inode)->i_disksize = attr->la_size;
1554 i_size_write(inode, attr->la_size);
1558 /* OSD should not change "i_blocks" which is used by quota.
1559 * "i_blocks" should be changed by ldiskfs only. */
1560 if (bits & LA_BLOCKS)
1561 inode->i_blocks = attr->la_blocks;
/* preserve the S_IFMT file-type bits; only permission bits change */
1564 inode->i_mode = (inode->i_mode & S_IFMT) |
1565 (attr->la_mode & ~S_IFMT);
1567 inode->i_uid = attr->la_uid;
1569 inode->i_gid = attr->la_gid;
1570 if (bits & LA_NLINK)
1571 set_nlink(inode, attr->la_nlink);
1573 inode->i_rdev = attr->la_rdev;
1575 if (bits & LA_FLAGS) {
1576 /* always keep S_NOCMTIME */
1577 inode->i_flags = ll_ext_to_inode_flags(attr->la_flags) |
/*
 * Transfer quota accounting when @attr changes the inode's uid or gid.
 * Builds an iattr with only the changed-ownership bits and delegates to
 * ll_vfs_dq_transfer(); logs an error if the transfer fails (which the
 * visible message attributes to quota enforcement being enabled on the
 * underlying ldiskfs filesystem).  No-op when ownership is unchanged.
 */
1583 static int osd_quota_transfer(struct inode *inode, const struct lu_attr *attr)
1585 if ((attr->la_valid & LA_UID && attr->la_uid != inode->i_uid) ||
1586 (attr->la_valid & LA_GID && attr->la_gid != inode->i_gid)) {
1591 if (attr->la_valid & LA_UID)
1592 iattr.ia_valid |= ATTR_UID;
1593 if (attr->la_valid & LA_GID)
1594 iattr.ia_valid |= ATTR_GID;
1595 iattr.ia_uid = attr->la_uid;
1596 iattr.ia_gid = attr->la_gid;
1598 rc = ll_vfs_dq_transfer(inode, &iattr);
1600 CERROR("%s: quota transfer failed: rc = %d. Is quota "
1601 "enforcement enabled on the ldiskfs filesystem?",
1602 inode->i_sb->s_id, rc);
/*
 * dt_object_operations::do_attr_set implementation.
 *
 * After capability authorization, transfers quota for ownership
 * changes, applies the attributes under oo_guard, and dirties the
 * inode.  The OBD_FAIL_OSD_FID_MAPPING fault-injection branch instead
 * corrupts the object's OI mapping (writes an id of all-0x01 bytes into
 * the OI index) to exercise OI scrub recovery.
 */
1609 static int osd_attr_set(const struct lu_env *env,
1610 struct dt_object *dt,
1611 const struct lu_attr *attr,
1612 struct thandle *handle,
1613 struct lustre_capa *capa)
1615 struct osd_object *obj = osd_dt_obj(dt);
1616 struct inode *inode;
1619 LASSERT(handle != NULL);
1620 LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
1621 LASSERT(osd_invariant(obj));
/* writing metadata requires CAPA_OPC_META_WRITE permission */
1623 if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
1626 osd_trans_exec_op(env, handle, OSD_OT_ATTR_SET);
/* fault injection: deliberately break the FID->inode OI mapping so
 * that OI scrub code paths can be tested */
1628 if (OBD_FAIL_CHECK(OBD_FAIL_OSD_FID_MAPPING)) {
1629 struct osd_thread_info *oti = osd_oti_get(env);
1630 const struct lu_fid *fid0 = lu_object_fid(&dt->do_lu);
1631 struct lu_fid *fid1 = &oti->oti_fid;
1632 struct osd_inode_id *id = &oti->oti_id;
1633 struct iam_path_descr *ipd;
1634 struct iam_container *bag;
1635 struct osd_thandle *oh;
1638 fid_cpu_to_be(fid1, fid0);
/* bogus inode id: every byte set to 0x01 */
1639 memset(id, 1, sizeof(*id));
1640 bag = &osd_fid2oi(osd_dev(dt->do_lu.lo_dev),
1641 fid0)->oi_dir.od_container;
1642 ipd = osd_idx_ipd_get(env, bag);
1643 if (unlikely(ipd == NULL))
1646 oh = container_of0(handle, struct osd_thandle, ot_super);
1647 rc = iam_update(oh->ot_handle, bag, (const struct iam_key *)fid1,
1648 (const struct iam_rec *)id, ipd);
1649 osd_ipd_put(env, bag, ipd);
1650 return(rc > 0 ? 0 : rc);
1653 inode = obj->oo_inode;
/* make sure quota is initialized before the transfer below */
1654 ll_vfs_dq_init(inode);
1656 rc = osd_quota_transfer(inode, attr);
1660 spin_lock(&obj->oo_guard);
1661 rc = osd_inode_setattr(env, inode, attr);
1662 spin_unlock(&obj->oo_guard);
1665 inode->i_sb->s_op->dirty_inode(inode);
/*
 * Return a per-thread scratch dentry for child @name of @obj; thin
 * wrapper around osd_child_dentry_by_inode() using the object's inode.
 */
1669 struct dentry *osd_child_dentry_get(const struct lu_env *env,
1670 struct osd_object *obj,
1671 const char *name, const int namelen)
1673 return osd_child_dentry_by_inode(env, obj->oo_inode, name, namelen);
/*
 * Allocate the backing ldiskfs inode for a new object.
 *
 * For directories with parallel directory operations enabled
 * (ldiskfs_pdo), an htree lock head is allocated first.  The inode is
 * created inside the caller's transaction, parented either under the
 * allocation hint's parent or the filesystem root, flagged S_NOCMTIME
 * (Lustre manages c/mtime itself) and marked NOSCRUB since a freshly
 * created object is consistent by construction.  On failure the htree
 * lock head is released again.
 */
1676 static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
1678 struct dt_allocation_hint *hint,
1682 struct osd_device *osd = osd_obj2dev(obj);
1683 struct osd_thandle *oth;
1684 struct dt_object *parent = NULL;
1685 struct inode *inode;
1687 LINVRNT(osd_invariant(obj));
1688 LASSERT(obj->oo_inode == NULL);
1689 LASSERT(obj->oo_hl_head == NULL);
/* directories get an htree lock head when PDO is enabled */
1691 if (S_ISDIR(mode) && ldiskfs_pdo) {
1692 obj->oo_hl_head =ldiskfs_htree_lock_head_alloc(HTREE_HBITS_DEF);
1693 if (obj->oo_hl_head == NULL)
1697 oth = container_of(th, struct osd_thandle, ot_super);
1698 LASSERT(oth->ot_handle->h_transaction != NULL);
1700 if (hint && hint->dah_parent)
1701 parent = hint->dah_parent;
/* create under the hinted parent, falling back to the fs root */
1703 inode = ldiskfs_create_inode(oth->ot_handle,
1704 parent ? osd_dt_obj(parent)->oo_inode :
1705 osd_sb(osd)->s_root->d_inode,
1707 if (!IS_ERR(inode)) {
1708 /* Do not update file c/mtime in ldiskfs.
1709 * NB: don't need any lock because no contention at this
1711 inode->i_flags |= S_NOCMTIME;
1713 /* For new created object, it must be consistent,
1714 * and it is unnecessary to scrub against it. */
1715 ldiskfs_set_inode_state(inode, LDISKFS_STATE_LUSTRE_NOSCRUB);
1716 obj->oo_inode = inode;
/* error path: release the htree lock head allocated above */
1719 if (obj->oo_hl_head != NULL) {
1720 ldiskfs_htree_lock_head_free(obj->oo_hl_head);
1721 obj->oo_hl_head = NULL;
1723 result = PTR_ERR(inode);
1725 LINVRNT(osd_invariant(obj));
/*
 * Create the inode for a new directory object.  Masks la_mode down to
 * type, rwx permission and sticky bits, then delegates inode creation
 * to osd_mkfile().
 */
1733 static int osd_mkdir(struct osd_thread_info *info, struct osd_object *obj,
1734 struct lu_attr *attr,
1735 struct dt_allocation_hint *hint,
1736 struct dt_object_format *dof,
1740 struct osd_thandle *oth;
1741 __u32 mode = (attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX));
1743 LASSERT(S_ISDIR(attr->la_mode));
1745 oth = container_of(th, struct osd_thandle, ot_super);
1746 LASSERT(oth->ot_handle->h_transaction != NULL);
1747 result = osd_mkfile(info, obj, mode, hint, th);
/*
 * Create the inode for a new index object and initialize its IAM
 * container: variable-key (lvar) format when the requested index
 * features include DT_IND_VARKEY, fixed-key (lfix) format otherwise.
 * Index objects are regular files at the ldiskfs level.
 */
1752 static int osd_mk_index(struct osd_thread_info *info, struct osd_object *obj,
1753 struct lu_attr *attr,
1754 struct dt_allocation_hint *hint,
1755 struct dt_object_format *dof,
1759 struct osd_thandle *oth;
1760 const struct dt_index_features *feat = dof->u.dof_idx.di_feat;
1762 __u32 mode = (attr->la_mode & (S_IFMT | S_IALLUGO | S_ISVTX));
1764 LASSERT(S_ISREG(attr->la_mode));
1766 oth = container_of(th, struct osd_thandle, ot_super);
1767 LASSERT(oth->ot_handle->h_transaction != NULL);
1769 result = osd_mkfile(info, obj, mode, hint, th);
1771 LASSERT(obj->oo_inode != NULL);
/* choose IAM flavour from the index feature flags */
1772 if (feat->dif_flags & DT_IND_VARKEY)
1773 result = iam_lvar_create(obj->oo_inode,
1774 feat->dif_keysize_max,
1776 feat->dif_recsize_max,
1779 result = iam_lfix_create(obj->oo_inode,
1780 feat->dif_keysize_max,
1782 feat->dif_recsize_max,
/*
 * Create the inode for a new regular file: mask la_mode to type,
 * permission and sticky bits and delegate to osd_mkfile().
 */
1789 static int osd_mkreg(struct osd_thread_info *info, struct osd_object *obj,
1790 struct lu_attr *attr,
1791 struct dt_allocation_hint *hint,
1792 struct dt_object_format *dof,
1795 LASSERT(S_ISREG(attr->la_mode));
1796 return osd_mkfile(info, obj, (attr->la_mode &
1797 (S_IFMT | S_IALLUGO | S_ISVTX)), hint, th);
/*
 * Create the inode for a new symlink: same mode masking and delegation
 * to osd_mkfile() as osd_mkreg(), with an S_ISLNK sanity check.
 */
1800 static int osd_mksym(struct osd_thread_info *info, struct osd_object *obj,
1801 struct lu_attr *attr,
1802 struct dt_allocation_hint *hint,
1803 struct dt_object_format *dof,
1806 LASSERT(S_ISLNK(attr->la_mode));
1807 return osd_mkfile(info, obj, (attr->la_mode &
1808 (S_IFMT | S_IALLUGO | S_ISVTX)), hint, th);
/*
 * Create the inode for a new special file (char/block device, FIFO or
 * socket).  After osd_mkfile() allocates the inode, init_special_inode()
 * installs the device-specific inode/file operations.
 */
1811 static int osd_mknod(struct osd_thread_info *info, struct osd_object *obj,
1812 struct lu_attr *attr,
1813 struct dt_allocation_hint *hint,
1814 struct dt_object_format *dof,
1817 cfs_umode_t mode = attr->la_mode & (S_IFMT | S_IALLUGO | S_ISVTX);
1820 LINVRNT(osd_invariant(obj));
1821 LASSERT(obj->oo_inode == NULL);
1822 LASSERT(S_ISCHR(mode) || S_ISBLK(mode) ||
1823 S_ISFIFO(mode) || S_ISSOCK(mode));
1825 result = osd_mkfile(info, obj, mode, hint, th);
1827 LASSERT(obj->oo_inode != NULL);
1829 * This inode should be marked dirty for i_rdev. Currently
1830 * that is done in the osd_attr_init().
1832 init_special_inode(obj->oo_inode, obj->oo_inode->i_mode,
1835 LINVRNT(osd_invariant(obj));
/* Signature shared by the per-format object creators above
 * (osd_mkfile/osd_mkdir/osd_mk_index/osd_mkreg/osd_mksym/osd_mknod);
 * used by osd_create_type_f() to dispatch on dt_format_type. */
1839 typedef int (*osd_obj_type_f)(struct osd_thread_info *, struct osd_object *,
1841 struct dt_allocation_hint *hint,
1842 struct dt_object_format *dof,
/*
 * Map a dt_format_type to the matching creator callback.
 * NOTE(review): the switch/case bodies are elided in this view; only
 * the DFT index -> osd_mk_index assignment is visible.
 */
1845 static osd_obj_type_f osd_create_type_f(enum dt_format_type type)
1847 osd_obj_type_f result;
1863 result = osd_mk_index;
/*
 * Initialize an allocation hint: zero the structure and record the
 * parent object and the child's creation mode.
 */
1874 static void osd_ah_init(const struct lu_env *env, struct dt_allocation_hint *ah,
1875 struct dt_object *parent, struct dt_object *child,
1876 cfs_umode_t child_mode)
1880 memset(ah, 0, sizeof(*ah));
1881 ah->dah_parent = parent;
1882 ah->dah_mode = child_mode;
/*
 * Apply the caller-requested attributes to a freshly created inode.
 *
 * Type/mode were already set at creation time and are masked out;
 * LA_RDEV is only meaningful for DFT_NODE objects; timestamps equal to
 * the inode's current values are dropped to avoid redundant updates.
 * Remaining attributes go through osd_quota_transfer() and
 * osd_inode_setattr(), and the inode is dirtied.  la_valid is restored
 * before returning so the caller's attr is left unchanged.
 */
1885 static void osd_attr_init(struct osd_thread_info *info, struct osd_object *obj,
1886 struct lu_attr *attr, struct dt_object_format *dof)
1888 struct inode *inode = obj->oo_inode;
1889 __u64 valid = attr->la_valid;
/* type and mode were fixed when the inode was created */
1892 attr->la_valid &= ~(LA_TYPE | LA_MODE);
1894 if (dof->dof_type != DFT_NODE)
1895 attr->la_valid &= ~LA_RDEV;
/* skip timestamps that already match the inode */
1896 if ((valid & LA_ATIME) && (attr->la_atime == LTIME_S(inode->i_atime)))
1897 attr->la_valid &= ~LA_ATIME;
1898 if ((valid & LA_CTIME) && (attr->la_ctime == LTIME_S(inode->i_ctime)))
1899 attr->la_valid &= ~LA_CTIME;
1900 if ((valid & LA_MTIME) && (attr->la_mtime == LTIME_S(inode->i_mtime)))
1901 attr->la_valid &= ~LA_MTIME;
1903 result = osd_quota_transfer(inode, attr);
1907 if (attr->la_valid != 0) {
1908 result = osd_inode_setattr(info->oti_env, inode, attr);
1910 * The osd_inode_setattr() should always succeed here. The
1911 * only error that could be returned is EDQUOT when we are
1912 * trying to change the UID or GID of the inode. However, this
1913 * should not happen since quota enforcement is no longer
1914 * enabled on ldiskfs (lquota takes care of it).
1916 LASSERTF(result == 0, "%d", result);
1917 inode->i_sb->s_op->dirty_inode(inode);
/* restore the caller's valid mask */
1920 attr->la_valid = valid;
1924 * Helper function for osd_object_create()
1926 * \retval 0, on success
/*
 * Dispatch to the per-format creator (osd_create_type_f), initialize
 * the new object's attributes and in-core state, and unlock the inode
 * if it is still in the I_NEW state from iget.
 */
1928 static int __osd_object_create(struct osd_thread_info *info,
1929 struct osd_object *obj, struct lu_attr *attr,
1930 struct dt_allocation_hint *hint,
1931 struct dt_object_format *dof,
1936 result = osd_create_type_f(dof->dof_type)(info, obj, attr, hint, dof,
1939 osd_attr_init(info, obj, attr, dof);
1940 osd_object_init0(obj);
/* freshly created inodes are I_NEW until explicitly unlocked */
1942 if (obj->oo_inode && (obj->oo_inode->i_state & I_NEW))
1943 unlock_new_inode(obj->oo_inode);
1950 * Helper function for osd_object_create()
1952 * \retval 0, on success
/*
 * Record the new object's FID -> (ino, generation) mapping in the
 * Object Index so the object can later be located by FID.
 */
1954 static int __osd_oi_insert(const struct lu_env *env, struct osd_object *obj,
1955 const struct lu_fid *fid, struct thandle *th)
1957 struct osd_thread_info *info = osd_oti_get(env);
1958 struct osd_inode_id *id = &info->oti_id;
1959 struct osd_device *osd = osd_obj2dev(obj);
1961 LASSERT(obj->oo_inode != NULL);
1963 osd_id_gen(id, obj->oo_inode->i_ino, obj->oo_inode->i_generation);
1964 return osd_oi_insert(info, osd, fid, id, th);
/*
 * Resolve which target (MDT/OST index) a FID's sequence belongs to.
 *
 * IDIF FIDs encode the OST index directly; sequences outside the FLDB
 * are local MDT sequences using this node's id; everything else goes
 * through the FLD server lookup.
 */
1967 int osd_fld_lookup(const struct lu_env *env, struct osd_device *osd,
1968 const struct lu_fid *fid, struct lu_seq_range *range)
1970 struct seq_server_site *ss = osd_seq_site(osd);
/* IDIF FIDs carry the OST index inside the FID itself */
1973 if (fid_is_idif(fid)) {
1974 range->lsr_flags = LU_SEQ_RANGE_OST;
1975 range->lsr_index = fid_idif_ost_idx(fid);
/* sequences not managed by FLDB are local to this MDT */
1979 if (!fid_seq_in_fldb(fid_seq(fid))) {
1980 range->lsr_flags = LU_SEQ_RANGE_MDT;
1982 /* FIXME: If ss is NULL, it suppose not get lsr_index
1984 range->lsr_index = ss->ss_node_id;
1988 LASSERT(ss != NULL);
/* unknown flags: let the FLD server fill in the real type */
1989 range->lsr_flags = -1;
1990 rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), range);
1992 CERROR("%s can not find "DFID": rc = %d\n",
1993 osd_name(osd), PFID(fid), rc);
1999 * Concurrency: no external locking is necessary.
/*
 * Declare credits for a future object creation: base create credits,
 * OI insertion credits (plus one block for possible reuse of an idle
 * OI block) unless the FID lives on the OST side, extra directory
 * credits for "." and ".." when creating a directory, and one inode of
 * quota.  Finishes by pre-warming the FLD cache for normal FIDs so the
 * later insert does not need to send an RPC while holding locks.
 */
2001 static int osd_declare_object_create(const struct lu_env *env,
2002 struct dt_object *dt,
2003 struct lu_attr *attr,
2004 struct dt_allocation_hint *hint,
2005 struct dt_object_format *dof,
2006 struct thandle *handle)
2008 struct lu_seq_range *range = &osd_oti_get(env)->oti_seq_range;
2009 struct osd_thandle *oh;
2013 LASSERT(handle != NULL);
2015 oh = container_of0(handle, struct osd_thandle, ot_super);
2016 LASSERT(oh->ot_handle == NULL);
2018 osd_trans_declare_op(env, oh, OSD_OT_CREATE,
2019 osd_dto_credits_noquota[DTO_OBJECT_CREATE]);
2020 if (!fid_is_on_ost(osd_oti_get(env), osd_dt_dev(handle->th_dev),
2021 lu_object_fid(&dt->do_lu)))
2022 /* Reuse idle OI block may cause additional one OI block
2024 osd_trans_declare_op(env, oh, OSD_OT_INSERT,
2025 osd_dto_credits_noquota[DTO_INDEX_INSERT] + 1);
2027 /* If this is directory, then we expect . and .. to be inserted as
2028 * well. The one directory block always needs to be created for the
2029 * directory, so we could use DTO_WRITE_BASE here (GDT, block bitmap,
2030 * block), there is no danger of needing a tree for the first block.
2032 if (attr && S_ISDIR(attr->la_mode)) {
2033 osd_trans_declare_op(env, oh, OSD_OT_INSERT,
2034 osd_dto_credits_noquota[DTO_WRITE_BASE]);
2035 osd_trans_declare_op(env, oh, OSD_OT_INSERT, 0);
/* one new inode's worth of quota for the creating owner */
2041 rc = osd_declare_inode_qid(env, attr->la_uid, attr->la_gid, 1, oh,
2042 false, false, NULL, false);
2046 /* It does fld look up inside declare, and the result will be
2047 * added to fld cache, so the following fld lookup inside insert
2048 * does not need send RPC anymore, so avoid send rpc with holding
2050 if (fid_is_norm(lu_object_fid(&dt->do_lu)) &&
2051 !fid_is_last_id(lu_object_fid(&dt->do_lu)))
2052 osd_fld_lookup(env, osd_dt_dev(handle->th_dev),
2053 lu_object_fid(&dt->do_lu), range);
/*
 * dt_object_operations::do_create implementation: create the backing
 * inode via __osd_object_create() and then record the FID mapping via
 * __osd_oi_insert().  Accounting (quota) FIDs are refused since quota
 * files are created by tune2fs, not from the kernel.
 */
2059 static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
2060 struct lu_attr *attr,
2061 struct dt_allocation_hint *hint,
2062 struct dt_object_format *dof,
2065 const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
2066 struct osd_object *obj = osd_dt_obj(dt);
2067 struct osd_thread_info *info = osd_oti_get(env);
2072 LINVRNT(osd_invariant(obj));
2073 LASSERT(!dt_object_exists(dt) && !dt_object_remote(dt));
2074 LASSERT(osd_write_locked(env, obj));
2075 LASSERT(th != NULL);
2077 if (unlikely(fid_is_acct(fid)))
2078 /* Quota files can't be created from the kernel any more,
2079 * 'tune2fs -O quota' will take care of creating them */
2082 osd_trans_exec_op(env, th, OSD_OT_CREATE);
/* declare the rollback op in case the create has to be undone */
2083 osd_trans_declare_rb(env, th, OSD_OT_REF_ADD);
2085 result = __osd_object_create(info, obj, attr, hint, dof, th);
2087 result = __osd_oi_insert(env, obj, fid, th);
2089 LASSERT(ergo(result == 0,
2090 dt_object_exists(dt) && !dt_object_remote(dt)));
2092 LASSERT(osd_invariant(obj));
2097 * Called to destroy on-disk representation of the object
2099 * Concurrency: must be locked
/*
 * Declare credits for a future object destroy: the base delete credits,
 * OI removal credits (+3 blocks for possible recycling of an idle OI
 * leaf), plus quota declarations for one inode freed and for the data
 * blocks that will be truncated.
 */
2101 static int osd_declare_object_destroy(const struct lu_env *env,
2102 struct dt_object *dt,
2105 struct osd_object *obj = osd_dt_obj(dt);
2106 struct inode *inode = obj->oo_inode;
2107 struct osd_thandle *oh;
2111 oh = container_of0(th, struct osd_thandle, ot_super);
2112 LASSERT(oh->ot_handle == NULL);
2115 osd_trans_declare_op(env, oh, OSD_OT_DELETE,
2116 osd_dto_credits_noquota[DTO_OBJECT_DELETE]);
2117 /* Recycle idle OI leaf may cause additional three OI blocks
2119 osd_trans_declare_op(env, oh, OSD_OT_DESTROY,
2120 osd_dto_credits_noquota[DTO_INDEX_DELETE] + 3);
2122 /* one less inode */
2123 rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid, -1, oh,
2124 false, true, NULL, false);
2127 /* data to be truncated */
2128 rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid, 0, oh,
2129 true, true, NULL, false);
/*
 * dt_object_operations::do_destroy implementation.
 *
 * Under i_mutex (serializing against OI scrub): for directories, also
 * remove the agent inode; then dirty the inode and remove the FID
 * mapping from the OI.  The object is finally marked HEARD_BANSHEE so
 * it is dropped from the LU object cache.  Accounting FIDs are refused
 * (quota files are managed by tune2fs).
 */
2133 static int osd_object_destroy(const struct lu_env *env,
2134 struct dt_object *dt,
2137 const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
2138 struct osd_object *obj = osd_dt_obj(dt);
2139 struct inode *inode = obj->oo_inode;
2140 struct osd_device *osd = osd_obj2dev(obj);
2141 struct osd_thandle *oh;
2145 oh = container_of0(th, struct osd_thandle, ot_super);
2146 LASSERT(oh->ot_handle);
2148 LASSERT(!lu_object_is_dying(dt->do_lu.lo_header));
2150 if (unlikely(fid_is_acct(fid)))
2153 /* Parallel control for OI scrub. For most of cases, there is no
2154 * lock contention. So it will not affect unlink performance. */
2155 mutex_lock(&inode->i_mutex);
2156 if (S_ISDIR(inode->i_mode)) {
2157 LASSERT(osd_inode_unlinked(inode) || inode->i_nlink == 1);
2158 /* it will check/delete the agent inode for every dir
2159 * destory, how to optimize it? unlink performance
2161 result = osd_delete_from_agent(env, osd, obj, oh);
2162 if (result != 0 && result != -ENOENT) {
2163 CERROR("%s: delete agent inode "DFID": rc = %d\n",
2164 osd_name(osd), PFID(fid), result);
2166 spin_lock(&obj->oo_guard);
2168 spin_unlock(&obj->oo_guard);
2169 inode->i_sb->s_op->dirty_inode(inode);
2172 osd_trans_exec_op(env, th, OSD_OT_DESTROY);
/* drop the FID -> inode mapping from the object index */
2174 result = osd_oi_delete(osd_oti_get(env), osd, fid, th);
2175 mutex_unlock(&inode->i_mutex);
2177 /* XXX: add to ext3 orphan list */
2178 /* rc = ext3_orphan_add(handle_t *handle, struct inode *inode) */
2180 /* not needed in the cache anymore */
2181 set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags);
/*
 * Low-level xattr setter: initialize quota for the inode, bind the
 * per-thread scratch dentry to it, and call the inode's setxattr
 * operation with the given XATTR_{CREATE,REPLACE} flags.
 */
2186 static inline int __osd_xattr_set(struct osd_thread_info *info,
2187 struct inode *inode, const char *name,
2188 const void *buf, int buflen, int fl)
2190 struct dentry *dentry = &info->oti_child_dentry;
2192 ll_vfs_dq_init(inode);
2193 dentry->d_inode = inode;
2194 return inode->i_op->setxattr(dentry, name, buf, buflen, fl);
2198 * Put the fid into lustre_mdt_attrs, and then place the structure
2199 * inode's ea. This fid should not be altered during the life time
2202 * \retval +ve, on success
2203 * \retval -ve, on error
2205 * FIXME: It is good to have/use ldiskfs_xattr_set_handle() here
/*
 * Store the object's FID in the LMA ("lustre.lma") extended attribute
 * of @inode.  Two OBD_FAIL injection points allow skipping the write
 * for testing.  A racing -EEXIST from a concurrent creator is treated
 * as success.
 */
2207 int osd_ea_fid_set(struct osd_thread_info *info, struct inode *inode,
2208 const struct lu_fid *fid)
2210 struct lustre_mdt_attrs *lma = &info->oti_mdt_attrs;
2213 if (OBD_FAIL_CHECK(OBD_FAIL_FID_INLMA))
2216 if (OBD_FAIL_CHECK(OBD_FAIL_FID_IGIF) && fid_is_client_visible(fid))
/* build the LMA record and swab it to on-disk byte order */
2219 lustre_lma_init(lma, fid);
2220 lustre_lma_swab(lma);
2222 rc = __osd_xattr_set(info, inode, XATTR_NAME_LMA, lma, sizeof(*lma),
2224 /* Someone may created the EA by race. */
2225 if (unlikely(rc == -EEXIST))
2231 * ldiskfs supports fid in dirent, it is passed in dentry->d_fsdata.
2232 * lustre 1.8 also uses d_fsdata for passing other info to ldiskfs.
2233 * To have compatilibility with 1.8 ldiskfs driver we need to have
2234 * magic number at start of fid data.
2235 * \ldiskfs_dentry_param is used only to pass fid from osd to ldiskfs.
/*
 * Fill @param with the dirent-embedded FID for ldiskfs: magic 0 (no
 * FID) for FIDs not visible to MDT clients, otherwise the LUFID magic
 * plus the FID in big-endian form.
 */
2238 void osd_get_ldiskfs_dirent_param(struct ldiskfs_dentry_param *param,
2239 const struct dt_rec *fid)
2241 if (!fid_is_client_mdt_visible((const struct lu_fid *)fid)) {
/* magic 0 tells ldiskfs there is no FID to embed */
2242 param->edp_magic = 0;
2246 param->edp_magic = LDISKFS_LUFID_MAGIC;
2247 param->edp_len = sizeof(struct lu_fid) + 1;
2248 fid_cpu_to_be((struct lu_fid *)param->edp_data, (struct lu_fid *)fid);
2252 * Try to read the fid from inode ea into dt_rec.
2254 * \param fid object fid.
2256 * \retval 0 on success
/*
 * Resolve inode number @ino into a FID by reading the inode's LMA EA
 * via osd_iget_fid(); @id is initialized with OSD_OII_NOGEN since the
 * generation is unknown at this point.
 */
2258 static int osd_ea_fid_get(const struct lu_env *env, struct osd_object *obj,
2259 __u32 ino, struct lu_fid *fid,
2260 struct osd_inode_id *id)
2262 struct osd_thread_info *info = osd_oti_get(env);
2263 struct inode *inode;
2266 osd_id_gen(id, ino, OSD_OII_NOGEN);
2267 inode = osd_iget_fid(info, osd_obj2dev(obj), id, fid);
2269 RETURN(PTR_ERR(inode));
/*
 * Insert the "." and ".." entries into a new directory.  ".." carries
 * the parent FID in its dentry param; "." deliberately gets magic 0
 * (no embedded FID), and both are added in one ldiskfs call.
 */
2275 static int osd_add_dot_dotdot_internal(struct osd_thread_info *info,
2277 struct inode *parent_dir,
2278 const struct dt_rec *dot_fid,
2279 const struct dt_rec *dot_dot_fid,
2280 struct osd_thandle *oth)
2282 struct ldiskfs_dentry_param *dot_ldp;
2283 struct ldiskfs_dentry_param *dot_dot_ldp;
2285 dot_dot_ldp = (struct ldiskfs_dentry_param *)info->oti_ldp2;
2286 osd_get_ldiskfs_dirent_param(dot_dot_ldp, dot_dot_fid);
2288 dot_ldp = (struct ldiskfs_dentry_param *)info->oti_ldp;
/* "." carries no embedded FID */
2289 dot_ldp->edp_magic = 0;
2290 return ldiskfs_add_dot_dotdot(oth->ot_handle, parent_dir,
2291 dir, dot_ldp, dot_dot_ldp);
2295 * Create an local inode for remote entry
/*
 * Create a local "agent" inode standing in for a remote entry: a new
 * S_IFDIR inode under @pobj, populated with "." and ".." where ".."
 * carries the remote object's FID.  Errors are converted to ERR_PTR
 * returns.
 */
2297 static struct inode *osd_create_remote_inode(const struct lu_env *env,
2298 struct osd_device *osd,
2299 struct osd_object *pobj,
2300 const struct lu_fid *fid,
2303 struct osd_thread_info *info = osd_oti_get(env);
2304 struct inode *local;
2305 struct osd_thandle *oh;
2310 oh = container_of(th, struct osd_thandle, ot_super);
2311 LASSERT(oh->ot_handle->h_transaction != NULL);
2313 /* FIXME: Insert index api needs to know the mode of
2314 * the remote object. Just use S_IFDIR for now */
2315 local = ldiskfs_create_inode(oh->ot_handle, pobj->oo_inode, S_IFDIR);
2316 if (IS_ERR(local)) {
2317 CERROR("%s: create local error %d\n", osd_name(osd),
2318 (int)PTR_ERR(local));
/* ".." of the agent inode points at the remote object's FID */
2322 rc = osd_add_dot_dotdot_internal(info, local, pobj->oo_inode,
2323 (const struct dt_rec *)lu_object_fid(&pobj->oo_dt.do_lu),
2324 (const struct dt_rec *)fid, oh);
2326 CERROR("%s: "DFID" add dot dotdot error: rc = %d\n",
2327 osd_name(osd), PFID(fid), rc);
2328 RETURN(ERR_PTR(rc));
2335 * Delete local inode for remote entry
/*
 * Drop the local agent inode (identified by @ino) that stood in for
 * remote object @fid: iget it, mark it dirty and log the removal.
 * NOTE(review): the nlink-clearing/iput lines are elided in this view.
 */
2337 static int osd_delete_remote_inode(const struct lu_env *env,
2338 struct osd_device *osd,
2339 const struct lu_fid *fid,
2340 __u32 ino, struct osd_thandle *oh)
2342 struct osd_thread_info *oti = osd_oti_get(env);
2343 struct osd_inode_id *id = &oti->oti_id;
2344 struct inode *inode;
/* @ino arrives in little-endian on-disk form */
2347 id->oii_ino = le32_to_cpu(ino);
2348 id->oii_gen = OSD_OII_NOGEN;
2349 inode = osd_iget(oti, osd, id);
2350 if (IS_ERR(inode)) {
2351 CERROR("%s: iget error "DFID" id %u:%u\n", osd_name(osd),
2352 PFID(fid), id->oii_ino, id->oii_gen);
2353 RETURN(PTR_ERR(inode));
2357 mark_inode_dirty(inode);
2358 CDEBUG(D_INODE, "%s: delete remote inode "DFID" %lu\n",
2359 osd_name(osd), PFID(fid), inode->i_ino);
2365 * OSD layer object create function for interoperability mode (b11826).
2366 * This is mostly similar to osd_object_create(). Only difference being, fid is
2367 * inserted into inode ea here.
2369 * \retval 0, on success
2370 * \retval -ve, on error
/*
 * Variant of osd_object_create() that also stores the FID in the
 * inode's LMA EA (osd_ea_fid_set) — skipped for plain OST-object FIDs,
 * but done for LAST_ID objects — before inserting the OI mapping.
 */
2372 static int osd_object_ea_create(const struct lu_env *env, struct dt_object *dt,
2373 struct lu_attr *attr,
2374 struct dt_allocation_hint *hint,
2375 struct dt_object_format *dof,
2378 const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
2379 struct osd_object *obj = osd_dt_obj(dt);
2380 struct osd_thread_info *info = osd_oti_get(env);
2385 LASSERT(osd_invariant(obj));
2386 LASSERT(!dt_object_exists(dt) && !dt_object_remote(dt));
2387 LASSERT(osd_write_locked(env, obj));
2388 LASSERT(th != NULL);
2390 if (unlikely(fid_is_acct(fid)))
2391 /* Quota files can't be created from the kernel any more,
2392 * 'tune2fs -O quota' will take care of creating them */
2395 osd_trans_exec_op(env, th, OSD_OT_CREATE);
2396 osd_trans_declare_rb(env, th, OSD_OT_REF_ADD);
2398 result = __osd_object_create(info, obj, attr, hint, dof, th);
/* write the FID into the LMA EA unless it's a regular OST object */
2399 if ((result == 0) &&
2400 (fid_is_last_id(fid) ||
2401 !fid_is_on_ost(info, osd_dt_dev(th->th_dev), fid)))
2402 result = osd_ea_fid_set(info, obj->oo_inode, fid);
2405 result = __osd_oi_insert(env, obj, fid, th);
2407 LASSERT(ergo(result == 0,
2408 dt_object_exists(dt) && !dt_object_remote(dt)));
2409 LINVRNT(osd_invariant(obj));
/*
 * Declare credits for a future nlink increment: a single base
 * attribute-set operation's worth of journal credits.
 */
2413 static int osd_declare_object_ref_add(const struct lu_env *env,
2414 struct dt_object *dt,
2415 struct thandle *handle)
2417 struct osd_thandle *oh;
2419 /* it's possible that object doesn't exist yet */
2420 LASSERT(handle != NULL);
2422 oh = container_of0(handle, struct osd_thandle, ot_super);
2423 LASSERT(oh->ot_handle == NULL);
2425 osd_trans_declare_op(env, oh, OSD_OT_REF_ADD,
2426 osd_dto_credits_noquota[DTO_ATTR_SET_BASE]);
2432 * Concurrency: @dt is write locked.
/*
 * Increment the object's link count under oo_guard and dirty the inode.
 * For directories the count is pinned at 1 once it would exceed
 * LDISKFS_LINK_MAX (the many-subdirs convention), and inc_nlink from 0
 * is avoided via set_nlink(1) to dodge the kernel WARN_ON.
 */
2434 static int osd_object_ref_add(const struct lu_env *env,
2435 struct dt_object *dt, struct thandle *th)
2437 struct osd_object *obj = osd_dt_obj(dt);
2438 struct inode *inode = obj->oo_inode;
2440 LINVRNT(osd_invariant(obj));
2441 LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
2442 LASSERT(osd_write_locked(env, obj));
2443 LASSERT(th != NULL);
2445 osd_trans_exec_op(env, th, OSD_OT_REF_ADD);
2448 * DIR_NLINK feature is set for compatibility reasons if:
2449 * 1) nlinks > LDISKFS_LINK_MAX, or
2450 * 2) nlinks == 2, since this indicates i_nlink was previously 1.
2452 * It is easier to always set this flag (rather than check and set),
2453 * since it has less overhead, and the superblock will be dirtied
2454 * at some point. Both e2fsprogs and any Lustre-supported ldiskfs
2455 * do not actually care whether this flag is set or not.
2457 spin_lock(&obj->oo_guard);
2458 /* inc_nlink from 0 may cause WARN_ON */
2459 if(inode->i_nlink == 0)
2460 set_nlink(inode, 1);
/* directories at/over the ldiskfs limit keep nlink pinned at 1 */
2463 if (S_ISDIR(inode->i_mode) && inode->i_nlink > 1) {
2464 if (inode->i_nlink >= LDISKFS_LINK_MAX ||
2465 inode->i_nlink == 2)
2466 set_nlink(inode, 1);
2468 LASSERT(inode->i_nlink <= LDISKFS_LINK_MAX);
2469 spin_unlock(&obj->oo_guard);
2470 inode->i_sb->s_op->dirty_inode(inode);
2471 LINVRNT(osd_invariant(obj));
/*
 * Declare credits for a future nlink decrement: a single base
 * attribute-set operation's worth of journal credits.
 */
2476 static int osd_declare_object_ref_del(const struct lu_env *env,
2477 struct dt_object *dt,
2478 struct thandle *handle)
2480 struct osd_thandle *oh;
2482 LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
2483 LASSERT(handle != NULL);
2485 oh = container_of0(handle, struct osd_thandle, ot_super);
2486 LASSERT(oh->ot_handle == NULL);
2488 osd_trans_declare_op(env, oh, OSD_OT_REF_DEL,
2489 osd_dto_credits_noquota[DTO_ATTR_SET_BASE]);
2495 * Concurrency: @dt is write locked.
/*
 * Decrement the object's link count under oo_guard and dirty the
 * inode.  A directory that used the pinned-nlink convention (many
 * subdirs) is restored to 1 rather than dropping to 0, which would
 * make ldiskfs delete the inode prematurely.
 * NOTE(review): the actual drop_nlink call is elided in this view.
 */
2497 static int osd_object_ref_del(const struct lu_env *env, struct dt_object *dt,
2500 struct osd_object *obj = osd_dt_obj(dt);
2501 struct inode *inode = obj->oo_inode;
2503 LINVRNT(osd_invariant(obj));
2504 LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
2505 LASSERT(osd_write_locked(env, obj));
2506 LASSERT(th != NULL);
2508 osd_trans_exec_op(env, th, OSD_OT_REF_DEL);
2510 spin_lock(&obj->oo_guard);
2511 LASSERT(inode->i_nlink > 0);
2513 /* If this is/was a many-subdir directory (nlink > LDISKFS_LINK_MAX)
2514 * then the nlink count is 1. Don't let it be set to 0 or the directory
2515 * inode will be deleted incorrectly. */
2516 if (S_ISDIR(inode->i_mode) && inode->i_nlink == 0)
2517 set_nlink(inode, 1);
2518 spin_unlock(&obj->oo_guard);
2519 inode->i_sb->s_op->dirty_inode(inode);
2520 LINVRNT(osd_invariant(obj));
2526 * Get the 64-bit version for an inode.
/*
 * Read the ldiskfs per-inode version counter (i_fs_version) into
 * @ver; used by osd_xattr_get() to service the virtual
 * XATTR_NAME_VERSION attribute.
 */
2528 static int osd_object_version_get(const struct lu_env *env,
2529 struct dt_object *dt, dt_obj_version_t *ver)
2531 struct inode *inode = osd_dt_obj(dt)->oo_inode;
2533 CDEBUG(D_INODE, "Get version "LPX64" for inode %lu\n",
2534 LDISKFS_I(inode)->i_fs_version, inode->i_ino);
2535 *ver = LDISKFS_I(inode)->i_fs_version;
2540 * Concurrency: @dt is read locked.
/*
 * dt_object_operations::do_xattr_get implementation.  The pseudo-xattr
 * XATTR_NAME_VERSION is intercepted and served from the inode's
 * version counter; everything else is authorized against the capa and
 * fetched through the inode's getxattr via __osd_xattr_get().
 */
2542 static int osd_xattr_get(const struct lu_env *env, struct dt_object *dt,
2543 struct lu_buf *buf, const char *name,
2544 struct lustre_capa *capa)
2546 struct osd_object *obj = osd_dt_obj(dt);
2547 struct inode *inode = obj->oo_inode;
2548 struct osd_thread_info *info = osd_oti_get(env);
2549 struct dentry *dentry = &info->oti_obj_dentry;
2551 /* version get is not real XATTR but uses xattr API */
2552 if (strcmp(name, XATTR_NAME_VERSION) == 0) {
2553 /* for version we are just using xattr API but change inode
2555 LASSERT(buf->lb_len == sizeof(dt_obj_version_t));
2556 osd_object_version_get(env, dt, buf->lb_buf);
2557 return sizeof(dt_obj_version_t);
2560 LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
2561 LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
2563 if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
2566 return __osd_xattr_get(inode, dentry, name, buf->lb_buf, buf->lb_len);
/*
 * Declare credits for a future xattr set.  The pseudo-xattr
 * XATTR_NAME_VERSION only touches the inode itself, so it needs the
 * cheaper DTO_ATTR_SET_BASE credits instead of DTO_XATTR_SET.
 */
2570 static int osd_declare_xattr_set(const struct lu_env *env,
2571 struct dt_object *dt,
2572 const struct lu_buf *buf, const char *name,
2573 int fl, struct thandle *handle)
2575 struct osd_thandle *oh;
2577 LASSERT(handle != NULL);
2579 oh = container_of0(handle, struct osd_thandle, ot_super);
2580 LASSERT(oh->ot_handle == NULL);
2582 osd_trans_declare_op(env, oh, OSD_OT_XATTR_SET,
2583 strcmp(name, XATTR_NAME_VERSION) == 0 ?
2584 osd_dto_credits_noquota[DTO_ATTR_SET_BASE] :
2585 osd_dto_credits_noquota[DTO_XATTR_SET]);
/* Store *new_version into the inode's i_fs_version and mark the inode
 * dirty through the superblock's dirty_inode hook so the new version
 * reaches disk.  NOTE(review): elided extract; comments only. */
2591 * Set the 64-bit version for object
2593 static void osd_object_version_set(const struct lu_env *env,
2594 struct dt_object *dt,
2595 dt_obj_version_t *new_version)
2597 struct inode *inode = osd_dt_obj(dt)->oo_inode;
2599 CDEBUG(D_INODE, "Set version "LPX64" (old "LPX64") for inode %lu\n",
2600 *new_version, LDISKFS_I(inode)->i_fs_version, inode->i_ino);
2602 LDISKFS_I(inode)->i_fs_version = *new_version;
2603 /** Version is set after all inode operations are finished,
2604 * so we should mark it dirty here */
2605 inode->i_sb->s_op->dirty_inode(inode);
/* Set extended attribute @name on @dt.  XATTR_NAME_VERSION is handled
 * as a pseudo-xattr via osd_object_version_set().  Otherwise LU_XATTR_*
 * flags are translated to the kernel XATTR_{REPLACE,CREATE} flags and
 * the EA is written through __osd_xattr_set() inside transaction
 * @handle (credits must have been declared earlier).
 * NOTE(review): elided extract; comments only. */
2609 * Concurrency: @dt is write locked.
2611 static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
2612 const struct lu_buf *buf, const char *name, int fl,
2613 struct thandle *handle, struct lustre_capa *capa)
2615 struct osd_object *obj = osd_dt_obj(dt);
2616 struct inode *inode = obj->oo_inode;
2617 struct osd_thread_info *info = osd_oti_get(env);
2620 LASSERT(handle != NULL);
2622 /* version set is not real XATTR */
2623 if (strcmp(name, XATTR_NAME_VERSION) == 0) {
2624 /* for version we are just using xattr API but change inode
2626 LASSERT(buf->lb_len == sizeof(dt_obj_version_t));
2627 osd_object_version_set(env, dt, buf->lb_buf);
2628 return sizeof(dt_obj_version_t);
2631 if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
2634 osd_trans_exec_op(env, handle, OSD_OT_XATTR_SET);
2635 if (fl & LU_XATTR_REPLACE)
2636 fs_flags |= XATTR_REPLACE;
2638 if (fl & LU_XATTR_CREATE)
2639 fs_flags |= XATTR_CREATE;
2641 return __osd_xattr_set(info, inode, name, buf->lb_buf, buf->lb_len,
/* List all extended attribute names of @dt into @buf by delegating to
 * the inode's ->listxattr() through the per-thread scratch dentry.
 * NOTE(review): elided extract; comments only. */
2646 * Concurrency: @dt is read locked.
2648 static int osd_xattr_list(const struct lu_env *env, struct dt_object *dt,
2649 struct lu_buf *buf, struct lustre_capa *capa)
2651 struct osd_object *obj = osd_dt_obj(dt);
2652 struct inode *inode = obj->oo_inode;
2653 struct osd_thread_info *info = osd_oti_get(env);
2654 struct dentry *dentry = &info->oti_obj_dentry;
2656 LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
2657 LASSERT(inode->i_op != NULL && inode->i_op->listxattr != NULL);
2658 LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
2660 if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
2663 dentry->d_inode = inode;
2664 return inode->i_op->listxattr(dentry, buf->lb_buf, buf->lb_len);
/* Declare transaction credits for a subsequent xattr delete.  Reuses
 * the OSD_OT_XATTR_SET op type and DTO_XATTR_SET credits (a delete
 * modifies the same EA blocks as a set). */
2667 static int osd_declare_xattr_del(const struct lu_env *env,
2668 struct dt_object *dt, const char *name,
2669 struct thandle *handle)
2671 struct osd_thandle *oh;
2673 LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
2674 LASSERT(handle != NULL);
2676 oh = container_of0(handle, struct osd_thandle, ot_super);
2677 LASSERT(oh->ot_handle == NULL);
2679 osd_trans_declare_op(env, oh, OSD_OT_XATTR_SET,
2680 osd_dto_credits_noquota[DTO_XATTR_SET]);
/* Remove extended attribute @name from @dt via the inode's
 * ->removexattr() after initializing disk quota for the inode.
 * NOTE(review): elided extract; comments only. */
2686 * Concurrency: @dt is write locked.
2688 static int osd_xattr_del(const struct lu_env *env, struct dt_object *dt,
2689 const char *name, struct thandle *handle,
2690 struct lustre_capa *capa)
2692 struct osd_object *obj = osd_dt_obj(dt);
2693 struct inode *inode = obj->oo_inode;
2694 struct osd_thread_info *info = osd_oti_get(env);
2695 struct dentry *dentry = &info->oti_obj_dentry;
2698 LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
2699 LASSERT(inode->i_op != NULL && inode->i_op->removexattr != NULL);
2700 LASSERT(handle != NULL);
2702 if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
2705 osd_trans_exec_op(env, handle, OSD_OT_XATTR_SET);
2707 ll_vfs_dq_init(inode);
2708 dentry->d_inode = inode;
2709 rc = inode->i_op->removexattr(dentry, name);
/* Build (or renew) a capability for the object's fid.  If capabilities
 * are disabled on the device (-ENOENT) or the old capa fails the auth
 * check (-EACCES), bail out.  Depending on the MDT capa mode the
 * uid/gid are either stored in plain form (LC_ID_PLAIN) or encrypted
 * with the current capa key (LC_ID_CONVERT).  An existing, unexpired
 * entry is taken from the capa hash; otherwise a fresh capa is signed
 * with the current key (od_capa_keys[1]) and inserted.
 * NOTE(review): elided extract -- the switch labels and several error
 * paths are not visible here; comments only, code untouched. */
2713 static struct obd_capa *osd_capa_get(const struct lu_env *env,
2714 struct dt_object *dt,
2715 struct lustre_capa *old,
2718 struct osd_thread_info *info = osd_oti_get(env);
2719 const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
2720 struct osd_object *obj = osd_dt_obj(dt);
2721 struct osd_device *dev = osd_obj2dev(obj);
2722 struct lustre_capa_key *key = &info->oti_capa_key;
2723 struct lustre_capa *capa = &info->oti_capa;
2724 struct obd_capa *oc;
2725 struct md_capainfo *ci;
2729 if (!dev->od_fl_capa)
2730 RETURN(ERR_PTR(-ENOENT));
2732 LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
2733 LINVRNT(osd_invariant(obj));
2735 /* renewal sanity check */
2736 if (old && osd_object_auth(env, dt, old, opc))
2737 RETURN(ERR_PTR(-EACCES));
2739 ci = md_capainfo(env);
2741 RETURN(ERR_PTR(-ENOENT));
2743 switch (ci->mc_auth) {
2747 capa->lc_uid = obj->oo_inode->i_uid;
2748 capa->lc_gid = obj->oo_inode->i_gid;
2749 capa->lc_flags = LC_ID_PLAIN;
2751 case LC_ID_CONVERT: {
2754 s[0] = obj->oo_inode->i_uid;
2755 cfs_get_random_bytes(&(s[1]), sizeof(__u32));
2756 s[2] = obj->oo_inode->i_gid;
2757 cfs_get_random_bytes(&(s[3]), sizeof(__u32));
2758 rc = capa_encrypt_id(d, s, key->lk_key, CAPA_HMAC_KEY_MAX_LEN);
2760 RETURN(ERR_PTR(rc));
2762 capa->lc_uid = ((__u64)d[1] << 32) | d[0];
2763 capa->lc_gid = ((__u64)d[3] << 32) | d[2];
2764 capa->lc_flags = LC_ID_CONVERT;
2768 RETURN(ERR_PTR(-EINVAL));
2771 capa->lc_fid = *fid;
2773 capa->lc_flags |= dev->od_capa_alg << 24;
2774 capa->lc_timeout = dev->od_capa_timeout;
2775 capa->lc_expiry = 0;
2777 oc = capa_lookup(dev->od_capa_hash, capa, 1);
2779 LASSERT(!capa_is_expired(oc));
2783 spin_lock(&capa_lock);
2784 *key = dev->od_capa_keys[1];
2785 spin_unlock(&capa_lock);
2787 capa->lc_keyid = key->lk_keyid;
2788 capa->lc_expiry = cfs_time_current_sec() + dev->od_capa_timeout;
2790 rc = capa_hmac(capa->lc_hmac, capa, key->lk_key);
2792 DEBUG_CAPA(D_ERROR, capa, "HMAC failed: %d for", rc);
2793 RETURN(ERR_PTR(rc));
2796 oc = capa_add(dev->od_capa_hash, capa);
/* Flush the object's data to disk: assemble a temporary struct file on
 * the per-thread dentry and call the filesystem's ->fsync() under
 * i_mutex.  NOTE(review): elided extract; comments only. */
2800 static int osd_object_sync(const struct lu_env *env, struct dt_object *dt)
2802 struct osd_object *obj = osd_dt_obj(dt);
2803 struct inode *inode = obj->oo_inode;
2804 struct osd_thread_info *info = osd_oti_get(env);
2805 struct dentry *dentry = &info->oti_obj_dentry;
2806 struct file *file = &info->oti_file;
2811 dentry->d_inode = inode;
2812 file->f_dentry = dentry;
2813 file->f_mapping = inode->i_mapping;
2814 file->f_op = inode->i_fop;
2815 mutex_lock(&inode->i_mutex);
2816 rc = file->f_op->fsync(file, dentry, 0);
2817 mutex_unlock(&inode->i_mutex);
/* Expose the backing inode as an opaque pointer through *data. */
2821 static int osd_data_get(const struct lu_env *env, struct dt_object *dt,
2824 struct osd_object *obj = osd_dt_obj(dt);
2827 *data = (void *)obj->oo_inode;
/* Check whether object @o's IAM container is compatible with the
 * requested index features @feat: the root object only supports
 * dt_directory_features; otherwise key/record sizes must fit the
 * feature's min/max bounds and variable-key/variable-record/non-unique
 * indices are rejected.  NOTE(review): elided extract; comments only. */
2835 static int osd_iam_index_probe(const struct lu_env *env, struct osd_object *o,
2836 const struct dt_index_features *feat)
2838 struct iam_descr *descr;
2840 if (osd_object_is_root(o))
2841 return feat == &dt_directory_features;
2843 LASSERT(o->oo_dir != NULL);
2845 descr = o->oo_dir->od_container.ic_descr;
2846 if (feat == &dt_directory_features) {
2847 if (descr->id_rec_size == sizeof(struct osd_fid_pack))
2853 feat->dif_keysize_min <= descr->id_key_size &&
2854 descr->id_key_size <= feat->dif_keysize_max &&
2855 feat->dif_recsize_min <= descr->id_rec_size &&
2856 descr->id_rec_size <= feat->dif_recsize_max &&
2857 !(feat->dif_flags & (DT_IND_VARKEY |
2858 DT_IND_VARREC | DT_IND_NONUNQ)) &&
2859 ergo(feat->dif_flags & DT_IND_UPDATE,
2860 1 /* XXX check that object (and file system) is
/* Initialize and set up the IAM container for @obj's directory data;
 * on success switch the object's index ops to osd_index_iam_ops, on
 * setup failure tear the container down again. */
2865 static int osd_iam_container_init(const struct lu_env *env,
2866 struct osd_object *obj,
2867 struct osd_directory *dir)
2869 struct iam_container *bag = &dir->od_container;
2872 result = iam_container_init(bag, &dir->od_descr, obj->oo_inode);
2876 result = iam_container_setup(bag);
2878 obj->oo_dt.do_index_ops = &osd_index_iam_ops;
2880 iam_container_fini(bag);
/* Select and install the index operations vector for @dt based on the
 * requested features: EA ops for the root and plain directories,
 * otable/accounting ops for their special features, and lazily
 * initialized IAM ops for everything else.  IAM container allocation
 * is raced-checked under oo_guard, and initialization is serialized by
 * oo_ext_idx_sem; quota glb indices may additionally be migrated.
 * NOTE(review): elided extract -- allocation and several return paths
 * are not visible; comments only, code untouched. */
2887 * Concurrency: no external locking is necessary.
2889 static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
2890 const struct dt_index_features *feat)
2894 struct osd_object *obj = osd_dt_obj(dt);
2896 LINVRNT(osd_invariant(obj));
2898 if (osd_object_is_root(obj)) {
2899 dt->do_index_ops = &osd_index_ea_ops;
2901 } else if (feat == &dt_directory_features) {
2902 dt->do_index_ops = &osd_index_ea_ops;
2903 if (obj->oo_inode != NULL && S_ISDIR(obj->oo_inode->i_mode))
2908 } else if (unlikely(feat == &dt_otable_features)) {
2909 dt->do_index_ops = &osd_otable_ops;
2911 } else if (unlikely(feat == &dt_acct_features)) {
2912 dt->do_index_ops = &osd_acct_index_ops;
2915 } else if (!osd_has_index(obj)) {
2916 struct osd_directory *dir;
2921 spin_lock(&obj->oo_guard);
2922 if (obj->oo_dir == NULL)
2926 * Concurrent thread allocated container data.
2929 spin_unlock(&obj->oo_guard);
2931 * Now, that we have container data, serialize its
2934 down_write(&obj->oo_ext_idx_sem);
2936 * recheck under lock.
2938 if (!osd_has_index(obj))
2939 result = osd_iam_container_init(env, obj, dir);
2942 up_write(&obj->oo_ext_idx_sem);
2950 if (result == 0 && skip_iam == 0) {
2951 if (!osd_iam_index_probe(env, obj, feat))
2954 LINVRNT(osd_invariant(obj));
2956 if (is_quota_glb_feat(feat))
2957 result = osd_quota_migration(env, dt, feat);
/* attr_get stub for the otable iterator object; the visible body does
 * no work (return value is outside this extract). */
2962 static int osd_otable_it_attr_get(const struct lu_env *env,
2963 struct dt_object *dt,
2964 struct lu_attr *attr,
2965 struct lustre_capa *capa)
/* dt_object_operations vector for native (IAM-format) osd objects;
 * differs from osd_obj_ea_ops only in .do_create (osd_object_create
 * vs. osd_object_ea_create). */
2971 static const struct dt_object_operations osd_obj_ops = {
2972 .do_read_lock = osd_object_read_lock,
2973 .do_write_lock = osd_object_write_lock,
2974 .do_read_unlock = osd_object_read_unlock,
2975 .do_write_unlock = osd_object_write_unlock,
2976 .do_write_locked = osd_object_write_locked,
2977 .do_attr_get = osd_attr_get,
2978 .do_declare_attr_set = osd_declare_attr_set,
2979 .do_attr_set = osd_attr_set,
2980 .do_ah_init = osd_ah_init,
2981 .do_declare_create = osd_declare_object_create,
2982 .do_create = osd_object_create,
2983 .do_declare_destroy = osd_declare_object_destroy,
2984 .do_destroy = osd_object_destroy,
2985 .do_index_try = osd_index_try,
2986 .do_declare_ref_add = osd_declare_object_ref_add,
2987 .do_ref_add = osd_object_ref_add,
2988 .do_declare_ref_del = osd_declare_object_ref_del,
2989 .do_ref_del = osd_object_ref_del,
2990 .do_xattr_get = osd_xattr_get,
2991 .do_declare_xattr_set = osd_declare_xattr_set,
2992 .do_xattr_set = osd_xattr_set,
2993 .do_declare_xattr_del = osd_declare_xattr_del,
2994 .do_xattr_del = osd_xattr_del,
2995 .do_xattr_list = osd_xattr_list,
2996 .do_capa_get = osd_capa_get,
2997 .do_object_sync = osd_object_sync,
2998 .do_data_get = osd_data_get,
/* Interoperability-mode variant of osd_obj_ops: identical except that
 * object creation goes through osd_object_ea_create (EA-based dirents,
 * for running a 2.0 MDS on a 1.8 disk, see b11826). */
3002 * dt_object_operations for interoperability mode
3003 * (i.e. to run 2.0 mds on 1.8 disk) (b11826)
3005 static const struct dt_object_operations osd_obj_ea_ops = {
3006 .do_read_lock = osd_object_read_lock,
3007 .do_write_lock = osd_object_write_lock,
3008 .do_read_unlock = osd_object_read_unlock,
3009 .do_write_unlock = osd_object_write_unlock,
3010 .do_write_locked = osd_object_write_locked,
3011 .do_attr_get = osd_attr_get,
3012 .do_declare_attr_set = osd_declare_attr_set,
3013 .do_attr_set = osd_attr_set,
3014 .do_ah_init = osd_ah_init,
3015 .do_declare_create = osd_declare_object_create,
3016 .do_create = osd_object_ea_create,
3017 .do_declare_destroy = osd_declare_object_destroy,
3018 .do_destroy = osd_object_destroy,
3019 .do_index_try = osd_index_try,
3020 .do_declare_ref_add = osd_declare_object_ref_add,
3021 .do_ref_add = osd_object_ref_add,
3022 .do_declare_ref_del = osd_declare_object_ref_del,
3023 .do_ref_del = osd_object_ref_del,
3024 .do_xattr_get = osd_xattr_get,
3025 .do_declare_xattr_set = osd_declare_xattr_set,
3026 .do_xattr_set = osd_xattr_set,
3027 .do_declare_xattr_del = osd_declare_xattr_del,
3028 .do_xattr_del = osd_xattr_del,
3029 .do_xattr_list = osd_xattr_list,
3030 .do_capa_get = osd_capa_get,
3031 .do_object_sync = osd_object_sync,
3032 .do_data_get = osd_data_get,
/* Minimal ops vector for the otable iterator pseudo-object: only
 * attr_get (stubbed) and index_try are provided. */
3035 static const struct dt_object_operations osd_obj_otable_it_ops = {
3036 .do_attr_get = osd_otable_it_attr_get,
3037 .do_index_try = osd_index_try,
/* Declare DTO_INDEX_DELETE credits for an upcoming IAM index delete;
 * must run before the transaction handle is started. */
3040 static int osd_index_declare_iam_delete(const struct lu_env *env,
3041 struct dt_object *dt,
3042 const struct dt_key *key,
3043 struct thandle *handle)
3045 struct osd_thandle *oh;
3047 oh = container_of0(handle, struct osd_thandle, ot_super);
3048 LASSERT(oh->ot_handle == NULL);
3050 osd_trans_declare_op(env, oh, OSD_OT_DELETE,
3051 osd_dto_credits_noquota[DTO_INDEX_DELETE]);
/* Delete the entry for @key from the object's IAM container inside the
 * started transaction.  Quota-index keys are byte-swapped to
 * little-endian before the lookup (on-disk quota ids are LE).
 * NOTE(review): elided extract; comments only. */
3057 * delete a (key, value) pair from index \a dt specified by \a key
3059 * \param dt osd index object
3060 * \param key key for index
3061 * \param rec record reference
3062 * \param handle transaction handler
3065 * \retval -ve failure
3068 static int osd_index_iam_delete(const struct lu_env *env, struct dt_object *dt,
3069 const struct dt_key *key,
3070 struct thandle *handle,
3071 struct lustre_capa *capa)
3073 struct osd_thread_info *oti = osd_oti_get(env);
3074 struct osd_object *obj = osd_dt_obj(dt);
3075 struct osd_thandle *oh;
3076 struct iam_path_descr *ipd;
3077 struct iam_container *bag = &obj->oo_dir->od_container;
3082 LINVRNT(osd_invariant(obj));
3083 LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
3084 LASSERT(bag->ic_object == obj->oo_inode);
3085 LASSERT(handle != NULL);
3087 if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
3090 osd_trans_exec_op(env, handle, OSD_OT_DELETE);
3092 ipd = osd_idx_ipd_get(env, bag);
3093 if (unlikely(ipd == NULL))
3096 oh = container_of0(handle, struct osd_thandle, ot_super);
3097 LASSERT(oh->ot_handle != NULL);
3098 LASSERT(oh->ot_handle->h_transaction != NULL);
3100 if (fid_is_quota(lu_object_fid(&dt->do_lu))) {
3101 /* swab quota uid/gid provided by caller */
3102 oti->oti_quota_id = cpu_to_le64(*((__u64 *)key));
3103 key = (const struct dt_key *)&oti->oti_quota_id;
3106 rc = iam_delete(oh->ot_handle, bag, (const struct iam_key *)key, ipd);
3107 osd_ipd_put(env, bag, ipd);
3108 LINVRNT(osd_invariant(obj));
/* Declare credits and quota for removing a directory entry in EA
 * (ldiskfs dirent) mode: DTO_INDEX_DELETE credits plus an inode quota
 * reservation for the parent's uid/gid. */
3112 static int osd_index_declare_ea_delete(const struct lu_env *env,
3113 struct dt_object *dt,
3114 const struct dt_key *key,
3115 struct thandle *handle)
3117 struct osd_thandle *oh;
3118 struct inode *inode;
3122 LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
3123 LASSERT(handle != NULL);
3125 oh = container_of0(handle, struct osd_thandle, ot_super);
3126 LASSERT(oh->ot_handle == NULL);
3128 osd_trans_declare_op(env, oh, OSD_OT_DELETE,
3129 osd_dto_credits_noquota[DTO_INDEX_DELETE]);
3131 inode = osd_dt_obj(dt)->oo_inode;
3134 rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid, 0, oh,
3135 true, true, NULL, false);
/* If the ldiskfs dirent carries an appended packed FID
 * (LDISKFS_DIRENT_LUFID set), unpack it into @fid; the packed record
 * sits just past the NUL-terminated name. */
3139 static inline int osd_get_fid_from_dentry(struct ldiskfs_dir_entry_2 *de,
3142 struct osd_fid_pack *rec;
3145 if (de->file_type & LDISKFS_DIRENT_LUFID) {
3146 rec = (struct osd_fid_pack *) (de->name + de->name_len + 1);
3147 rc = osd_fid_unpack((struct lu_fid *)fid, rec);
/* Determine whether @fid lives on a remote MDT: sequences not in the
 * FLDB are local by definition; otherwise consult the FLD and compare
 * the owning index against this node's ss_node_id.  Returns non-zero
 * when remote, 0 when local (negative on lookup error). */
3152 static int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
3155 struct lu_seq_range *range = &osd_oti_get(env)->oti_seq_range;
3156 struct seq_server_site *ss = osd_seq_site(osd);
3160 /* Those FID seqs, which are not in FLDB, must be local seq */
3161 if (unlikely(!fid_seq_in_fldb(fid_seq(fid)) || ss == NULL))
3164 rc = osd_fld_lookup(env, osd, fid, range);
3166 CERROR("%s: Can not lookup fld for "DFID"\n",
3167 osd_name(osd), PFID(fid));
3171 RETURN(ss->ss_node_id != range->lsr_index);
/* EA-mode index delete: look up the ldiskfs dirent for @key under the
 * htree lock (or oo_ext_idx_sem when no htree head), remove it with
 * ldiskfs_delete_entry(), and -- except for ".." -- if the entry's FID
 * resolves to a remote MDT also drop the local agent inode.
 * NOTE(review): elided extract (several branches/labels missing);
 * comments only, code untouched. */
3175 * Index delete function for interoperability mode (b11826).
3176 * It will remove the directory entry added by osd_index_ea_insert().
3177 * This entry is needed to maintain name->fid mapping.
3179 * \param key, key i.e. file entry to be deleted
3181 * \retval 0, on success
3182 * \retval -ve, on error
3184 static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt,
3185 const struct dt_key *key,
3186 struct thandle *handle,
3187 struct lustre_capa *capa)
3189 struct osd_object *obj = osd_dt_obj(dt);
3190 struct inode *dir = obj->oo_inode;
3191 struct dentry *dentry;
3192 struct osd_thandle *oh;
3193 struct ldiskfs_dir_entry_2 *de = NULL;
3194 struct buffer_head *bh;
3195 struct htree_lock *hlock = NULL;
3196 struct lu_fid *fid = &osd_oti_get(env)->oti_fid;
3197 struct osd_device *osd = osd_dev(dt->do_lu.lo_dev);
3201 LINVRNT(osd_invariant(obj));
3202 LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
3203 LASSERT(handle != NULL);
3205 osd_trans_exec_op(env, handle, OSD_OT_DELETE);
3207 oh = container_of(handle, struct osd_thandle, ot_super);
3208 LASSERT(oh->ot_handle != NULL);
3209 LASSERT(oh->ot_handle->h_transaction != NULL);
3211 if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
3214 ll_vfs_dq_init(dir);
3215 dentry = osd_child_dentry_get(env, obj,
3216 (char *)key, strlen((char *)key));
3218 if (obj->oo_hl_head != NULL) {
3219 hlock = osd_oti_get(env)->oti_hlock;
3220 ldiskfs_htree_lock(hlock, obj->oo_hl_head,
3221 dir, LDISKFS_HLOCK_DEL);
3223 down_write(&obj->oo_ext_idx_sem);
3226 bh = osd_ldiskfs_find_entry(dir, dentry, &de, hlock);
3228 rc = ldiskfs_delete_entry(oh->ot_handle,
3235 ldiskfs_htree_unlock(hlock);
3237 up_write(&obj->oo_ext_idx_sem);
3242 /* For inode on the remote MDT, .. will point to
3243 * /Agent directory. So do not try to lookup/delete
3244 * remote inode for .. */
3245 if (strcmp((char *)key, dotdot) == 0)
3248 LASSERT(de != NULL);
3249 rc = osd_get_fid_from_dentry(de, (struct dt_rec *)fid);
3250 if (rc == 0 && osd_remote_fid(env, osd, fid)) {
3251 __u32 ino = le32_to_cpu(de->inode);
3253 rc = osd_delete_remote_inode(env, osd, fid, ino, oh);
3255 CERROR("%s: del local inode "DFID": rc = %d\n",
3256 osd_name(osd), PFID(fid), rc);
3263 LASSERT(osd_invariant(obj));
/* IAM index lookup: position an iterator at @key and copy the matched
 * record into @rec.  Directory records are staged into oti_ldp and
 * unpacked into a lu_fid; quota keys are swabbed to LE first and quota
 * records unpacked afterwards.  NOTE(review): elided extract; comments
 * only, code untouched. */
3268 * Lookup index for \a key and copy record to \a rec.
3270 * \param dt osd index object
3271 * \param key key for index
3272 * \param rec record reference
3274 * \retval +ve success : exact mach
3275 * \retval 0 return record with key not greater than \a key
3276 * \retval -ve failure
3278 static int osd_index_iam_lookup(const struct lu_env *env, struct dt_object *dt,
3279 struct dt_rec *rec, const struct dt_key *key,
3280 struct lustre_capa *capa)
3282 struct osd_object *obj = osd_dt_obj(dt);
3283 struct iam_path_descr *ipd;
3284 struct iam_container *bag = &obj->oo_dir->od_container;
3285 struct osd_thread_info *oti = osd_oti_get(env);
3286 struct iam_iterator *it = &oti->oti_idx_it;
3287 struct iam_rec *iam_rec;
3292 LASSERT(osd_invariant(obj));
3293 LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
3294 LASSERT(bag->ic_object == obj->oo_inode);
3296 if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
3299 ipd = osd_idx_ipd_get(env, bag);
3303 /* got ipd now we can start iterator. */
3304 iam_it_init(it, bag, 0, ipd);
3306 if (fid_is_quota(lu_object_fid(&dt->do_lu))) {
3307 /* swab quota uid/gid provided by caller */
3308 oti->oti_quota_id = cpu_to_le64(*((__u64 *)key));
3309 key = (const struct dt_key *)&oti->oti_quota_id;
3312 rc = iam_it_get(it, (struct iam_key *)key);
3314 if (S_ISDIR(obj->oo_inode->i_mode))
3315 iam_rec = (struct iam_rec *)oti->oti_ldp;
3317 iam_rec = (struct iam_rec *) rec;
3319 iam_reccpy(&it->ii_path.ip_leaf, (struct iam_rec *)iam_rec);
3321 if (S_ISDIR(obj->oo_inode->i_mode))
3322 osd_fid_unpack((struct lu_fid *) rec,
3323 (struct osd_fid_pack *)iam_rec);
3324 else if (fid_is_quota(lu_object_fid(&dt->do_lu)))
3325 osd_quota_unpack(obj, rec);
3330 osd_ipd_put(env, bag, ipd);
3332 LINVRNT(osd_invariant(obj));
/* Declare DTO_INDEX_INSERT credits for an upcoming IAM index insert;
 * must run before the transaction handle is started. */
3337 static int osd_index_declare_iam_insert(const struct lu_env *env,
3338 struct dt_object *dt,
3339 const struct dt_rec *rec,
3340 const struct dt_key *key,
3341 struct thandle *handle)
3343 struct osd_thandle *oh;
3345 LASSERT(handle != NULL);
3347 oh = container_of0(handle, struct osd_thandle, ot_super);
3348 LASSERT(oh->ot_handle == NULL);
3350 osd_trans_declare_op(env, oh, OSD_OT_INSERT,
3351 osd_dto_credits_noquota[DTO_INDEX_INSERT]);
/* IAM index insert: pack the record according to index type --
 * directories pack the FID into oti_ldp, quota indices swab the key to
 * LE and pack the quota record, everything else inserts @rec verbatim
 * -- then call iam_insert() under the started transaction.
 * NOTE(review): elided extract; comments only, code untouched. */
3357 * Inserts (key, value) pair in \a dt index object.
3359 * \param dt osd index object
3360 * \param key key for index
3361 * \param rec record reference
3362 * \param th transaction handler
3365 * \retval -ve failure
3367 static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt,
3368 const struct dt_rec *rec,
3369 const struct dt_key *key, struct thandle *th,
3370 struct lustre_capa *capa, int ignore_quota)
3372 struct osd_object *obj = osd_dt_obj(dt);
3373 struct iam_path_descr *ipd;
3374 struct osd_thandle *oh;
3375 struct iam_container *bag = &obj->oo_dir->od_container;
3376 struct osd_thread_info *oti = osd_oti_get(env);
3377 struct iam_rec *iam_rec;
3382 LINVRNT(osd_invariant(obj));
3383 LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
3384 LASSERT(bag->ic_object == obj->oo_inode);
3385 LASSERT(th != NULL);
3387 if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
3390 osd_trans_exec_op(env, th, OSD_OT_INSERT);
3392 ipd = osd_idx_ipd_get(env, bag);
3393 if (unlikely(ipd == NULL))
3396 oh = container_of0(th, struct osd_thandle, ot_super);
3397 LASSERT(oh->ot_handle != NULL);
3398 LASSERT(oh->ot_handle->h_transaction != NULL);
3399 if (S_ISDIR(obj->oo_inode->i_mode)) {
3400 iam_rec = (struct iam_rec *)oti->oti_ldp;
3401 osd_fid_pack((struct osd_fid_pack *)iam_rec, rec, &oti->oti_fid);
3402 } else if (fid_is_quota(lu_object_fid(&dt->do_lu))) {
3403 /* pack quota uid/gid */
3404 oti->oti_quota_id = cpu_to_le64(*((__u64 *)key));
3405 key = (const struct dt_key *)&oti->oti_quota_id;
3406 /* pack quota record */
3407 rec = osd_quota_pack(obj, rec, &oti->oti_quota_rec);
3408 iam_rec = (struct iam_rec *)rec;
3410 iam_rec = (struct iam_rec *)rec;
3413 rc = iam_insert(oh->ot_handle, bag, (const struct iam_key *)key,
3415 osd_ipd_put(env, bag, ipd);
3416 LINVRNT(osd_invariant(obj));
/* Low-level EA-mode dirent insert: build an ldiskfs_dentry_param
 * carrying the child FID, attach it to a per-thread child dentry, init
 * quota on the parent, and call osd_ldiskfs_add_entry() inside the
 * running transaction.  NOTE(review): elided extract; comments only. */
3421 * Calls ldiskfs_add_entry() to add directory entry
3422 * into the directory. This is required for
3423 * interoperability mode (b11826)
3425 * \retval 0, on success
3426 * \retval -ve, on error
3428 static int __osd_ea_add_rec(struct osd_thread_info *info,
3429 struct osd_object *pobj, struct inode *cinode,
3430 const char *name, const struct dt_rec *fid,
3431 struct htree_lock *hlock, struct thandle *th)
3433 struct ldiskfs_dentry_param *ldp;
3434 struct dentry *child;
3435 struct osd_thandle *oth;
3438 oth = container_of(th, struct osd_thandle, ot_super);
3439 LASSERT(oth->ot_handle != NULL);
3440 LASSERT(oth->ot_handle->h_transaction != NULL);
3441 LASSERT(pobj->oo_inode);
3443 ldp = (struct ldiskfs_dentry_param *)info->oti_ldp;
3444 if (unlikely(pobj->oo_inode ==
3445 osd_sb(osd_obj2dev(pobj))->s_root->d_inode))
3448 osd_get_ldiskfs_dirent_param(ldp, fid);
3449 child = osd_child_dentry_get(info->oti_env, pobj, name, strlen(name));
3450 child->d_fsdata = (void *)ldp;
3451 ll_vfs_dq_init(pobj->oo_inode);
3452 rc = osd_ldiskfs_add_entry(oth->ot_handle, child, cinode, hlock);
/* Insert "." / ".." for directory @dir.  "." must be created first
 * (oo_compat_dot_created gates ".."); on a rename, an existing ".." is
 * re-added via __osd_ea_add_rec() instead; otherwise both entries go
 * through osd_add_dot_dotdot_internal() and the compat flags are set.
 * NOTE(review): elided extract; comments only, code untouched. */
3458 * Calls ldiskfs_add_dot_dotdot() to add dot and dotdot entries
3459 * into the directory.Also sets flags into osd object to
3460 * indicate dot and dotdot are created. This is required for
3461 * interoperability mode (b11826)
3463 * \param dir directory for dot and dotdot fixup.
3464 * \param obj child object for linking
3466 * \retval 0, on success
3467 * \retval -ve, on error
3469 static int osd_add_dot_dotdot(struct osd_thread_info *info,
3470 struct osd_object *dir,
3471 struct inode *parent_dir, const char *name,
3472 const struct dt_rec *dot_fid,
3473 const struct dt_rec *dot_dot_fid,
3476 struct inode *inode = dir->oo_inode;
3477 struct osd_thandle *oth;
3480 oth = container_of(th, struct osd_thandle, ot_super);
3481 LASSERT(oth->ot_handle->h_transaction != NULL);
3482 LASSERT(S_ISDIR(dir->oo_inode->i_mode));
3484 if (strcmp(name, dot) == 0) {
3485 if (dir->oo_compat_dot_created) {
3488 LASSERT(inode == parent_dir);
3489 dir->oo_compat_dot_created = 1;
3492 } else if (strcmp(name, dotdot) == 0) {
3493 if (!dir->oo_compat_dot_created)
3495 /* in case of rename, dotdot is already created */
3496 if (dir->oo_compat_dotdot_created) {
3497 return __osd_ea_add_rec(info, dir, parent_dir, name,
3498 dot_dot_fid, NULL, th);
3501 result = osd_add_dot_dotdot_internal(info, dir->oo_inode,
3502 parent_dir, dot_fid,
3505 dir->oo_compat_dotdot_created = 1;
/* Dispatch an EA-mode name insert: "." and ".." go through
 * osd_add_dot_dotdot(), regular names through __osd_ea_add_rec(),
 * both under the htree lock (or oo_ext_idx_sem fallback).  The
 * OBD_FAIL_FID_INDIR fault point substitutes a corrupted FID for
 * testing.  NOTE(review): elided extract; comments only. */
3513 * It will call the appropriate osd_add* function and return the
3514 * value, return by respective functions.
3516 static int osd_ea_add_rec(const struct lu_env *env, struct osd_object *pobj,
3517 struct inode *cinode, const char *name,
3518 const struct dt_rec *fid, struct thandle *th)
3520 struct osd_thread_info *info = osd_oti_get(env);
3521 struct htree_lock *hlock;
3524 hlock = pobj->oo_hl_head != NULL ? info->oti_hlock : NULL;
3526 if (name[0] == '.' && (name[1] == '\0' || (name[1] == '.' &&
3528 if (hlock != NULL) {
3529 ldiskfs_htree_lock(hlock, pobj->oo_hl_head,
3532 down_write(&pobj->oo_ext_idx_sem);
3534 rc = osd_add_dot_dotdot(info, pobj, cinode, name,
3535 (struct dt_rec *)lu_object_fid(&pobj->oo_dt.do_lu),
3538 if (hlock != NULL) {
3539 ldiskfs_htree_lock(hlock, pobj->oo_hl_head,
3540 pobj->oo_inode, LDISKFS_HLOCK_ADD);
3542 down_write(&pobj->oo_ext_idx_sem);
3545 if (OBD_FAIL_CHECK(OBD_FAIL_FID_INDIR)) {
3546 struct lu_fid *tfid = &info->oti_fid;
3548 *tfid = *(const struct lu_fid *)fid;
3550 rc = __osd_ea_add_rec(info, pobj, cinode, name,
3551 (const struct dt_rec *)tfid,
3554 rc = __osd_ea_add_rec(info, pobj, cinode, name, fid,
3559 ldiskfs_htree_unlock(hlock);
3561 up_write(&pobj->oo_ext_idx_sem);
/* Verify that the OI mapping for oic->oic_fid matches the id seen in
 * the directory entry; on mismatch either feed the entry to a running
 * OI scrub (retrying on the osd_oi_lookup/scrub race) or, if scrub is
 * allowed, trigger one by RPC once.  NOTE(review): elided extract;
 * comments only, code untouched. */
3567 osd_consistency_check(struct osd_thread_info *oti, struct osd_device *dev,
3568 struct osd_idmap_cache *oic)
3570 struct osd_scrub *scrub = &dev->od_scrub;
3571 struct lu_fid *fid = &oic->oic_fid;
3572 struct osd_inode_id *id = &oti->oti_id;
3577 if (!fid_is_norm(fid) && !fid_is_igif(fid))
3581 rc = osd_oi_lookup(oti, dev, fid, id, true);
3582 if (rc != 0 && rc != -ENOENT)
3585 if (rc == 0 && osd_id_eq(id, &oic->oic_lid))
3588 if (thread_is_running(&scrub->os_thread)) {
3589 rc = osd_oii_insert(dev, oic, rc == -ENOENT);
3590 /* There is race condition between osd_oi_lookup and OI scrub.
3591 * The OI scrub finished just after osd_oi_lookup() failure.
3592 * Under such case, it is unnecessary to trigger OI scrub again,
3593 * but try to call osd_oi_lookup() again. */
3594 if (unlikely(rc == -EAGAIN))
3600 if (!dev->od_noscrub && ++once == 1) {
3601 CDEBUG(D_LFSCK, "Trigger OI scrub by RPC for "DFID"\n",
3603 rc = osd_scrub_start(dev);
3604 LCONSOLE_ERROR("%.16s: trigger OI scrub by RPC for "DFID
3606 LDISKFS_SB(osd_sb(dev))->s_es->s_volume_name,
/* Fault-injection helper (OBD_FAIL_FID_LOOKUP path): resolve @ino to
 * an inode, read its LMA EA, and return the self-FID from there,
 * zeroing the cached fid on any failure. */
3615 static int osd_fail_fid_lookup(struct osd_thread_info *oti,
3616 struct osd_device *dev,
3617 struct osd_idmap_cache *oic,
3618 struct lu_fid *fid, __u32 ino)
3620 struct lustre_mdt_attrs *lma = &oti->oti_mdt_attrs;
3621 struct inode *inode;
3624 osd_id_gen(&oic->oic_lid, ino, OSD_OII_NOGEN);
3625 inode = osd_iget(oti, dev, &oic->oic_lid);
3626 if (IS_ERR(inode)) {
3627 fid_zero(&oic->oic_fid);
3628 return PTR_ERR(inode);
3631 rc = osd_get_lma(oti, inode, &oti->oti_obj_dentry, lma);
3634 fid_zero(&oic->oic_fid);
3636 *fid = oic->oic_fid = lma->lma_self_fid;
/* EA-mode name lookup: find the ldiskfs dirent for @key under the
 * htree (or oo_ext_idx_sem) lock, obtain the FID either embedded in
 * the dirent (LDISKFS_DIRENT_LUFID) or from the child inode's EA, then
 * cache the fid->id mapping and, when OI scrub state suggests possible
 * inconsistency, cross-check it via osd_consistency_check().
 * NOTE(review): elided extract; comments only, code untouched. */
3641 * Calls ->lookup() to find dentry. From dentry get inode and
3642 * read inode's ea to get fid. This is required for interoperability
3645 * \retval 0, on success
3646 * \retval -ve, on error
3648 static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
3649 struct dt_rec *rec, const struct dt_key *key)
3651 struct inode *dir = obj->oo_inode;
3652 struct dentry *dentry;
3653 struct ldiskfs_dir_entry_2 *de;
3654 struct buffer_head *bh;
3655 struct lu_fid *fid = (struct lu_fid *) rec;
3656 struct htree_lock *hlock = NULL;
3661 LASSERT(dir->i_op != NULL && dir->i_op->lookup != NULL);
3663 dentry = osd_child_dentry_get(env, obj,
3664 (char *)key, strlen((char *)key));
3666 if (obj->oo_hl_head != NULL) {
3667 hlock = osd_oti_get(env)->oti_hlock;
3668 ldiskfs_htree_lock(hlock, obj->oo_hl_head,
3669 dir, LDISKFS_HLOCK_LOOKUP);
3671 down_read(&obj->oo_ext_idx_sem);
3674 bh = osd_ldiskfs_find_entry(dir, dentry, &de, hlock);
3676 struct osd_thread_info *oti = osd_oti_get(env);
3677 struct osd_inode_id *id = &oti->oti_id;
3678 struct osd_idmap_cache *oic = &oti->oti_cache;
3679 struct osd_device *dev = osd_obj2dev(obj);
3680 struct osd_scrub *scrub = &dev->od_scrub;
3681 struct scrub_file *sf = &scrub->os_file;
3683 ino = le32_to_cpu(de->inode);
3684 if (OBD_FAIL_CHECK(OBD_FAIL_FID_LOOKUP)) {
3686 rc = osd_fail_fid_lookup(oti, dev, oic, fid, ino);
3690 rc = osd_get_fid_from_dentry(de, rec);
3692 /* done with de, release bh */
3695 rc = osd_ea_fid_get(env, obj, ino, fid, id);
3697 osd_id_gen(id, ino, OSD_OII_NOGEN);
3698 if (rc != 0 || osd_remote_fid(env, dev, fid)) {
3699 fid_zero(&oic->oic_fid);
3704 oic->oic_fid = *fid;
3705 if ((scrub->os_pos_current <= ino) &&
3706 ((sf->sf_flags & SF_INCONSISTENT) ||
3707 (sf->sf_flags & SF_UPGRADE && fid_is_igif(fid)) ||
3708 ldiskfs_test_bit(osd_oi_fid2idx(dev, fid),
3710 osd_consistency_check(oti, dev, oic);
3719 ldiskfs_htree_unlock(hlock);
3721 up_read(&obj->oo_ext_idx_sem);
/* Look up the osd_object for @fid via lu_object_find_at() using our
 * own device as topdev (topdev may not exist yet while MGS prepares
 * profiles -- the object must therefore already be cached).  Releases
 * the reference and returns ERR_PTR(-ENOENT) when the object does not
 * exist or cannot be located on this device.
 * NOTE(review): elided extract; comments only, code untouched. */
3726 * Find the osd object for given fid.
3728 * \param fid need to find the osd object having this fid
3730 * \retval osd_object on success
3731 * \retval -ve on error
3733 struct osd_object *osd_object_find(const struct lu_env *env,
3734 struct dt_object *dt,
3735 const struct lu_fid *fid)
3737 struct lu_device *ludev = dt->do_lu.lo_dev;
3738 struct osd_object *child = NULL;
3739 struct lu_object *luch;
3740 struct lu_object *lo;
3743 * at this point topdev might not exist yet
3744 * (i.e. MGS is preparing profiles). so we can
3745 * not rely on topdev and instead lookup with
3746 * our device passed as topdev. this can't work
3747 * if the object isn't cached yet (as osd doesn't
3748 * allocate lu_header). IOW, the object must be
3749 * in the cache, otherwise lu_object_alloc() crashes
3752 luch = lu_object_find_at(env, ludev, fid, NULL);
3753 if (!IS_ERR(luch)) {
3754 if (lu_object_exists(luch)) {
3755 lo = lu_object_locate(luch->lo_header, ludev->ld_type);
3757 child = osd_obj(lo);
3759 LU_OBJECT_DEBUG(D_ERROR, env, luch,
3760 "lu_object can't be located"
3761 DFID"\n", PFID(fid));
3763 if (child == NULL) {
3764 lu_object_put(env, luch);
3765 CERROR("Unable to get osd_object\n");
3766 child = ERR_PTR(-ENOENT);
3769 LU_OBJECT_DEBUG(D_ERROR, env, luch,
3770 "lu_object does not exists "DFID"\n",
3772 lu_object_put(env, luch);
3773 child = ERR_PTR(-ENOENT);
3776 child = (void *)luch;
/* Drop the reference taken by osd_object_find(). */
3782 * Put the osd object once done with it.
3784 * \param obj osd object that needs to be put
3786 static inline void osd_object_put(const struct lu_env *env,
3787 struct osd_object *obj)
3789 lu_object_put(env, &obj->oo_dt.do_lu);
/* Declare credits/quota for an EA-mode dirent insert.  When the parent
 * inode does not exist yet, only "." / ".." inserts during directory
 * creation are legal; otherwise inode quota is reserved (block quota
 * is ignored on MDTs).  If the child FID is remote, extra credits for
 * agent-inode creation plus two more index inserts are declared.
 * NOTE(review): elided extract; comments only, code untouched. */
3792 static int osd_index_declare_ea_insert(const struct lu_env *env,
3793 struct dt_object *dt,
3794 const struct dt_rec *rec,
3795 const struct dt_key *key,
3796 struct thandle *handle)
3798 struct osd_thandle *oh;
3799 struct osd_device *osd = osd_dev(dt->do_lu.lo_dev);
3800 struct lu_fid *fid = (struct lu_fid *)rec;
3804 LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
3805 LASSERT(handle != NULL);
3807 oh = container_of0(handle, struct osd_thandle, ot_super);
3808 LASSERT(oh->ot_handle == NULL);
3810 osd_trans_declare_op(env, oh, OSD_OT_INSERT,
3811 osd_dto_credits_noquota[DTO_INDEX_INSERT]);
3813 if (osd_dt_obj(dt)->oo_inode == NULL) {
3814 const char *name = (const char *)key;
3815 /* Object is not being created yet. Only happens when
3816 * 1. declare directory create
3817 * 2. declare insert .
3818 * 3. declare insert ..
3820 LASSERT(strcmp(name, dotdot) == 0 || strcmp(name, dot) == 0);
3822 struct inode *inode = osd_dt_obj(dt)->oo_inode;
3824 /* We ignore block quota on meta pool (MDTs), so needn't
3825 * calculate how many blocks will be consumed by this index
3827 rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid, 0,
3828 oh, true, true, NULL, false);
3834 rc = osd_remote_fid(env, osd, fid);
3840 osd_trans_declare_op(env, oh, OSD_OT_CREATE,
3841 osd_dto_credits_noquota[DTO_OBJECT_CREATE]);
3842 osd_trans_declare_op(env, oh, OSD_OT_INSERT,
3843 osd_dto_credits_noquota[DTO_INDEX_INSERT] + 1);
3844 osd_trans_declare_op(env, oh, OSD_OT_INSERT,
3845 osd_dto_credits_noquota[DTO_INDEX_INSERT] + 1);
/*
 * Insert a name->FID mapping into an EA (ldiskfs) directory.
 * Remote FIDs ("..") go through the agent inode / remote-inode path;
 * local FIDs are resolved to an inode via osd_object_find() and then
 * linked with osd_ea_add_rec().
 */
3851  * Index add function for interoperability mode (b11826).
3852  * It will add the directory entry.This entry is needed to
3853  * maintain name->fid mapping.
3855  * \param key it is key i.e. file entry to be inserted
3856  * \param rec it is value of given key i.e. fid
3858  * \retval 0, on success
3859  * \retval -ve, on error
3861 static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt,
3862 const struct dt_rec *rec,
3863 const struct dt_key *key, struct thandle *th,
3864 struct lustre_capa *capa, int ignore_quota)
3866 struct osd_object *obj = osd_dt_obj(dt);
3867 struct osd_device *osd = osd_dev(dt->do_lu.lo_dev);
3868 struct lu_fid *fid = (struct lu_fid *) rec;
3869 const char *name = (const char *)key;
3870 struct osd_thread_info *oti = osd_oti_get(env);
3871 struct osd_inode_id *id = &oti->oti_id;
3872 struct inode *child_inode = NULL;
3873 struct osd_object *child = NULL;
3877 LASSERT(osd_invariant(obj));
3878 LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
3879 LASSERT(th != NULL);
3881 osd_trans_exec_op(env, th, OSD_OT_INSERT);
3883 if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
3886 LASSERTF(fid_is_sane(fid), "fid"DFID" is insane!", PFID(fid));
3888 rc = osd_remote_fid(env, osd, fid);
3890 CERROR("%s: Can not find object "DFID" rc %d\n",
3891 osd_name(osd), PFID(fid), rc);
3896 /* Insert remote entry */
3897 if (strcmp(name, dotdot) == 0 && strlen(name) == 2) {
3898 struct osd_mdobj_map *omm = osd->od_mdt_map;
3899 struct osd_thandle *oh;
3901 /* If parent on remote MDT, we need put this object
3903 oh = container_of(th, typeof(*oh), ot_super);
3904 rc = osd_add_to_agent(env, osd, obj, oh);
3906 CERROR("%s: add agent "DFID" error: rc = %d\n",
3908 PFID(lu_object_fid(&dt->do_lu)), rc);
/* ".." of a remote parent points at the shared agent dentry. */
3912 child_inode = igrab(omm->omm_agent_dentry->d_inode);
3914 child_inode = osd_create_remote_inode(env, osd, obj,
3916 if (IS_ERR(child_inode))
3917 RETURN(PTR_ERR(child_inode));
3920 /* Insert local entry */
3921 child = osd_object_find(env, dt, fid);
3922 if (IS_ERR(child)) {
/* NOTE(review): child_inode is still NULL here, so both the logged
 * rc and the returned value use PTR_ERR(child_inode) on the wrong
 * pointer — this looks like it should be PTR_ERR(child).  Verify
 * against upstream before changing. */
3923 CERROR("%s: Can not find object "DFID"%u:%u: rc = %d\n",
3924 osd_name(osd), PFID(fid),
3925 id->oii_ino, id->oii_gen,
3926 (int)PTR_ERR(child_inode));
3927 RETURN(PTR_ERR(child_inode));
3929 child_inode = igrab(child->oo_inode);
3932 rc = osd_ea_add_rec(env, obj, child_inode, name, rec, th);
3936 osd_object_put(env, child);
3937 LASSERT(osd_invariant(obj));
/*
 * Create an IAM-index iterator over \a dt.  Checks capa for body-read,
 * obtains a path descriptor from the per-thread cache, then initializes
 * the embedded iam iterator in MOVE mode.
 * NOTE(review): the lines that bind the osd_it_iam (it = ..., oi_obj,
 * oi_ipd assignments) are missing from this extraction.
 */
3942  * Initialize osd Iterator for given osd index object.
3944  * \param dt osd index object
3947 static struct dt_it *osd_it_iam_init(const struct lu_env *env,
3948 struct dt_object *dt,
3950 struct lustre_capa *capa)
3952 struct osd_it_iam *it;
3953 struct osd_thread_info *oti = osd_oti_get(env);
3954 struct osd_object *obj = osd_dt_obj(dt);
3955 struct lu_object *lo = &dt->do_lu;
3956 struct iam_path_descr *ipd;
3957 struct iam_container *bag = &obj->oo_dir->od_container;
3959 LASSERT(lu_object_exists(lo));
3961 if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
3962 return ERR_PTR(-EACCES);
3965 ipd = osd_it_ipd_get(env, bag);
3966 if (likely(ipd != NULL)) {
3970 iam_it_init(&it->oi_it, bag, IAM_IT_MOVE, ipd);
3971 return (struct dt_it *)it;
/* No path descriptor available. */
3973 return ERR_PTR(-ENOMEM);
/*
 * Destroy an IAM iterator: finalize the iam cursor, return the path
 * descriptor to the cache, and drop the object reference taken at init.
 */
3977  * free given Iterator.
3980 static void osd_it_iam_fini(const struct lu_env *env, struct dt_it *di)
3982 struct osd_it_iam *it = (struct osd_it_iam *)di;
3983 struct osd_object *obj = it->oi_obj;
3985 iam_it_fini(&it->oi_it);
3986 osd_ipd_put(env, &obj->oo_dir->od_container, it->oi_ipd);
3987 lu_object_put(env, &obj->oo_dt.do_lu);
/*
 * Position the IAM iterator at \a key.  Quota index keys (uid/gid) are
 * stored little-endian on disk, so swab them into the per-thread buffer
 * before the lookup.
 */
3991  * Move Iterator to record specified by \a key
3993  * \param di osd iterator
3994  * \param key key for index
3996  * \retval +ve di points to record with least key not larger than key
3997  * \retval 0 di points to exact matched key
3998  * \retval -ve failure
4001 static int osd_it_iam_get(const struct lu_env *env,
4002 struct dt_it *di, const struct dt_key *key)
4004 struct osd_thread_info *oti = osd_oti_get(env);
4005 struct osd_it_iam *it = (struct osd_it_iam *)di;
4007 if (fid_is_quota(lu_object_fid(&it->oi_obj->oo_dt.do_lu))) {
4008 /* swab quota uid/gid */
4009 oti->oti_quota_id = cpu_to_le64(*((__u64 *)key));
4010 key = (struct dt_key *)&oti->oti_quota_id;
4013 return iam_it_get(&it->oi_it, (const struct iam_key *)key);
/* Release the current iterator position (unlock the iam cursor). */
4019  * \param di osd iterator
4021 static void osd_it_iam_put(const struct lu_env *env, struct dt_it *di)
4023 struct osd_it_iam *it = (struct osd_it_iam *)di;
4025 iam_it_put(&it->oi_it);
/* Advance the IAM iterator by one record; delegates to iam_it_next(). */
4029  * Move iterator by one record
4031  * \param di osd iterator
4033  * \retval +1 end of container reached
4035  * \retval -ve failure
4038 static int osd_it_iam_next(const struct lu_env *env, struct dt_it *di)
4040 struct osd_it_iam *it = (struct osd_it_iam *)di;
4042 return iam_it_next(&it->oi_it);
/*
 * Return the key at the current iterator position.  For quota indexes
 * the on-disk little-endian id is swabbed into the per-thread buffer
 * (mirror of the conversion done in osd_it_iam_get()).
 */
4046  * Return pointer to the key under iterator.
4049 static struct dt_key *osd_it_iam_key(const struct lu_env *env,
4050 const struct dt_it *di)
4052 struct osd_thread_info *oti = osd_oti_get(env);
4053 struct osd_it_iam *it = (struct osd_it_iam *)di;
4054 struct osd_object *obj = it->oi_obj;
4057 key = (struct dt_key *)iam_it_key_get(&it->oi_it);
4059 if (!IS_ERR(key) && fid_is_quota(lu_object_fid(&obj->oo_dt.do_lu))) {
4060 /* swab quota uid/gid */
4061 oti->oti_quota_id = le64_to_cpu(*((__u64 *)key));
4062 key = (struct dt_key *)&oti->oti_quota_id;
/* Return the size in bytes of the key at the current position. */
4069  * Return size of key under iterator (in bytes)
4072 static int osd_it_iam_key_size(const struct lu_env *env, const struct dt_it *di)
4074 struct osd_it_iam *it = (struct osd_it_iam *)di;
4076 return iam_it_key_size(&it->oi_it);
/*
 * Append optional attributes (currently only the luda_type file type,
 * when LUDA_TYPE is requested) after the name in a lu_dirent, aligned
 * to sizeof(struct luda_type); finally store lde_attrs little-endian.
 */
4080 osd_it_append_attrs(struct lu_dirent *ent, int len, __u16 type)
4082 /* check if file type is required */
4083 if (ent->lde_attrs & LUDA_TYPE) {
4084 int align = sizeof(struct luda_type) - 1;
4085 struct luda_type *lt;
/* round the name length up to luda_type alignment */
4087 len = (len + align) & ~align;
4088 lt = (struct luda_type *)(ent->lde_name + len);
4089 lt->lt_type = cpu_to_le16(CFS_DTTOIF(type));
4092 ent->lde_attrs = cpu_to_le32(ent->lde_attrs);
/*
 * Fill a lu_dirent from a backend dirent: FID, hash/offset cookie,
 * record length, name and optional attributes, all in little-endian
 * wire format.
 */
4096  * build lu direct from backend fs dirent.
4100 osd_it_pack_dirent(struct lu_dirent *ent, struct lu_fid *fid, __u64 offset,
4101 char *name, __u16 namelen, __u16 type, __u32 attr)
4103 ent->lde_attrs = attr | LUDA_FID;
4104 fid_cpu_to_le(&ent->lde_fid, fid);
4106 ent->lde_hash = cpu_to_le64(offset);
4107 ent->lde_reclen = cpu_to_le16(lu_dirent_calc_size(namelen, attr));
/* name is not NUL-terminated in lu_dirent; length travels separately */
4109 strncpy(ent->lde_name, name, namelen);
4110 ent->lde_namelen = cpu_to_le16(namelen);
4112 /* append lustre attributes */
4113 osd_it_append_attrs(ent, namelen, type);
/*
 * Copy the record at the current IAM position into \a dtrec.
 * Three cases: directory index (unpack FID + build lu_dirent),
 * quota index (raw copy then osd_quota_unpack swab), and the
 * generic raw record copy.
 */
4117  * Return pointer to the record under iterator.
4119 static int osd_it_iam_rec(const struct lu_env *env,
4120 const struct dt_it *di,
4121 struct dt_rec *dtrec, __u32 attr)
4123 struct osd_it_iam *it = (struct osd_it_iam *)di;
4124 struct osd_thread_info *info = osd_oti_get(env);
4127 if (S_ISDIR(it->oi_obj->oo_inode->i_mode)) {
4128 const struct osd_fid_pack *rec;
4129 struct lu_fid *fid = &info->oti_fid;
4130 struct lu_dirent *lde = (struct lu_dirent *)dtrec;
4136 name = (char *)iam_it_key_get(&it->oi_it);
4138 RETURN(PTR_ERR(name));
4140 namelen = iam_it_key_size(&it->oi_it);
4142 rec = (const struct osd_fid_pack *)iam_it_rec_get(&it->oi_it);
4144 RETURN(PTR_ERR(rec));
4146 rc = osd_fid_unpack(fid, rec);
4150 hash = iam_it_store(&it->oi_it);
4152 /* IAM does not store object type in IAM index (dir) */
4153 osd_it_pack_dirent(lde, fid, hash, name, namelen,
4155 } else if (fid_is_quota(lu_object_fid(&it->oi_obj->oo_dt.do_lu))) {
4156 iam_reccpy(&it->oi_it.ii_path.ip_leaf,
4157 (struct iam_rec *)dtrec);
4158 osd_quota_unpack(it->oi_obj, dtrec);
4160 iam_reccpy(&it->oi_it.ii_path.ip_leaf,
4161 (struct iam_rec *)dtrec);
/* Return an opaque resume cookie for the current IAM position. */
4168  * Returns cookie for current Iterator position.
4170 static __u64 osd_it_iam_store(const struct lu_env *env, const struct dt_it *di)
4172 struct osd_it_iam *it = (struct osd_it_iam *)di;
4174 return iam_it_store(&it->oi_it);
/* Re-position the iterator from a cookie produced by ->store(). */
4178  * Restore iterator from cookie.
4180  * \param di osd iterator
4181  * \param hash Iterator location cookie
4183  * \retval +ve di points to record with least key not larger than key.
4184  * \retval 0 di points to exact matched key
4185  * \retval -ve failure
4188 static int osd_it_iam_load(const struct lu_env *env,
4189 const struct dt_it *di, __u64 hash)
4191 struct osd_it_iam *it = (struct osd_it_iam *)di;
4193 return iam_it_load(&it->oi_it, hash);
/*
 * dt_index_operations vector for native IAM indexes (OI files, quota,
 * FLDB, ...): lookup/insert/delete plus the osd_it_iam_* iterator.
 */
4196 static const struct dt_index_operations osd_index_iam_ops = {
4197 .dio_lookup = osd_index_iam_lookup,
4198 .dio_declare_insert = osd_index_declare_iam_insert,
4199 .dio_insert = osd_index_iam_insert,
4200 .dio_declare_delete = osd_index_declare_iam_delete,
4201 .dio_delete = osd_index_iam_delete,
4203 .init = osd_it_iam_init,
4204 .fini = osd_it_iam_fini,
4205 .get = osd_it_iam_get,
4206 .put = osd_it_iam_put,
4207 .next = osd_it_iam_next,
4208 .key = osd_it_iam_key,
4209 .key_size = osd_it_iam_key_size,
4210 .rec = osd_it_iam_rec,
4211 .store = osd_it_iam_store,
4212 .load = osd_it_iam_load
/*
 * Initialize an EA (ldiskfs readdir) iterator: build a scratch dentry
 * and struct file over the directory inode in the per-thread context,
 * reset the read-ahead buffer, and pick 32/64-bit hash mode from the
 * LUDA_64BITHASH attribute bit.
 */
4218  * Creates or initializes iterator context.
4220  * \retval struct osd_it_ea, iterator structure on success
4223 static struct dt_it *osd_it_ea_init(const struct lu_env *env,
4224 struct dt_object *dt,
4226 struct lustre_capa *capa)
4228 struct osd_object *obj = osd_dt_obj(dt);
4229 struct osd_thread_info *info = osd_oti_get(env);
4230 struct osd_it_ea *it = &info->oti_it_ea;
4231 struct lu_object *lo = &dt->do_lu;
4232 struct dentry *obj_dentry = &info->oti_it_dentry;
4234 LASSERT(lu_object_exists(lo));
4236 obj_dentry->d_inode = obj->oo_inode;
4237 obj_dentry->d_sb = osd_sb(osd_obj2dev(obj));
4238 obj_dentry->d_name.hash = 0;
4240 it->oie_rd_dirent = 0;
4241 it->oie_it_dirent = 0;
4242 it->oie_dirent = NULL;
4243 it->oie_buf = info->oti_it_ea_buf;
4245 it->oie_file.f_pos = 0;
4246 it->oie_file.f_dentry = obj_dentry;
4247 if (attr & LUDA_64BITHASH)
4248 it->oie_file.f_mode |= FMODE_64BITHASH;
4250 it->oie_file.f_mode |= FMODE_32BITHASH;
4251 it->oie_file.f_mapping = obj->oo_inode->i_mapping;
4252 it->oie_file.f_op = obj->oo_inode->i_fop;
4253 it->oie_file.private_data = NULL;
4255 RETURN((struct dt_it *) it);
/*
 * Tear down an EA iterator: call the file_operations release hook on
 * the scratch file and drop the object reference.
 */
4259  * Destroy or finishes iterator context.
4261  * \param di iterator structure to be destroyed
4263 static void osd_it_ea_fini(const struct lu_env *env, struct dt_it *di)
4265 struct osd_it_ea *it = (struct osd_it_ea *)di;
4266 struct osd_object *obj = it->oie_obj;
4267 struct inode *inode = obj->oo_inode;
4270 it->oie_file.f_op->release(inode, &it->oie_file);
4271 lu_object_put(env, &obj->oo_dt.do_lu);
/*
 * Rewind the EA iterator to the beginning.  Only the empty key is
 * supported (asserted below), matching the dio_it->get() rewind
 * convention; returns +1 by convention (see TODO in original doc).
 */
4276  * It position the iterator at given key, so that next lookup continues from
4277  * that key Or it is similar to dio_it->load() but based on a key,
4278  * rather than file position.
4280  * As a special convention, osd_it_ea_get(env, di, "") has to rewind iterator
4283  * TODO: Presently return +1 considering it is only used by mdd_dir_is_empty().
4285 static int osd_it_ea_get(const struct lu_env *env,
4286 struct dt_it *di, const struct dt_key *key)
4288 struct osd_it_ea *it = (struct osd_it_ea *)di;
4291 LASSERT(((const char *)key)[0] == '\0');
4292 it->oie_file.f_pos = 0;
4293 it->oie_rd_dirent = 0;
4294 it->oie_it_dirent = 0;
4295 it->oie_dirent = NULL;
/* EA iterators hold no per-position lock; ->put() is a no-op. */
4303 static void osd_it_ea_put(const struct lu_env *env, struct dt_it *di)
/*
 * filldir callback invoked by ldiskfs ->readdir(): append one dirent
 * (FID if present in dirdata, ino, offset, type, name) to the
 * iterator's read-ahead buffer.  Returns non-zero to stop readdir
 * when the buffer is full.
 */
4308  * It is called internally by ->readdir(). It fills the
4309  * iterator's in-memory data structure with required
4310  * information i.e. name, namelen, rec_size etc.
4312  * \param buf in which information to be filled in.
4313  * \param name name of the file in given dir
4315  * \retval 0 on success
4316  * \retval 1 on buffer full
4318 static int osd_ldiskfs_filldir(char *buf, const char *name, int namelen,
4319 loff_t offset, __u64 ino,
4322 struct osd_it_ea *it = (struct osd_it_ea *)buf;
4323 struct osd_object *obj = it->oie_obj;
4324 struct osd_it_ea_dirent *ent = it->oie_dirent;
4325 struct lu_fid *fid = &ent->oied_fid;
4326 struct osd_fid_pack *rec;
4329 /* this should never happen */
4330 if (unlikely(namelen == 0 || namelen > LDISKFS_NAME_LEN)) {
4331 CERROR("ldiskfs return invalid namelen %d\n", namelen);
/* stop when the next entry would overflow the read-ahead buffer */
4335 if ((void *) ent - it->oie_buf + sizeof(*ent) + namelen >
4339 /* "." is just the object itself. */
4340 if (namelen == 1 && name[0] == '.') {
4341 *fid = obj->oo_dt.do_lu.lo_header->loh_fid;
4342 } else if (d_type & LDISKFS_DIRENT_LUFID) {
/* FID packed in dirdata right after the NUL-terminated name */
4343 rec = (struct osd_fid_pack*) (name + namelen + 1);
4344 if (osd_fid_unpack(fid, rec) != 0)
/* strip the dirdata flag before exporting the type */
4349 d_type &= ~LDISKFS_DIRENT_LUFID;
4351 /* NOT export local root. */
4352 if (unlikely(osd_sb(osd_obj2dev(obj))->s_root->d_inode->i_ino == ino)) {
4353 ino = obj->oo_inode->i_ino;
4354 *fid = obj->oo_dt.do_lu.lo_header->loh_fid;
4357 ent->oied_ino = ino;
4358 ent->oied_off = offset;
4359 ent->oied_namelen = namelen;
4360 ent->oied_type = d_type;
4362 memcpy(ent->oied_name, name, namelen);
4364 it->oie_rd_dirent++;
4365 it->oie_dirent = (void *) ent + cfs_size_round(sizeof(*ent) + namelen);
/*
 * Refill the iterator's read-ahead buffer by running ->readdir() with
 * osd_ldiskfs_filldir() as the callback, under the directory's htree
 * lock (or oo_ext_idx_sem when no htree head exists).
 */
4370  * Calls ->readdir() to load a directory entry at a time
4371  * and stored it in iterator's in-memory data structure.
4373  * \param di iterator's in memory structure
4375  * \retval 0 on success
4376  * \retval -ve on error
4378 static int osd_ldiskfs_it_fill(const struct lu_env *env,
4379 const struct dt_it *di)
4381 struct osd_it_ea *it = (struct osd_it_ea *)di;
4382 struct osd_object *obj = it->oie_obj;
4383 struct inode *inode = obj->oo_inode;
4384 struct htree_lock *hlock = NULL;
4388 it->oie_dirent = it->oie_buf;
4389 it->oie_rd_dirent = 0;
4391 if (obj->oo_hl_head != NULL) {
4392 hlock = osd_oti_get(env)->oti_hlock;
4393 ldiskfs_htree_lock(hlock, obj->oo_hl_head,
4394 inode, LDISKFS_HLOCK_READDIR);
4396 down_read(&obj->oo_ext_idx_sem);
4399 result = inode->i_fop->readdir(&it->oie_file, it,
4400 (filldir_t) osd_ldiskfs_filldir);
4403 ldiskfs_htree_unlock(hlock);
4405 up_read(&obj->oo_ext_idx_sem);
/* nothing read: either EOF or error (handling lines not visible) */
4407 if (it->oie_rd_dirent == 0) {
4410 it->oie_dirent = it->oie_buf;
4411 it->oie_it_dirent = 1;
/*
 * Advance to the next dirent: step within the read-ahead buffer if
 * entries remain, otherwise stop at htree EOF or refill the buffer
 * via osd_ldiskfs_it_fill().
 */
4418  * It calls osd_ldiskfs_it_fill() which will use ->readdir()
4419  * to load a directory entry at a time and stored it in
4420  * iterator's in-memory data structure.
4422  * \param di iterator's in memory structure
4424  * \retval +ve iterator reached to end
4425  * \retval 0 iterator not reached to end
4426  * \retval -ve on error
4428 static int osd_it_ea_next(const struct lu_env *env, struct dt_it *di)
4430 struct osd_it_ea *it = (struct osd_it_ea *)di;
4435 if (it->oie_it_dirent < it->oie_rd_dirent) {
4437 (void *) it->oie_dirent +
4438 cfs_size_round(sizeof(struct osd_it_ea_dirent) +
4439 it->oie_dirent->oied_namelen);
4440 it->oie_it_dirent++;
4443 if (it->oie_file.f_pos == ldiskfs_get_htree_eof(&it->oie_file))
4446 rc = osd_ldiskfs_it_fill(env, di);
/* Return the name of the current dirent as the iterator key. */
4453  * Returns the key at current position from iterator's in memory structure.
4455  * \param di iterator's in memory structure
4457  * \retval key i.e. struct dt_key on success
4459 static struct dt_key *osd_it_ea_key(const struct lu_env *env,
4460 const struct dt_it *di)
4462 struct osd_it_ea *it = (struct osd_it_ea *)di;
4464 return (struct dt_key *)it->oie_dirent->oied_name;
/* Return the length of the current dirent's name. */
4468  * Returns the key's size at current position from iterator's in memory structure.
4470  * \param di iterator's in memory structure
4472  * \retval key_size i.e. struct dt_key on success
4474 static int osd_it_ea_key_size(const struct lu_env *env, const struct dt_it *di)
4476 struct osd_it_ea *it = (struct osd_it_ea *)di;
4478 return it->oie_dirent->oied_namelen;
/*
 * Overwrite the FID packed in an existing dirent (dirdata area right
 * after the name) with \a fid, under journal handle \a jh.  The entry
 * must already carry LDISKFS_DIRENT_LUFID and have room (asserted).
 */
4482 osd_dirent_update(handle_t *jh, struct super_block *sb,
4483 struct osd_it_ea_dirent *ent, struct lu_fid *fid,
4484 struct buffer_head *bh, struct ldiskfs_dir_entry_2 *de)
4486 struct osd_fid_pack *rec;
4490 LASSERT(de->file_type & LDISKFS_DIRENT_LUFID);
4491 LASSERT(de->rec_len >= de->name_len + sizeof(struct osd_fid_pack));
4493 rc = ldiskfs_journal_get_write_access(jh, bh);
4495 CERROR("%.16s: fail to write access for update dirent: "
4496 "name = %.*s, rc = %d\n",
4497 LDISKFS_SB(sb)->s_es->s_volume_name,
4498 ent->oied_namelen, ent->oied_name, rc);
/* FID is stored big-endian in the dirdata area */
4502 rec = (struct osd_fid_pack *)(de->name + de->name_len + 1);
4503 fid_cpu_to_be((struct lu_fid *)rec->fp_area, fid);
4504 rc = ldiskfs_journal_dirty_metadata(jh, bh);
4506 CERROR("%.16s: fail to dirty metadata for update dirent: "
4507 "name = %.*s, rc = %d\n",
4508 LDISKFS_SB(sb)->s_es->s_volume_name,
4509 ent->oied_namelen, ent->oied_name, rc);
/*
 * Check whether a dirent's on-disk record is large enough to also hold
 * a packed FID (name + NUL + struct osd_fid_pack) after the name.
 */
4515 osd_dirent_has_space(__u16 reclen, __u16 namelen, unsigned blocksize)
4517 if (ldiskfs_rec_len_from_disk(reclen, blocksize) >=
4518 __LDISKFS_DIR_REC_LEN(namelen + 1 + sizeof(struct osd_fid_pack)))
/*
 * Add a FID to a dirent that lacks one.  If the existing record has
 * room, the FID is appended in place; otherwise the entry is deleted
 * and re-inserted with FID-in-dirent via osd_ldiskfs_add_entry().
 * Requires the DIRDATA incompat feature (early return otherwise).
 */
4525 osd_dirent_reinsert(const struct lu_env *env, handle_t *jh,
4526 struct inode *dir, struct inode *inode,
4527 struct osd_it_ea_dirent *ent, struct lu_fid *fid,
4528 struct buffer_head *bh, struct ldiskfs_dir_entry_2 *de,
4529 struct htree_lock *hlock)
4531 struct dentry *dentry;
4532 struct osd_fid_pack *rec;
4533 struct ldiskfs_dentry_param *ldp;
4537 if (!LDISKFS_HAS_INCOMPAT_FEATURE(inode->i_sb,
4538 LDISKFS_FEATURE_INCOMPAT_DIRDATA))
4541 /* There is enough space to hold the FID-in-dirent. */
4542 if (osd_dirent_has_space(de->rec_len, ent->oied_namelen,
4543 dir->i_sb->s_blocksize)) {
4544 rc = ldiskfs_journal_get_write_access(jh, bh);
4546 CERROR("%.16s: fail to write access for reinsert "
4547 "dirent: name = %.*s, rc = %d\n",
4548 LDISKFS_SB(inode->i_sb)->s_es->s_volume_name,
4549 ent->oied_namelen, ent->oied_name, rc);
4553 de->name[de->name_len] = 0;
4554 rec = (struct osd_fid_pack *)(de->name + de->name_len + 1);
4555 rec->fp_len = sizeof(struct lu_fid) + 1;
4556 fid_cpu_to_be((struct lu_fid *)rec->fp_area, fid);
4557 de->file_type |= LDISKFS_DIRENT_LUFID;
4559 rc = ldiskfs_journal_dirty_metadata(jh, bh);
4561 CERROR("%.16s: fail to dirty metadata for reinsert "
4562 "dirent: name = %.*s, rc = %d\n",
4563 LDISKFS_SB(inode->i_sb)->s_es->s_volume_name,
4564 ent->oied_namelen, ent->oied_name, rc);
/* Not enough room: remove the old entry and insert a new one. */
4569 rc = ldiskfs_delete_entry(jh, dir, de, bh);
4571 CERROR("%.16s: fail to delete entry for reinsert dirent: "
4572 "name = %.*s, rc = %d\n",
4573 LDISKFS_SB(inode->i_sb)->s_es->s_volume_name,
4574 ent->oied_namelen, ent->oied_name, rc);
4578 dentry = osd_child_dentry_by_inode(env, dir, ent->oied_name,
4580 ldp = (struct ldiskfs_dentry_param *)osd_oti_get(env)->oti_ldp;
4581 osd_get_ldiskfs_dirent_param(ldp, (const struct dt_rec *)fid);
4582 dentry->d_fsdata = (void *)ldp;
4583 ll_vfs_dq_init(dir);
4584 rc = osd_ldiskfs_add_entry(jh, dentry, inode, hlock);
4585 /* It is too bad, we cannot reinsert the name entry back.
4586 * That means we lose it! */
4588 CERROR("%.16s: fail to insert entry for reinsert dirent: "
4589 "name = %.*s, rc = %d\n",
4590 LDISKFS_SB(inode->i_sb)->s_es->s_volume_name,
4591 ent->oied_namelen, ent->oied_name, rc);
/*
 * Verify (and, unless in dryrun mode, repair) the FID-in-dirent of the
 * iterator's current entry against the inode's LMA xattr, used by the
 * LFSCK/LUDA_VERIFY path in osd_it_ea_rec().
 *
 * Flow: skip "." (and detect ".."), start a journal handle only once
 * od_dirent_journal is set (first pass runs journal-less and restarts
 * with a journal when a repair turns out to be needed — the restart
 * goto targets are among the lines missing from this extraction),
 * re-find the entry under htree/idx lock, iget the child inode, read
 * its LMA and then:
 *  - sane FID matching LMA: nothing to do;
 *  - mismatch: rewrite FID-in-dirent via osd_dirent_update();
 *  - LMA present but no FID-in-dirent: append via osd_dirent_reinsert();
 *  - no LMA (-ENODATA): trust FID-in-dirent and set LMA, or build an
 *    IGIF and reinsert, marking LUDA_REPAIR / LUDA_UPGRADE accordingly.
 * On stale entries (lookup/iget failure) sets LUDA_IGNORE.
 */
4597 osd_dirent_check_repair(const struct lu_env *env, struct osd_object *obj,
4598 struct osd_it_ea *it, struct lu_fid *fid,
4599 struct osd_inode_id *id, __u32 *attr)
4601 struct osd_thread_info *info = osd_oti_get(env);
4602 struct lustre_mdt_attrs *lma = &info->oti_mdt_attrs;
4603 struct osd_device *dev = osd_obj2dev(obj);
4604 struct super_block *sb = osd_sb(dev);
4605 const char *devname =
4606 LDISKFS_SB(sb)->s_es->s_volume_name;
4607 struct osd_it_ea_dirent *ent = it->oie_dirent;
4608 struct inode *dir = obj->oo_inode;
4609 struct htree_lock *hlock = NULL;
4610 struct buffer_head *bh = NULL;
4611 handle_t *jh = NULL;
4612 struct ldiskfs_dir_entry_2 *de;
4613 struct dentry *dentry;
4614 struct inode *inode;
4618 bool is_dotdot = false;
4621 if (ent->oied_name[0] == '.') {
4622 /* Skip dot entry, even if it has stale FID-in-dirent, because
4623 * we do not use such FID-in-dirent anymore, it is harmless. */
4624 if (ent->oied_namelen == 1)
4627 if (ent->oied_namelen == 2 && ent->oied_name[1] == '.')
4631 dentry = osd_child_dentry_get(env, obj, ent->oied_name,
4634 /* We need to ensure that the name entry is still valid.
4635 * Because it may be removed or renamed by other already.
4637 * The unlink or rename operation will start journal before PDO lock,
4638 * so to avoid deadlock, here we need to start journal handle before
4639 * related PDO lock also. But because we do not know whether there
4640 * will be something to be repaired before PDO lock, we just start
4641 * journal without conditions.
4643 * We may need to remove the name entry firstly, then insert back.
4644 * One credit is for user quota file update.
4645 * One credit is for group quota file update.
4646 * Two credits are for dirty inode. */
4647 credits = osd_dto_credits_noquota[DTO_INDEX_DELETE] +
4648 osd_dto_credits_noquota[DTO_INDEX_INSERT] + 1 + 1 + 2;
/* restart target: second pass re-enters here with journalling on */
4651 if (dev->od_dirent_journal) {
4652 jh = ldiskfs_journal_start_sb(sb, credits);
4655 CERROR("%.16s: fail to start trans for dirent "
4656 "check_repair: credits %d, name %.*s, rc %d\n",
4657 devname, credits, ent->oied_namelen,
4658 ent->oied_name, rc);
4663 if (obj->oo_hl_head != NULL) {
4664 hlock = osd_oti_get(env)->oti_hlock;
4665 ldiskfs_htree_lock(hlock, obj->oo_hl_head, dir,
4668 down_write(&obj->oo_ext_idx_sem);
4671 bh = osd_ldiskfs_find_entry(dir, dentry, &de, hlock);
4672 /* For dotdot entry, if there is not enough space to hold FID-in-dirent,
4673 * just keep it there. It only happens when the device upgraded from 1.8
4674 * or restored from MDT file-level backup. For the whole directory, only
4675 * dotdot entry has no FID-in-dirent and needs to get FID from LMA when
4676 * readdir, it will not affect the performance much. */
4677 if ((bh == NULL) || (le32_to_cpu(de->inode) != ent->oied_ino) ||
4678 (is_dotdot && !osd_dirent_has_space(de->rec_len,
4680 sb->s_blocksize))) {
4681 *attr |= LUDA_IGNORE;
4682 GOTO(out_journal, rc = 0);
4685 osd_id_gen(id, ent->oied_ino, OSD_OII_NOGEN);
4686 inode = osd_iget(info, dev, id);
4687 if (IS_ERR(inode)) {
4688 rc = PTR_ERR(inode);
4689 if (rc == -ENOENT || rc == -ESTALE) {
4690 *attr |= LUDA_IGNORE;
4694 GOTO(out_journal, rc);
4697 rc = osd_get_lma(info, inode, &info->oti_obj_dentry, lma);
4699 if (fid_is_sane(fid)) {
4700 /* FID-in-dirent is valid. */
4701 if (lu_fid_eq(fid, &lma->lma_self_fid))
4702 GOTO(out_inode, rc = 0);
4704 /* Do not repair under dryrun mode. */
4705 if (*attr & LUDA_VERIFY_DRYRUN) {
4706 *attr |= LUDA_REPAIR;
4707 GOTO(out_inode, rc = 0);
/* first pass without journal: unlock, enable journal, restart */
4710 if (!dev->od_dirent_journal) {
4714 ldiskfs_htree_unlock(hlock);
4716 up_write(&obj->oo_ext_idx_sem);
4717 dev->od_dirent_journal = 1;
4721 *fid = lma->lma_self_fid;
4723 /* Update the FID-in-dirent. */
4724 rc = osd_dirent_update(jh, sb, ent, fid, bh, de);
4726 *attr |= LUDA_REPAIR;
4728 /* Do not repair under dryrun mode. */
4729 if (*attr & LUDA_VERIFY_DRYRUN) {
4730 *attr |= LUDA_REPAIR;
4731 GOTO(out_inode, rc = 0);
4734 if (!dev->od_dirent_journal) {
4738 ldiskfs_htree_unlock(hlock);
4740 up_write(&obj->oo_ext_idx_sem);
4741 dev->od_dirent_journal = 1;
4745 *fid = lma->lma_self_fid;
4747 /* Append the FID-in-dirent. */
4748 rc = osd_dirent_reinsert(env, jh, dir, inode, ent,
4749 fid, bh, de, hlock);
4751 *attr |= LUDA_REPAIR;
4753 } else if (rc == -ENODATA) {
4754 /* Do not repair under dryrun mode. */
4755 if (*attr & LUDA_VERIFY_DRYRUN) {
4756 if (fid_is_sane(fid))
4757 *attr |= LUDA_REPAIR;
4759 *attr |= LUDA_UPGRADE;
4760 GOTO(out_inode, rc = 0);
4763 if (!dev->od_dirent_journal) {
4767 ldiskfs_htree_unlock(hlock);
4769 up_write(&obj->oo_ext_idx_sem);
4770 dev->od_dirent_journal = 1;
4775 if (unlikely(fid_is_sane(fid))) {
4776 /* FID-in-dirent exists, but FID-in-LMA is lost.
4777 * Trust the FID-in-dirent, and add FID-in-LMA. */
4778 rc = osd_ea_fid_set(info, inode, fid);
4780 *attr |= LUDA_REPAIR;
4782 lu_igif_build(fid, inode->i_ino, inode->i_generation);
4783 /* It is probably IGIF object. Only aappend the
4784 * FID-in-dirent. OI scrub will process FID-in-LMA. */
4785 rc = osd_dirent_reinsert(env, jh, dir, inode, ent,
4786 fid, bh, de, hlock);
4788 *attr |= LUDA_UPGRADE;
4792 GOTO(out_inode, rc);
/* cleanup: unlock, stop journal, reset od_dirent_journal when clean */
4800 ldiskfs_htree_unlock(hlock);
4802 up_write(&obj->oo_ext_idx_sem);
4804 ldiskfs_journal_stop(jh);
4805 if (rc >= 0 && !dirty)
4806 dev->od_dirent_journal = 0;
/*
 * Produce the lu_dirent for the current position.  With LUDA_VERIFY it
 * runs osd_dirent_check_repair() (LFSCK path); otherwise it resolves a
 * missing FID via osd_ea_fid_get(), packs the dirent, caches the
 * FID->id mapping, and may trigger an OI consistency check while scrub
 * is in progress.
 */
4811  * Returns the value at current position from iterator's in memory structure.
4813  * \param di struct osd_it_ea, iterator's in memory structure
4814  * \param attr attr requested for dirent.
4815  * \param lde lustre dirent
4817  * \retval 0 no error and \param lde has correct lustre dirent.
4818  * \retval -ve on error
4820 static inline int osd_it_ea_rec(const struct lu_env *env,
4821 const struct dt_it *di,
4822 struct dt_rec *dtrec, __u32 attr)
4824 struct osd_it_ea *it = (struct osd_it_ea *)di;
4825 struct osd_object *obj = it->oie_obj;
4826 struct osd_device *dev = osd_obj2dev(obj);
4827 struct osd_scrub *scrub = &dev->od_scrub;
4828 struct scrub_file *sf = &scrub->os_file;
4829 struct osd_thread_info *oti = osd_oti_get(env);
4830 struct osd_inode_id *id = &oti->oti_id;
4831 struct osd_idmap_cache *oic = &oti->oti_cache;
4832 struct lu_fid *fid = &it->oie_dirent->oied_fid;
4833 struct lu_dirent *lde = (struct lu_dirent *)dtrec;
4834 __u32 ino = it->oie_dirent->oied_ino;
4838 if (attr & LUDA_VERIFY) {
/* never export the backend root inode */
4840 if (unlikely(ino == osd_sb(dev)->s_root->d_inode->i_ino)) {
4841 attr |= LUDA_IGNORE;
4846 rc = osd_dirent_check_repair(env, obj, it, fid, id, &attr);
4848 attr &= ~LU_DIRENT_ATTRS_MASK;
4849 if (!fid_is_sane(fid)) {
4850 if (OBD_FAIL_CHECK(OBD_FAIL_FID_LOOKUP))
4853 rc = osd_ea_fid_get(env, obj, ino, fid, id);
4855 osd_id_gen(id, ino, OSD_OII_NOGEN);
4863 osd_it_pack_dirent(lde, fid, it->oie_dirent->oied_off,
4864 it->oie_dirent->oied_name,
4865 it->oie_dirent->oied_namelen,
4866 it->oie_dirent->oied_type, attr);
4868 if (osd_remote_fid(env, dev, fid))
4871 if (likely(!(attr & LUDA_IGNORE))) {
4873 oic->oic_fid = *fid;
/* kick OI scrub consistency check while scrub has not passed ino */
4876 if (!(attr & LUDA_VERIFY) &&
4877 (scrub->os_pos_current <= ino) &&
4878 ((sf->sf_flags & SF_INCONSISTENT) ||
4879 (sf->sf_flags & SF_UPGRADE && fid_is_igif(fid)) ||
4880 ldiskfs_test_bit(osd_oi_fid2idx(dev, fid), sf->sf_oi_bitmap)))
4881 osd_consistency_check(oti, dev, oic);
/* Return the current dirent's readdir offset as the resume cookie. */
4887  * Returns a cookie for current position of the iterator head, so that
4888  * user can use this cookie to load/start the iterator next time.
4890  * \param di iterator's in memory structure
4892  * \retval cookie for current position, on success
4894 static __u64 osd_it_ea_store(const struct lu_env *env, const struct dt_it *di)
4896 struct osd_it_ea *it = (struct osd_it_ea *)di;
4898 return it->oie_dirent->oied_off;
/*
 * Resume iteration from a cookie: seek f_pos to \a hash and refill the
 * read-ahead buffer.
 */
4902  * It calls osd_ldiskfs_it_fill() which will use ->readdir()
4903  * to load a directory entry at a time and stored it i inn,
4904  * in iterator's in-memory data structure.
4906  * \param di struct osd_it_ea, iterator's in memory structure
4908  * \retval +ve on success
4909  * \retval -ve on error
4911 static int osd_it_ea_load(const struct lu_env *env,
4912 const struct dt_it *di, __u64 hash)
4914 struct osd_it_ea *it = (struct osd_it_ea *)di;
4918 it->oie_file.f_pos = hash;
4920 rc = osd_ldiskfs_it_fill(env, di);
/*
 * Look up \a key (a file name) in an EA directory and return its
 * record (FID) via osd_ea_lookup_rec(), after a capa check.
 */
4928  * Index lookup function for interoperability mode (b11826).
4930  * \param key, key i.e. file name to be searched
4932  * \retval +ve, on success
4933  * \retval -ve, on error
4935 static int osd_index_ea_lookup(const struct lu_env *env, struct dt_object *dt,
4936 struct dt_rec *rec, const struct dt_key *key,
4937 struct lustre_capa *capa)
4939 struct osd_object *obj = osd_dt_obj(dt);
4944 LASSERT(S_ISDIR(obj->oo_inode->i_mode));
4945 LINVRNT(osd_invariant(obj));
4947 if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
4950 rc = osd_ea_lookup_rec(env, obj, rec, key);
/*
 * dt_index_operations vector for EA (ldiskfs directory) indexes:
 * lookup/insert/delete plus the osd_it_ea_* iterator.
 */
4957  * Index and Iterator operations for interoperability
4958  * mode (i.e. to run 2.0 mds on 1.8 disk) (b11826)
4960 static const struct dt_index_operations osd_index_ea_ops = {
4961 .dio_lookup = osd_index_ea_lookup,
4962 .dio_declare_insert = osd_index_declare_ea_insert,
4963 .dio_insert = osd_index_ea_insert,
4964 .dio_declare_delete = osd_index_declare_ea_delete,
4965 .dio_delete = osd_index_ea_delete,
4967 .init = osd_it_ea_init,
4968 .fini = osd_it_ea_fini,
4969 .get = osd_it_ea_get,
4970 .put = osd_it_ea_put,
4971 .next = osd_it_ea_next,
4972 .key = osd_it_ea_key,
4973 .key_size = osd_it_ea_key_size,
4974 .rec = osd_it_ea_rec,
4975 .store = osd_it_ea_store,
4976 .load = osd_it_ea_load
/*
 * lu_context_key constructor: allocate the per-thread osd_thread_info,
 * its EA read-ahead buffer and htree lock.  Unwinds partial allocation
 * on failure (the goto labels are among the lines missing from this
 * extraction) and returns ERR_PTR(-ENOMEM).
 */
4980 static void *osd_key_init(const struct lu_context *ctx,
4981 struct lu_context_key *key)
4983 struct osd_thread_info *info;
4985 OBD_ALLOC_PTR(info);
4987 return ERR_PTR(-ENOMEM);
4989 OBD_ALLOC(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
4990 if (info->oti_it_ea_buf == NULL)
4993 info->oti_env = container_of(ctx, struct lu_env, le_ctx);
4995 info->oti_hlock = ldiskfs_htree_lock_alloc();
4996 if (info->oti_hlock == NULL)
/* error unwind: free buffer then info */
5002 OBD_FREE(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
5005 return ERR_PTR(-ENOMEM);
/* lu_context_key destructor: free htree lock, buffer and info. */
5008 static void osd_key_fini(const struct lu_context *ctx,
5009 struct lu_context_key *key, void* data)
5011 struct osd_thread_info *info = data;
5013 if (info->oti_hlock != NULL)
5014 ldiskfs_htree_lock_free(info->oti_hlock);
5015 OBD_FREE(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
/*
 * lu_context_key exit hook: assert no locks/transactions are still
 * held by the departing thread context.
 */
5019 static void osd_key_exit(const struct lu_context *ctx,
5020 struct lu_context_key *key, void *data)
5022 struct osd_thread_info *info = data;
5024 LASSERT(info->oti_r_locks == 0);
5025 LASSERT(info->oti_w_locks == 0);
5026 LASSERT(info->oti_txns == 0);
/*
 * Per-thread context key for the osd module; LU_TYPE_INIT_FINI expands
 * the osd_type_init/osd_type_fini pair bound to osd_key.
 */
5029 /* type constructor/destructor: osd_type_init, osd_type_fini */
5030 LU_TYPE_INIT_FINI(osd, &osd_key);
5032 struct lu_context_key osd_key = {
5033 .lct_tags = LCT_DT_THREAD | LCT_MD_THREAD | LCT_MG_THREAD | LCT_LOCAL,
5034 .lct_init = osd_key_init,
5035 .lct_fini = osd_key_fini,
5036 .lct_exit = osd_key_exit
/*
 * lu_device_operations init: record the service name and register the
 * procfs entries.
 * NOTE(review): strncpy with MAX_OBD_NAME does not guarantee NUL
 * termination if name is exactly MAX_OBD_NAME long — confirm od_svname
 * sizing upstream.
 */
5040 static int osd_device_init(const struct lu_env *env, struct lu_device *d,
5041 const char *name, struct lu_device *next)
5043 struct osd_device *osd = osd_dev(d);
5045 strncpy(osd->od_svname, name, MAX_OBD_NAME);
5046 return osd_procfs_init(osd, name);
/*
 * Shut the device down: stop OI scrub, release fsfilt ops and tear
 * down the quota slave instance, if any.
 */
5049 static int osd_shutdown(const struct lu_env *env, struct osd_device *o)
5053 osd_scrub_cleanup(env, o);
5056 fsfilt_put_ops(o->od_fsops);
5060 /* shutdown quota slave instance associated with the device */
5061 if (o->od_quota_slave != NULL) {
5062 qsd_fini(env, o->od_quota_slave);
5063 o->od_quota_slave = NULL;
/*
 * Mount the backing ldiskfs filesystem for this osd device from the
 * lustre_cfg: parse device/flags/options, build the mount option
 * string in a temporary page (user_xattr,acl + caller opts +
 * no_mbcache), vfs_kern_mount "ldiskfs", then refuse read-only or
 * journal-less devices.  Cleanup labels/returns are among the lines
 * missing from this extraction.
 */
5069 static int osd_mount(const struct lu_env *env,
5070 struct osd_device *o, struct lustre_cfg *cfg)
5072 const char *name = lustre_cfg_string(cfg, 0);
5073 const char *dev = lustre_cfg_string(cfg, 1);
5075 unsigned long page, s_flags, lmd_flags = 0;
5076 struct page *__page;
5077 struct file_system_type *type;
5078 char *options = NULL;
/* already mounted: nothing to do */
5083 if (o->od_mnt != NULL)
5086 if (strlen(dev) >= sizeof(o->od_mntdev))
5088 strcpy(o->od_mntdev, dev);
5090 o->od_fsops = fsfilt_get_ops(mt_str(LDD_MT_LDISKFS));
5091 if (o->od_fsops == NULL) {
5092 CERROR("Can't find fsfilt_ldiskfs\n");
5096 OBD_PAGE_ALLOC(__page, CFS_ALLOC_STD);
5098 GOTO(out, rc = -ENOMEM);
/* cfg[2] is "<s_flags>:<lmd_flags>"; cfg[3] is extra mount options */
5100 str = lustre_cfg_string(cfg, 2);
5101 s_flags = simple_strtoul(str, NULL, 0);
5102 str = strstr(str, ":");
5104 lmd_flags = simple_strtoul(str + 1, NULL, 0);
5105 opts = lustre_cfg_string(cfg, 3);
5106 page = (unsigned long)cfs_page_address(__page);
5107 options = (char *)page;
5110 strcat(options, "user_xattr,acl");
5112 strcat(options, opts);
5114 /* Glom up mount options */
5115 if (*options != '\0')
5116 strcat(options, ",");
5117 strlcat(options, "no_mbcache", CFS_PAGE_SIZE);
5119 type = get_fs_type("ldiskfs");
5121 CERROR("%s: cannot find ldiskfs module\n", name);
5122 GOTO(out, rc = -ENODEV);
5125 o->od_mnt = vfs_kern_mount(type, s_flags, dev, options);
/* get_fs_type() took a module ref; drop it now that mount holds one */
5126 cfs_module_put(type->owner);
5128 if (IS_ERR(o->od_mnt)) {
5129 rc = PTR_ERR(o->od_mnt);
5130 CERROR("%s: can't mount %s: %d\n", name, dev, rc);
5135 if (lvfs_check_rdonly(o->od_mnt->mnt_sb->s_bdev)) {
5136 CERROR("%s: underlying device %s is marked as read-only. "
5137 "Setup failed\n", name, dev);
5140 GOTO(out, rc = -EROFS);
5143 if (!LDISKFS_HAS_COMPAT_FEATURE(o->od_mnt->mnt_sb,
5144 LDISKFS_FEATURE_COMPAT_HAS_JOURNAL)) {
5145 CERROR("%s: device %s is mounted w/o journal\n", name, dev);
5148 GOTO(out, rc = -EINVAL);
5151 ldiskfs_set_inode_state(osd_sb(o)->s_root->d_inode,
5152 LDISKFS_STATE_LUSTRE_NO_OI);
5153 if (lmd_flags & LMD_FLG_NOSCRUB)
5158 OBD_PAGE_FREE(__page);
5160 fsfilt_put_ops(o->od_fsops);
/*
 * Tear down the OSD device: shut down subsystems, release the object
 * map, drop cached dentries referencing the backend superblock, sync,
 * remove the procfs entries and, last, release the ldiskfs mount.
 */
5165 static struct lu_device *osd_device_fini(const struct lu_env *env,
5166 struct lu_device *d)
5171 rc = osd_shutdown(env, osd_dev(d));
5173 osd_obj_map_fini(osd_dev(d));
/* drop cached dentries so the mount below can be released cleanly */
5175 shrink_dcache_sb(osd_sb(osd_dev(d)));
5176 osd_sync(env, lu2dt_dev(d));
5178 rc = osd_procfs_fini(osd_dev(d));
5180 CERROR("proc fini error %d \n", rc);
5181 RETURN (ERR_PTR(rc));
/* finally release the backend mount taken in osd_mount() */
5184 if (osd_dev(d)->od_mnt) {
5185 mntput(osd_dev(d)->od_mnt);
5186 osd_dev(d)->od_mnt = NULL;
/*
 * One-time initialization of a freshly allocated osd_device: wire up
 * the lu/dt operation tables, init locks and the capability hash, set
 * cache defaults, mount the backend, then bring up OI scrub, the
 * object map, the lu_site, procfs and the quota slave.  Failures
 * unwind in reverse order through the out_* labels at the bottom.
 *
 * NOTE(review): this listing is missing intermediate lines (original
 * numbering jumps), including most rc checks after the setup calls.
 */
5192 static int osd_device_init0(const struct lu_env *env,
5193 struct osd_device *o,
5194 struct lustre_cfg *cfg)
5196 struct lu_device *l = osd2lu_dev(o);
5197 struct osd_thread_info *info;
5200 /* if the module was re-loaded, env can loose its keys */
5201 rc = lu_env_refill((struct lu_env *) env);
5204 info = osd_oti_get(env);
5207 l->ld_ops = &osd_lu_ops;
5208 o->od_dt_dev.dd_ops = &osd_dt_ops;
5210 spin_lock_init(&o->od_osfs_lock);
5211 mutex_init(&o->od_otable_mutex);
/* backdate the statfs cache age so the first statfs refreshes it */
5212 o->od_osfs_age = cfs_time_shift_64(-1000);
5214 o->od_capa_hash = init_capa_hash();
5215 if (o->od_capa_hash == NULL)
5216 GOTO(out, rc = -ENOMEM);
/* default caching policy; tunable later via procfs */
5218 o->od_read_cache = 1;
5219 o->od_writethrough_cache = 1;
5220 o->od_readcache_max_filesize = OSD_MAX_CACHE_SIZE;
5222 rc = osd_mount(env, o, cfg);
5226 CFS_INIT_LIST_HEAD(&o->od_ios_list);
5227 /* setup scrub, including OI files initialization */
5228 rc = osd_scrub_setup(env, o);
/* cfg string 4 is the server name; bounded copy -- NOTE(review):
 * NUL-termination relies on od_svname being zeroed at allocation,
 * since strncpy() does not guarantee it -- confirm */
5232 strncpy(o->od_svname, lustre_cfg_string(cfg, 4),
5233 sizeof(o->od_svname) - 1);
5235 rc = osd_obj_map_init(o);
5237 GOTO(out_scrub, rc);
5239 rc = lu_site_init(&o->od_site, l);
5241 GOTO(out_compat, rc);
/* this OSD is the bottom device of its own site */
5242 o->od_site.ls_bottom_dev = l;
5244 rc = lu_site_init_finish(&o->od_site);
5248 rc = osd_procfs_init(o, o->od_svname);
5250 CERROR("%s: can't initialize procfs: rc = %d\n",
/* sanity check: the site must be linked into the global list */
5255 LASSERT(l->ld_site->ls_linkage.next && l->ld_site->ls_linkage.prev);
5257 /* initialize quota slave instance */
5258 o->od_quota_slave = qsd_init(env, o->od_svname, &o->od_dt_dev,
5260 if (IS_ERR(o->od_quota_slave)) {
5261 rc = PTR_ERR(o->od_quota_slave);
5262 o->od_quota_slave = NULL;
5263 GOTO(out_procfs, rc);
/* error unwinding, innermost failure first */
5270 lu_site_fini(&o->od_site);
5272 osd_obj_map_fini(o);
5274 osd_scrub_cleanup(env, o);
5276 osd_oi_fini(info, o);
5277 osd_shutdown(env, o);
5281 cleanup_capa_hash(o->od_capa_hash);
/*
 * ldto_device_alloc: allocate a new osd_device, init its dt_device
 * part and run full initialization.  Returns the embedded lu_device
 * on success, ERR_PTR(-errno) on failure.
 */
5286 static struct lu_device *osd_device_alloc(const struct lu_env *env,
5287 struct lu_device_type *t,
5288 struct lustre_cfg *cfg)
5290 struct osd_device *o;
5295 return ERR_PTR(-ENOMEM);
5297 rc = dt_device_init(&o->od_dt_dev, t);
5299 /* Because the ctx might be revived in dt_device_init,
5300 * refill the env here */
5301 lu_env_refill((struct lu_env *)env);
5302 rc = osd_device_init0(env, o, cfg);
5304 dt_device_fini(&o->od_dt_dev);
/* NOTE(review): the OBD_FREE of o on failure is presumably in the
 * unseen lines around here -- confirm against the full source */
5307 if (unlikely(rc != 0))
5310 return rc == 0 ? osd2lu_dev(o) : ERR_PTR(rc);
/*
 * ldto_device_free: release what osd_device_alloc() set up.  The OSD
 * is temporarily installed as the site's top device so lu_site_purge()
 * can drop the remaining cached object references.
 */
5313 static struct lu_device *osd_device_free(const struct lu_env *env,
5314 struct lu_device *d)
5316 struct osd_device *o = osd_dev(d);
5319 cleanup_capa_hash(o->od_capa_hash);
5320 /* XXX: make osd top device in order to release reference */
5321 d->ld_site->ls_top_dev = d;
5322 lu_site_purge(env, d->ld_site, -1);
/* objects still in the hash after a full purge are leaks: dump them */
5323 if (!cfs_hash_is_empty(d->ld_site->ls_obj_hash)) {
5324 LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_ERROR, NULL);
5325 lu_site_print(env, d->ld_site, &msgdata, lu_cdebug_printer);
5327 lu_site_fini(&o->od_site);
5328 dt_device_fini(&o->od_dt_dev);
/*
 * ldo_process_config: dispatch configuration commands.  The visible
 * branches (re)mount the backend and tear the device down; the case
 * labels themselves are in lines not shown in this listing.
 */
5333 static int osd_process_config(const struct lu_env *env,
5334 struct lu_device *d, struct lustre_cfg *cfg)
5336 struct osd_device *o = osd_dev(d);
5340 switch(cfg->lcfg_command) {
5342 err = osd_mount(env, o, cfg);
/* cleanup branch: unlink from the site, then shut the OSD down */
5345 lu_dev_del_linkage(d->ld_site, d);
5346 err = osd_shutdown(env, o);
/*
 * ldo_recovery_complete: called when recovery finishes.  If a quota
 * slave is configured, start it so it can serve new requests; a NULL
 * slave means quota is not in use and there is nothing to do.
 */
5355 static int osd_recovery_complete(const struct lu_env *env,
5356 struct lu_device *d)
5358 struct osd_device *osd = osd_dev(d);
5362 if (osd->od_quota_slave == NULL)
5365 /* start qsd instance on recovery completion, this notifies the quota
5366 * slave code that we are about to process new requests now */
5367 rc = qsd_start(env, osd->od_quota_slave);
5372 * we use exports to track all osd users
/*
 * obd connect handler: create an export for the caller; the
 * od_connects counter is maintained under od_osfs_lock (the increment
 * itself is in a line not shown in this listing).
 */
5374 static int osd_obd_connect(const struct lu_env *env, struct obd_export **exp,
5375 struct obd_device *obd, struct obd_uuid *cluuid,
5376 struct obd_connect_data *data, void *localdata)
5378 struct osd_device *osd = osd_dev(obd->obd_lu_dev);
5379 struct lustre_handle conn;
5383 CDEBUG(D_CONFIG, "connect #%d\n", osd->od_connects);
5385 rc = class_connect(&conn, obd, cluuid);
5389 *exp = class_conn2export(&conn);
/* connection bookkeeping is done under the osfs spinlock */
5391 spin_lock(&osd->od_osfs_lock);
5393 spin_unlock(&osd->od_osfs_lock);
5399 * once last export (we don't count self-export) disappeared
5400 * osd can be released
5402 static int osd_obd_disconnect(struct obd_export *exp)
5404 struct obd_device *obd = exp->exp_obd;
5405 struct osd_device *osd = osd_dev(obd->obd_lu_dev);
5406 int rc, release = 0;
5409 /* Only disconnect the underlying layers on the final disconnect. */
5410 spin_lock(&osd->od_osfs_lock);
/* NOTE(review): the decrement of od_connects is in a line not shown;
 * "release" is presumably set here when the count reaches zero */
5412 if (osd->od_connects == 0)
5414 spin_unlock(&osd->od_osfs_lock);
5416 rc = class_disconnect(exp); /* bz 9811 */
/* last real export gone: trigger manual cleanup of the obd device */
5418 if (rc == 0 && release)
5419 class_manual_cleanup(obd);
/*
 * ldo_prepare: post-initialization hook.  On an MD (MDT/MDD) stack,
 * create the legacy local objects; then let the quota slave prepare
 * its own objects if quota is configured.
 */
5423 static int osd_prepare(const struct lu_env *env, struct lu_device *pdev,
5424 struct lu_device *dev)
5426 struct osd_device *osd = osd_dev(dev);
5430 if (dev->ld_site && lu_device_is_md(dev->ld_site->ls_top_dev)) {
5431 /* MDT/MDD still use old infrastructure to create
5433 result = llo_local_objects_setup(env, lu2md_dev(pdev),
5439 if (osd->od_quota_slave != NULL)
5440 /* set up quota slave objects */
5441 result = qsd_prepare(env, osd->od_quota_slave);
/* per-object lifecycle callbacks for osd objects */
5446 static const struct lu_object_operations osd_lu_obj_ops = {
5447 .loo_object_init = osd_object_init,
5448 .loo_object_delete = osd_object_delete,
5449 .loo_object_release = osd_object_release,
5450 .loo_object_free = osd_object_free,
5451 .loo_object_print = osd_object_print,
5452 .loo_object_invariant = osd_object_invariant
/* device-level operations wired into the lu framework (non-static:
 * referenced by other osd files) */
5455 const struct lu_device_operations osd_lu_ops = {
5456 .ldo_object_alloc = osd_object_alloc,
5457 .ldo_process_config = osd_process_config,
5458 .ldo_recovery_complete = osd_recovery_complete,
5459 .ldo_prepare = osd_prepare,
/* device-type lifecycle callbacks: type init/start/stop plus device
 * alloc/free/init/fini */
5462 static const struct lu_device_type_operations osd_device_type_ops = {
5463 .ldto_init = osd_type_init,
5464 .ldto_fini = osd_type_fini,
5466 .ldto_start = osd_type_start,
5467 .ldto_stop = osd_type_stop,
5469 .ldto_device_alloc = osd_device_alloc,
5470 .ldto_device_free = osd_device_free,
5472 .ldto_device_init = osd_device_init,
5473 .ldto_device_fini = osd_device_fini
/* the ldiskfs OSD device type registered with the lu framework */
5476 struct lu_device_type osd_device_type = {
5477 .ldt_tags = LU_DEVICE_DT,
5478 .ldt_name = LUSTRE_OSD_LDISKFS_NAME,
5479 .ldt_ops = &osd_device_type_ops,
5480 .ldt_ctx_tags = LCT_LOCAL,
5484 * lprocfs legacy support.
/* obd-layer operations: only connect/disconnect are implemented */
5486 static struct obd_ops osd_obd_device_ops = {
5487 .o_owner = THIS_MODULE,
5488 .o_connect = osd_obd_connect,
5489 .o_disconnect = osd_obd_disconnect
/* module entry: register the ldiskfs OSD obd type along with its
 * procfs variables */
5492 static int __init osd_mod_init(void)
5494 struct lprocfs_static_vars lvars;
5497 lprocfs_osd_init_vars(&lvars);
5498 return class_register_type(&osd_obd_device_ops, NULL, lvars.module_vars,
5499 LUSTRE_OSD_LDISKFS_NAME, &osd_device_type);
/* module exit: unregister the obd type registered in osd_mod_init() */
5502 static void __exit osd_mod_exit(void)
5504 class_unregister_type(LUSTRE_OSD_LDISKFS_NAME);
/* standard kernel module metadata */
5507 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
5508 MODULE_DESCRIPTION("Lustre Object Storage Device ("LUSTRE_OSD_LDISKFS_NAME")");
5509 MODULE_LICENSE("GPL");
/* libcfs wrapper around module_init()/module_exit() */
5511 cfs_module(osd, "0.1.0", osd_mod_init, osd_mod_exit);