1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
32 * Copyright (c) 2011, 2012, Whamcloud, Inc.
35 * This file is part of Lustre, http://www.lustre.org/
36 * Lustre is a trademark of Sun Microsystems, Inc.
38 * lustre/osd/osd_handler.c
40 * Top-level entry points into osd module
42 * Author: Nikita Danilov <nikita@clusterfs.com>
43 * Pravin Shelar <pravin.shelar@sun.com> : Added fid in dirent
47 # define EXPORT_SYMTAB
49 #define DEBUG_SUBSYSTEM S_MDS
51 #include <linux/module.h>
53 /* LUSTRE_VERSION_CODE */
54 #include <lustre_ver.h>
55 /* prerequisite for linux/xattr.h */
56 #include <linux/types.h>
57 /* prerequisite for linux/xattr.h */
59 /* XATTR_{REPLACE,CREATE} */
60 #include <linux/xattr.h>
65 * struct OBD_{ALLOC,FREE}*()
68 #include <obd_support.h>
69 /* struct ptlrpc_thread */
70 #include <lustre_net.h>
73 #include <lustre_fid.h>
75 #include "osd_internal.h"
78 /* llo_* api support */
79 #include <md_object.h>
81 #ifdef HAVE_LDISKFS_PDO
83 CFS_MODULE_PARM(ldiskfs_pdo, "i", int, 0644,
84 "ldiskfs with parallel directory operations");
/* Well-known directory entry names used throughout the OSD. */
89 static const char dot[] = "."; /* self entry of a directory */
90 static const char dotdot[] = ".."; /* parent entry of a directory */
91 static const char remote_obj_dir[] = "REM_OBJ_DIR"; /* NOTE(review): presumably the container for remotely-owned objects — confirm against users */
93 struct osd_directory {
94 struct iam_container od_container;
95 struct iam_descr od_descr;
99 struct dt_object oo_dt;
101 * Inode for file system object represented by this osd_object. This
102 * inode is pinned for the whole duration of lu_object life.
104 * Not modified concurrently (either setup early during object
105 * creation, or assigned by osd_object_create() under write lock).
107 struct inode *oo_inode;
109 * to protect index ops.
111 struct htree_lock_head *oo_hl_head;
112 cfs_rw_semaphore_t oo_ext_idx_sem;
113 cfs_rw_semaphore_t oo_sem;
114 struct osd_directory *oo_dir;
115 /** protects inode attributes. */
116 cfs_spinlock_t oo_guard;
118 * Following two members are used to indicate the presence of dot and
119 * dotdot in the given directory. This is required for interop mode
122 int oo_compat_dot_created;
123 int oo_compat_dotdot_created;
125 const struct lu_env *oo_owner;
126 #ifdef CONFIG_LOCKDEP
127 struct lockdep_map oo_dep_map;
131 static const struct lu_object_operations osd_lu_obj_ops;
132 static const struct lu_device_operations osd_lu_ops;
133 static struct lu_context_key osd_key;
134 static const struct dt_object_operations osd_obj_ops;
135 static const struct dt_object_operations osd_obj_ea_ops;
136 static const struct dt_body_operations osd_body_ops;
137 static const struct dt_body_operations osd_body_ops_new;
138 static const struct dt_index_operations osd_index_iam_ops;
139 static const struct dt_index_operations osd_index_ea_ops;
141 #define OSD_TRACK_DECLARES
142 #ifdef OSD_TRACK_DECLARES
143 #define OSD_DECLARE_OP(oh, op) { \
144 LASSERT(oh->ot_handle == NULL); \
145 ((oh)->ot_declare_ ##op)++; }
146 #define OSD_EXEC_OP(handle, op) { \
147 struct osd_thandle *oh; \
148 oh = container_of0(handle, struct osd_thandle, ot_super);\
149 if (((oh)->ot_declare_ ##op) > 0) { \
150 ((oh)->ot_declare_ ##op)--; \
154 #define OSD_DECLARE_OP(oh, op)
155 #define OSD_EXEC_OP(oh, op)
158 /* There are at most 10 uid/gids are affected in a transaction, and
159 * that's rename case:
160 * - 2 for source parent uid & gid;
161 * - 2 for source child uid & gid ('..' entry update when the child
163 * - 2 for target parent uid & gid;
164 * - 2 for target child uid & gid (if the target child exists);
165 * - 2 for root uid & gid (last_rcvd, llog, etc);
167 * The 0 to (OSD_MAX_UGID_CNT - 1) bits of ot_id_type is for indicating
168 * the id type of each id in the ot_id_array.
170 #define OSD_MAX_UGID_CNT 10
173 struct thandle ot_super;
175 struct journal_callback ot_jcb;
176 cfs_list_t ot_dcb_list;
177 /* Link to the device, for debugging. */
178 struct lu_ref_link *ot_dev_link;
179 unsigned short ot_credits;
180 unsigned short ot_id_cnt;
181 unsigned short ot_id_type;
182 uid_t ot_id_array[OSD_MAX_UGID_CNT];
184 #ifdef OSD_TRACK_DECLARES
185 unsigned char ot_declare_attr_set;
186 unsigned char ot_declare_punch;
187 unsigned char ot_declare_xattr_set;
188 unsigned char ot_declare_create;
189 unsigned char ot_declare_destroy;
190 unsigned char ot_declare_ref_add;
191 unsigned char ot_declare_ref_del;
192 unsigned char ot_declare_write;
193 unsigned char ot_declare_insert;
194 unsigned char ot_declare_delete;
197 #if OSD_THANDLE_STATS
198 /** time when this handle was allocated */
199 cfs_time_t oth_alloced;
201 /** time when this thandle was started */
202 cfs_time_t oth_started;
207 * Basic transaction credit op
217 DTO_LOG_REC, /**< XXX temporary: dt layer knows nothing about llog. */
228 static int lu_device_is_osd(const struct lu_device *d)
230 return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &osd_lu_ops);
233 static struct osd_device *osd_dt_dev(const struct dt_device *d)
235 LASSERT(lu_device_is_osd(&d->dd_lu_dev));
236 return container_of0(d, struct osd_device, od_dt_dev);
239 static struct osd_device *osd_dev(const struct lu_device *d)
241 LASSERT(lu_device_is_osd(d));
242 return osd_dt_dev(container_of0(d, struct dt_device, dd_lu_dev));
245 static struct osd_device *osd_obj2dev(const struct osd_object *o)
247 return osd_dev(o->oo_dt.do_lu.lo_dev);
250 static struct super_block *osd_sb(const struct osd_device *dev)
252 return dev->od_mount->lmi_mnt->mnt_sb;
255 static int osd_object_is_root(const struct osd_object *obj)
257 return osd_sb(osd_obj2dev(obj))->s_root->d_inode == obj->oo_inode;
260 static struct osd_object *osd_obj(const struct lu_object *o)
262 LASSERT(lu_device_is_osd(o->lo_dev));
263 return container_of0(o, struct osd_object, oo_dt.do_lu);
266 static struct osd_object *osd_dt_obj(const struct dt_object *d)
268 return osd_obj(&d->do_lu);
271 static struct lu_device *osd2lu_dev(struct osd_device *osd)
273 return &osd->od_dt_dev.dd_lu_dev;
276 static journal_t *osd_journal(const struct osd_device *dev)
278 return LDISKFS_SB(osd_sb(dev))->s_journal;
281 static int osd_has_index(const struct osd_object *obj)
283 return obj->oo_dt.do_index_ops != NULL;
/* lu_object-level wrapper around the osd_object invariant check. */
static int osd_object_invariant(const struct lu_object *lo)
{
        return osd_invariant(osd_obj(lo));
}
291 #ifdef HAVE_QUOTA_SUPPORT
293 osd_push_ctxt(const struct lu_env *env, struct osd_ctxt *save)
295 struct md_ucred *uc = md_ucred(env);
300 save->oc_uid = current_fsuid();
301 save->oc_gid = current_fsgid();
302 save->oc_cap = current_cap();
303 if ((tc = prepare_creds())) {
304 tc->fsuid = uc->mu_fsuid;
305 tc->fsgid = uc->mu_fsgid;
308 /* XXX not suboptimal */
309 cfs_curproc_cap_unpack(uc->mu_cap);
313 osd_pop_ctxt(struct osd_ctxt *save)
317 if ((tc = prepare_creds())) {
318 tc->fsuid = save->oc_uid;
319 tc->fsgid = save->oc_gid;
320 tc->cap_effective = save->oc_cap;
326 static inline struct osd_thread_info *osd_oti_get(const struct lu_env *env)
328 return lu_context_key_get(&env->le_ctx, &osd_key);
332 * Concurrency: doesn't matter
334 static int osd_read_locked(const struct lu_env *env, struct osd_object *o)
336 return osd_oti_get(env)->oti_r_locks > 0;
340 * Concurrency: doesn't matter
342 static int osd_write_locked(const struct lu_env *env, struct osd_object *o)
344 struct osd_thread_info *oti = osd_oti_get(env);
345 return oti->oti_w_locks > 0 && o->oo_owner == env;
349 * Concurrency: doesn't access mutable data
351 static int osd_root_get(const struct lu_env *env,
352 struct dt_device *dev, struct lu_fid *f)
356 inode = osd_sb(osd_dt_dev(dev))->s_root->d_inode;
357 LU_IGIF_BUILD(f, inode->i_ino, inode->i_generation);
361 static inline int osd_qid_type(struct osd_thandle *oh, int i)
363 return (oh->ot_id_type & (1 << i)) ? GRPQUOTA : USRQUOTA;
366 static inline void osd_qid_set_type(struct osd_thandle *oh, int i, int type)
368 oh->ot_id_type |= ((type == GRPQUOTA) ? (1 << i) : 0);
/*
 * Declare quota credits for the (type, id) pair touched by the
 * transaction @oh.  Each distinct id is remembered once in
 * oh->ot_id_array (its quota type in the ot_id_type bitmask); the
 * credit charge is 1 when the id's quota-file entry already exists
 * (or id is root), otherwise LDISKFS_QUOTA_INIT_BLOCKS.
 *
 * NOTE(review): several original source lines are missing from this
 * excerpt (closing braces, early returns, the line presumably setting
 * allocated = 1); comments below describe only the visible code.
 */
371 static void osd_declare_qid(struct dt_object *dt, struct osd_thandle *oh,
372 int type, uid_t id, struct inode *inode)
375 int i, allocated = 0;
376 struct osd_object *obj;
380 LASSERTF(oh->ot_id_cnt <= OSD_MAX_UGID_CNT, "count=%u",
383 /* id entry is allocated in the quota file */
/* dq_off != 0 => the dquot entry is already present on disk */
384 if (inode && inode->i_dquot[type] && inode->i_dquot[type]->dq_off)
/* scan ids already declared in this transaction; stop at a match */
387 for (i = 0; i < oh->ot_id_cnt; i++) {
388 if (oh->ot_id_array[i] == id && osd_qid_type(oh, i) == type)
/* array full: more distinct ids than a transaction should touch */
392 if (unlikely(i >= OSD_MAX_UGID_CNT)) {
393 CERROR("more than %d uid/gids for a transaction?\n", i);
/* remember the new id and its quota type */
397 oh->ot_id_array[i] = id;
398 osd_qid_set_type(oh, i, type);
400 obj = osd_dt_obj(dt);
/* cheap when the quota entry exists or id is root; expensive when the
 * entry may have to be initialized in the quota file */
401 oh->ot_credits += (allocated || id == 0) ?
402 1 : LDISKFS_QUOTA_INIT_BLOCKS(osd_sb(osd_obj2dev(obj)));
407 * OSD object methods.
411 * Concurrency: no concurrent access is possible that early in object
414 static struct lu_object *osd_object_alloc(const struct lu_env *env,
415 const struct lu_object_header *hdr,
418 struct osd_object *mo;
424 l = &mo->oo_dt.do_lu;
425 dt_object_init(&mo->oo_dt, NULL, d);
426 if (osd_dev(d)->od_iop_mode)
427 mo->oo_dt.do_ops = &osd_obj_ea_ops;
429 mo->oo_dt.do_ops = &osd_obj_ops;
431 l->lo_ops = &osd_lu_obj_ops;
432 cfs_init_rwsem(&mo->oo_sem);
433 cfs_init_rwsem(&mo->oo_ext_idx_sem);
434 cfs_spin_lock_init(&mo->oo_guard);
442 * retrieve object from backend ext fs.
/*
 * osd_iget() - read an inode from the backing ldiskfs filesystem by
 * inode number (and, when given, generation).
 *
 * Returns the inode on success, or ERR_PTR():
 *   -EACCES  when no inode could be obtained,
 *   -ESTALE  on generation mismatch, or when the inode was unlinked
 *            concurrently (i_nlink == 0),
 *   -ENOENT  when the inode is marked bad.
 * On success the inode is flagged S_NOCMTIME so ldiskfs does not
 * update c/mtime behind the OSD's back.
 *
 * NOTE(review): this excerpt is missing several original lines
 * (#else/#endif arms, iput() calls in the error branches, the final
 * return); comments describe only the visible code.
 */
444 static struct inode *osd_iget(struct osd_thread_info *info,
445 struct osd_device *dev,
446 const struct osd_inode_id *id)
448 struct inode *inode = NULL;
450 #ifdef HAVE_EXT4_LDISKFS
451 inode = ldiskfs_iget(osd_sb(dev), id->oii_ino);
453 /* Newer kernels return an error instead of a NULL pointer */
456 inode = iget(osd_sb(dev), id->oii_ino);
459 CERROR("no inode\n");
460 inode = ERR_PTR(-EACCES);
/* caller supplied a generation: it must match the on-disk inode */
461 } else if (id->oii_gen != OSD_OII_NOGEN &&
462 inode->i_generation != id->oii_gen) {
464 inode = ERR_PTR(-ESTALE);
465 } else if (inode->i_nlink == 0) {
466 /* due to parallel readdir and unlink,
467 * we can have dead inode here. */
468 CWARN("stale inode\n");
/* poison so future lookups see it as bad as well */
469 make_bad_inode(inode);
471 inode = ERR_PTR(-ESTALE);
472 } else if (is_bad_inode(inode)) {
473 CERROR("bad inode %lx\n",inode->i_ino);
475 inode = ERR_PTR(-ENOENT);
477 /* Do not update file c/mtime in ldiskfs.
478 * NB: we don't have any lock to protect this because we don't
479 * have reference on osd_object now, but contention with
480 * another lookup + attr_set can't happen in the tiny window
481 * between if (...) and set S_NOCMTIME. */
482 if (!(inode->i_flags & S_NOCMTIME))
483 inode->i_flags |= S_NOCMTIME;
/*
 * osd_fid_lookup() - resolve @fid to an inode through the object index
 * and attach the inode to @obj.
 *
 * A fid absent from the OI is not an error: the object stays inode-less
 * and lu_object_exists() will report false (used for locking anchors /
 * placeholders).  For directories (with parallel directory ops enabled)
 * an htree lock head is also allocated.
 *
 * NOTE(review): this excerpt is missing several original lines (ENTRY/
 * RETURN, GOTO targets, cleanup on htree alloc failure including the
 * presumed iput(); error paths); comments describe only the visible
 * code.
 */
488 static int osd_fid_lookup(const struct lu_env *env,
489 struct osd_object *obj, const struct lu_fid *fid)
491 struct osd_thread_info *info;
492 struct lu_device *ldev = obj->oo_dt.do_lu.lo_dev;
493 struct osd_device *dev;
494 struct osd_inode_id *id;
498 LINVRNT(osd_invariant(obj));
499 LASSERT(obj->oo_inode == NULL);
500 LASSERTF(fid_is_sane(fid) || osd_fid_is_root(fid), DFID, PFID(fid));
502 * This assertion checks that osd layer sees only local
503 * fids. Unfortunately it is somewhat expensive (does a
504 * cache-lookup). Disabling it for production/acceptance-testing.
506 LASSERT(1 || fid_is_local(env, ldev->ld_site, fid));
510 info = osd_oti_get(env);
/* fault-injection hook for sanity tests */
514 if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT))
/* fid -> (ino, gen) via the object index */
517 result = osd_oi_lookup(info, osd_fid2oi(dev, fid), fid, id);
519 if (result == -ENOENT)
524 inode = osd_iget(info, dev, id);
527 * If fid wasn't found in oi, inode-less object is
528 * created, for which lu_object_exists() returns
529 * false. This is used in a (frequent) case when
530 * objects are created as locking anchors or
531 * place holders for objects yet to be created.
533 result = PTR_ERR(inode);
537 obj->oo_inode = inode;
538 LASSERT(obj->oo_inode->i_sb == osd_sb(dev));
/* in iop mode "." / ".." already exist on disk */
539 if (dev->od_iop_mode) {
540 obj->oo_compat_dot_created = 1;
541 obj->oo_compat_dotdot_created = 1;
/* htree locking is only needed for directories with pdo enabled */
544 if (!S_ISDIR(inode->i_mode) || !ldiskfs_pdo) /* done */
547 LASSERT(obj->oo_hl_head == NULL);
548 obj->oo_hl_head = ldiskfs_htree_lock_head_alloc(HTREE_HBITS_DEF);
549 if (obj->oo_hl_head == NULL) {
/* NOTE(review): the missing lines presumably drop the inode here */
550 obj->oo_inode = NULL;
555 LINVRNT(osd_invariant(obj));
561 * Concurrency: shouldn't matter.
563 static void osd_object_init0(struct osd_object *obj)
565 LASSERT(obj->oo_inode != NULL);
566 obj->oo_dt.do_body_ops = &osd_body_ops;
567 obj->oo_dt.do_lu.lo_header->loh_attr |=
568 (LOHA_EXISTS | (obj->oo_inode->i_mode & S_IFMT));
572 * Concurrency: no concurrent access is possible that early in object
575 static int osd_object_init(const struct lu_env *env, struct lu_object *l,
576 const struct lu_object_conf *unused)
578 struct osd_object *obj = osd_obj(l);
581 LINVRNT(osd_invariant(obj));
583 result = osd_fid_lookup(env, obj, lu_object_fid(l));
584 obj->oo_dt.do_body_ops = &osd_body_ops_new;
586 if (obj->oo_inode != NULL)
587 osd_object_init0(obj);
589 LINVRNT(osd_invariant(obj));
594 * Concurrency: no concurrent access is possible that late in object
597 static void osd_object_free(const struct lu_env *env, struct lu_object *l)
599 struct osd_object *obj = osd_obj(l);
601 LINVRNT(osd_invariant(obj));
603 dt_object_fini(&obj->oo_dt);
604 if (obj->oo_hl_head != NULL)
605 ldiskfs_htree_lock_head_free(obj->oo_hl_head);
612 static struct iam_path_descr *osd_it_ipd_get(const struct lu_env *env,
613 const struct iam_container *bag)
615 return bag->ic_descr->id_ops->id_ipd_alloc(bag,
616 osd_oti_get(env)->oti_it_ipd);
619 static struct iam_path_descr *osd_idx_ipd_get(const struct lu_env *env,
620 const struct iam_container *bag)
622 return bag->ic_descr->id_ops->id_ipd_alloc(bag,
623 osd_oti_get(env)->oti_idx_ipd);
626 static void osd_ipd_put(const struct lu_env *env,
627 const struct iam_container *bag,
628 struct iam_path_descr *ipd)
630 bag->ic_descr->id_ops->id_ipd_free(ipd);
634 * Concurrency: no concurrent access is possible that late in object
637 static void osd_index_fini(struct osd_object *o)
639 struct iam_container *bag;
641 if (o->oo_dir != NULL) {
642 bag = &o->oo_dir->od_container;
643 if (o->oo_inode != NULL) {
644 if (bag->ic_object == o->oo_inode)
645 iam_container_fini(bag);
647 OBD_FREE_PTR(o->oo_dir);
653 * Concurrency: no concurrent access is possible that late in object
654 * life-cycle (for all existing callers, that is. New callers have to provide
655 * their own locking.)
657 static int osd_inode_unlinked(const struct inode *inode)
659 return inode->i_nlink == 0;
663 OSD_TXN_OI_DELETE_CREDITS = 20,
664 OSD_TXN_INODE_DELETE_CREDITS = 20
671 #if OSD_THANDLE_STATS
673 * Set time when the handle is allocated
675 static void osd_th_alloced(struct osd_thandle *oth)
677 oth->oth_alloced = cfs_time_current();
681 * Set time when the handle started
683 static void osd_th_started(struct osd_thandle *oth)
685 oth->oth_started = cfs_time_current();
689 * Helper function to convert time interval to microseconds packed in
690 * long int (default time units for the counter in "stats" initialized
691 * by lu_time_init() )
693 static long interval_to_usec(cfs_time_t start, cfs_time_t end)
697 cfs_duration_usec(cfs_time_sub(end, start), &val);
698 return val.tv_sec * 1000000 + val.tv_usec;
702 * Check whether the we deal with this handle for too long.
704 static void __osd_th_check_slow(void *oth, struct osd_device *dev,
705 cfs_time_t alloced, cfs_time_t started,
708 cfs_time_t now = cfs_time_current();
710 LASSERT(dev != NULL);
712 lprocfs_counter_add(dev->od_stats, LPROC_OSD_THANDLE_STARTING,
713 interval_to_usec(alloced, started));
714 lprocfs_counter_add(dev->od_stats, LPROC_OSD_THANDLE_OPEN,
715 interval_to_usec(started, closed));
716 lprocfs_counter_add(dev->od_stats, LPROC_OSD_THANDLE_CLOSING,
717 interval_to_usec(closed, now));
719 if (cfs_time_before(cfs_time_add(alloced, cfs_time_seconds(30)), now)) {
720 CWARN("transaction handle %p was open for too long: "
722 "alloced "CFS_TIME_T" ,"
723 "started "CFS_TIME_T" ,"
724 "closed "CFS_TIME_T"\n",
725 oth, now, alloced, started, closed);
726 libcfs_debug_dumpstack(NULL);
730 #define OSD_CHECK_SLOW_TH(oth, dev, expr) \
732 cfs_time_t __closed = cfs_time_current(); \
733 cfs_time_t __alloced = oth->oth_alloced; \
734 cfs_time_t __started = oth->oth_started; \
737 __osd_th_check_slow(oth, dev, __alloced, __started, __closed); \
740 #else /* OSD_THANDLE_STATS */
742 #define osd_th_alloced(h) do {} while(0)
743 #define osd_th_started(h) do {} while(0)
744 #define OSD_CHECK_SLOW_TH(oth, dev, expr) expr
746 #endif /* OSD_THANDLE_STATS */
749 * Concurrency: doesn't access mutable data.
751 static int osd_param_is_sane(const struct osd_device *dev,
752 const struct thandle *th)
754 struct osd_thandle *oh;
755 oh = container_of0(th, struct osd_thandle, ot_super);
756 return oh->ot_credits <= osd_journal(dev)->j_max_transaction_buffers;
760 * Concurrency: shouldn't matter.
762 #ifdef HAVE_LDISKFS_JOURNAL_CALLBACK_ADD
763 static void osd_trans_commit_cb(struct super_block *sb,
764 struct journal_callback *jcb, int error)
766 static void osd_trans_commit_cb(struct journal_callback *jcb, int error)
769 struct osd_thandle *oh = container_of0(jcb, struct osd_thandle, ot_jcb);
770 struct thandle *th = &oh->ot_super;
771 struct lu_device *lud = &th->th_dev->dd_lu_dev;
772 struct dt_txn_commit_cb *dcb, *tmp;
774 LASSERT(oh->ot_handle == NULL);
777 CERROR("transaction @0x%p commit error: %d\n", th, error);
779 dt_txn_hook_commit(th);
781 /* call per-transaction callbacks if any */
782 cfs_list_for_each_entry_safe(dcb, tmp, &oh->ot_dcb_list, dcb_linkage)
783 dcb->dcb_func(NULL, th, dcb, error);
785 lu_ref_del_at(&lud->ld_reference, oh->ot_dev_link, "osd-tx", th);
789 lu_context_exit(&th->th_ctx);
790 lu_context_fini(&th->th_ctx);
794 static struct thandle *osd_trans_create(const struct lu_env *env,
797 struct osd_thread_info *oti = osd_oti_get(env);
798 struct osd_thandle *oh;
802 th = ERR_PTR(-ENOMEM);
803 OBD_ALLOC_GFP(oh, sizeof *oh, CFS_ALLOC_IO);
808 th->th_tags = LCT_TX_HANDLE;
810 oti->oti_dev = osd_dt_dev(d);
811 CFS_INIT_LIST_HEAD(&oh->ot_dcb_list);
818 * Concurrency: shouldn't matter.
820 int osd_trans_start(const struct lu_env *env, struct dt_device *d,
823 struct osd_thread_info *oti = osd_oti_get(env);
824 struct osd_device *dev = osd_dt_dev(d);
826 struct osd_thandle *oh;
831 LASSERT(current->journal_info == NULL);
833 oh = container_of0(th, struct osd_thandle, ot_super);
835 LASSERT(oh->ot_handle == NULL);
837 rc = dt_txn_hook_start(env, d, th);
841 if (!osd_param_is_sane(dev, th)) {
842 CWARN("%s: too many transaction credits (%d > %d)\n",
843 d->dd_lu_dev.ld_obd->obd_name, oh->ot_credits,
844 osd_journal(dev)->j_max_transaction_buffers);
845 /* XXX Limit the credits to 'max_transaction_buffers', and
846 * let the underlying filesystem to catch the error if
847 * we really need so many credits.
849 * This should be removed when we can calculate the
850 * credits precisely. */
851 oh->ot_credits = osd_journal(dev)->j_max_transaction_buffers;
852 #ifdef OSD_TRACK_DECLARES
853 CERROR(" attr_set: %d, punch: %d, xattr_set: %d,\n",
854 oh->ot_declare_attr_set, oh->ot_declare_punch,
855 oh->ot_declare_xattr_set);
856 CERROR(" create: %d, ref_add: %d, ref_del: %d, write: %d\n",
857 oh->ot_declare_create, oh->ot_declare_ref_add,
858 oh->ot_declare_ref_del, oh->ot_declare_write);
859 CERROR(" insert: %d, delete: %d, destroy: %d\n",
860 oh->ot_declare_insert, oh->ot_declare_delete,
861 oh->ot_declare_destroy);
866 * XXX temporary stuff. Some abstraction layer should
869 jh = ldiskfs_journal_start_sb(osd_sb(dev), oh->ot_credits);
873 LASSERT(oti->oti_txns == 0);
874 lu_context_init(&th->th_ctx, th->th_tags);
875 lu_context_enter(&th->th_ctx);
877 lu_device_get(&d->dd_lu_dev);
878 oh->ot_dev_link = lu_ref_add(&d->dd_lu_dev.ld_reference,
882 * XXX: current rule is that we first start tx,
883 * then lock object(s), but we can't use
884 * this rule for data (due to locking specifics
885 * in ldiskfs). also in long-term we'd like to
886 * use usually-used (locks;tx) ordering. so,
887 * UGLY thing is that we'll use one ordering for
888 * data (ofd) and reverse ordering for metadata
889 * (mdd). then at some point we'll fix the latter
891 if (lu_device_is_md(&d->dd_lu_dev)) {
892 LASSERT(oti->oti_r_locks == 0);
893 LASSERT(oti->oti_w_locks == 0);
906 * Concurrency: shouldn't matter.
908 static int osd_trans_stop(const struct lu_env *env, struct thandle *th)
911 struct osd_thandle *oh;
912 struct osd_thread_info *oti = osd_oti_get(env);
916 oh = container_of0(th, struct osd_thandle, ot_super);
918 if (oh->ot_handle != NULL) {
919 handle_t *hdl = oh->ot_handle;
921 hdl->h_sync = th->th_sync;
924 * add commit callback
925 * notice we don't do this in osd_trans_start()
926 * as underlying transaction can change during truncate
928 osd_journal_callback_set(hdl, osd_trans_commit_cb,
931 LASSERT(oti->oti_txns == 1);
934 * XXX: current rule is that we first start tx,
935 * then lock object(s), but we can't use
936 * this rule for data (due to locking specifics
937 * in ldiskfs). also in long-term we'd like to
938 * use usually-used (locks;tx) ordering. so,
939 * UGLY thing is that we'll use one ordering for
940 * data (ofd) and reverse ordering for metadata
941 * (mdd). then at some point we'll fix the latter
943 if (lu_device_is_md(&th->th_dev->dd_lu_dev)) {
944 LASSERT(oti->oti_r_locks == 0);
945 LASSERT(oti->oti_w_locks == 0);
947 rc = dt_txn_hook_stop(env, th);
949 CERROR("Failure in transaction hook: %d\n", rc);
950 oh->ot_handle = NULL;
951 OSD_CHECK_SLOW_TH(oh, oti->oti_dev,
952 rc = ldiskfs_journal_stop(hdl));
954 CERROR("Failure to stop transaction: %d\n", rc);
962 static int osd_trans_cb_add(struct thandle *th, struct dt_txn_commit_cb *dcb)
964 struct osd_thandle *oh = container_of0(th, struct osd_thandle,
967 cfs_list_add(&dcb->dcb_linkage, &oh->ot_dcb_list);
973 * Called just before object is freed. Releases all resources except for
974 * object itself (that is released by osd_object_free()).
976 * Concurrency: no concurrent access is possible that late in object
979 static void osd_object_delete(const struct lu_env *env, struct lu_object *l)
981 struct osd_object *obj = osd_obj(l);
982 struct inode *inode = obj->oo_inode;
984 LINVRNT(osd_invariant(obj));
987 * If object is unlinked remove fid->ino mapping from object index.
993 obj->oo_inode = NULL;
998 * Concurrency: ->loo_object_release() is called under site spin-lock.
1000 static void osd_object_release(const struct lu_env *env,
1001 struct lu_object *l)
1006 * Concurrency: shouldn't matter.
1008 static int osd_object_print(const struct lu_env *env, void *cookie,
1009 lu_printer_t p, const struct lu_object *l)
1011 struct osd_object *o = osd_obj(l);
1012 struct iam_descr *d;
1014 if (o->oo_dir != NULL)
1015 d = o->oo_dir->od_container.ic_descr;
1018 return (*p)(env, cookie, LUSTRE_OSD_NAME"-object@%p(i:%p:%lu/%u)[%s]",
1020 o->oo_inode ? o->oo_inode->i_ino : 0UL,
1021 o->oo_inode ? o->oo_inode->i_generation : 0,
1022 d ? d->id_ops->id_name : "plain");
1026 * Concurrency: shouldn't matter.
/*
 * osd_statfs() - report filesystem statistics for the OSD device.
 * Results are cached for one second under od_osfs_lock to avoid
 * hammering the underlying statfs on every call.
 *
 * NOTE(review): this excerpt is missing a few original lines (the
 * 'result' declaration, closing braces, final return); comments
 * describe only the visible code.
 */
1028 int osd_statfs(const struct lu_env *env, struct dt_device *d,
1031 struct osd_device *osd = osd_dt_dev(d);
1032 struct super_block *sb = osd_sb(osd);
1035 cfs_spin_lock(&osd->od_osfs_lock);
1036 /* cache 1 second */
/* refresh the cached kstatfs only when older than one second */
1037 if (cfs_time_before_64(osd->od_osfs_age, cfs_time_shift_64(-1))) {
1038 result = ll_do_statfs(sb, &osd->od_kstatfs);
1039 if (likely(result == 0)) /* N.B. statfs can't really fail */
1040 osd->od_osfs_age = cfs_time_current_64();
/* hand the (possibly cached) snapshot back to the caller */
1043 if (likely(result == 0))
1044 *sfs = osd->od_kstatfs;
1045 cfs_spin_unlock(&osd->od_osfs_lock);
1051 * Concurrency: doesn't access mutable data.
1053 static void osd_conf_get(const struct lu_env *env,
1054 const struct dt_device *dev,
1055 struct dt_device_param *param)
1057 struct super_block *sb = osd_sb(osd_dt_dev(dev));
1060 * XXX should be taken from not-yet-existing fs abstraction layer.
1062 param->ddp_max_name_len = LDISKFS_NAME_LEN;
1063 param->ddp_max_nlink = LDISKFS_LINK_MAX;
1064 param->ddp_block_shift = osd_sb(osd_dt_dev(dev))->s_blocksize_bits;
1065 param->ddp_mntopts = 0;
1066 if (test_opt(sb, XATTR_USER))
1067 param->ddp_mntopts |= MNTOPT_USERXATTR;
1068 if (test_opt(sb, POSIX_ACL))
1069 param->ddp_mntopts |= MNTOPT_ACL;
1071 #if defined(LDISKFS_FEATURE_INCOMPAT_EA_INODE)
1072 if (LDISKFS_HAS_INCOMPAT_FEATURE(sb, LDISKFS_FEATURE_INCOMPAT_EA_INODE))
1073 param->ddp_max_ea_size = LDISKFS_XATTR_MAX_LARGE_EA_SIZE;
1076 param->ddp_max_ea_size = sb->s_blocksize;
1081 * Helper function to get and fill the buffer with input values.
1083 static struct lu_buf *osd_buf_get(const struct lu_env *env, void *area, ssize_t len)
1087 buf = &osd_oti_get(env)->oti_buf;
1094 * Concurrency: shouldn't matter.
1096 static int osd_sync(const struct lu_env *env, struct dt_device *d)
1098 CDEBUG(D_HA, "syncing OSD %s\n", LUSTRE_OSD_NAME);
1099 return ldiskfs_force_commit(osd_sb(osd_dt_dev(d)));
1103 * Start commit for OSD device.
1105 * An implementation of dt_commit_async method for OSD device.
1106 * Asynchronously starts underlying fs sync and thereby a transaction
1109 * \param env environment
1110 * \param d dt device
1112 * \see dt_device_operations
1114 static int osd_commit_async(const struct lu_env *env,
1115 struct dt_device *d)
1117 struct super_block *s = osd_sb(osd_dt_dev(d));
1120 CDEBUG(D_HA, "async commit OSD %s\n", LUSTRE_OSD_NAME);
1121 RETURN(s->s_op->sync_fs(s, 0));
1125 * Concurrency: shouldn't matter.
1128 static void osd_ro(const struct lu_env *env, struct dt_device *d)
1130 struct super_block *sb = osd_sb(osd_dt_dev(d));
1133 CERROR("*** setting device %s read-only ***\n", LUSTRE_OSD_NAME);
1135 __lvfs_set_rdonly(sb->s_bdev, LDISKFS_SB(sb)->journal_bdev);
1140 * Concurrency: serialization provided by callers.
1142 static int osd_init_capa_ctxt(const struct lu_env *env, struct dt_device *d,
1143 int mode, unsigned long timeout, __u32 alg,
1144 struct lustre_capa_key *keys)
1146 struct osd_device *dev = osd_dt_dev(d);
1149 dev->od_fl_capa = mode;
1150 dev->od_capa_timeout = timeout;
1151 dev->od_capa_alg = alg;
1152 dev->od_capa_keys = keys;
1157 * Concurrency: serialization provided by callers.
1159 static void osd_init_quota_ctxt(const struct lu_env *env, struct dt_device *d,
1160 struct dt_quota_ctxt *ctxt, void *data)
1162 struct obd_device *obd = (void *)ctxt;
1163 struct vfsmount *mnt = (struct vfsmount *)data;
1166 obd->u.obt.obt_sb = mnt->mnt_root->d_inode->i_sb;
1167 OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
1168 obd->obd_lvfs_ctxt.pwdmnt = mnt;
1169 obd->obd_lvfs_ctxt.pwd = mnt->mnt_root;
1170 obd->obd_lvfs_ctxt.fs = get_ds();
1176 * Note: we do not count into QUOTA here.
1177 * If we mount with --data_journal we may need more.
1179 static const int osd_dto_credits_noquota[DTO_NR] = {
1182 * INDEX_EXTRA_TRANS_BLOCKS(8) +
1183 * SINGLEDATA_TRANS_BLOCKS(8)
1184 * XXX Note: maybe iam need more, since iam have more level than
1187 [DTO_INDEX_INSERT] = 16,
1188 [DTO_INDEX_DELETE] = 16,
1192 [DTO_INDEX_UPDATE] = 16,
1194 * Create a object. The same as create object in EXT3.
1195 * DATA_TRANS_BLOCKS(14) +
1196 * INDEX_EXTRA_BLOCKS(8) +
1197 * 3(inode bits, groups, GDT)
1199 [DTO_OBJECT_CREATE] = 25,
1201 * XXX: real credits to be fixed
1203 [DTO_OBJECT_DELETE] = 25,
1205 * Attr set credits (inode)
1207 [DTO_ATTR_SET_BASE] = 1,
1209 * Xattr set. The same as xattr of EXT3.
1210 * DATA_TRANS_BLOCKS(14)
1211 * XXX Note: in original MDS implmentation INDEX_EXTRA_TRANS_BLOCKS
1212 * are also counted in. Do not know why?
1214 [DTO_XATTR_SET] = 14,
1217 * credits for inode change during write.
1219 [DTO_WRITE_BASE] = 3,
1221 * credits for single block write.
1223 [DTO_WRITE_BLOCK] = 14,
1225 * Attr set credits for chown.
1226 * This is extra credits for setattr, and it is null without quota
1228 [DTO_ATTR_SET_CHOWN]= 0
1231 static const struct dt_device_operations osd_dt_ops = {
1232 .dt_root_get = osd_root_get,
1233 .dt_statfs = osd_statfs,
1234 .dt_trans_create = osd_trans_create,
1235 .dt_trans_start = osd_trans_start,
1236 .dt_trans_stop = osd_trans_stop,
1237 .dt_trans_cb_add = osd_trans_cb_add,
1238 .dt_conf_get = osd_conf_get,
1239 .dt_sync = osd_sync,
1241 .dt_commit_async = osd_commit_async,
1242 .dt_init_capa_ctxt = osd_init_capa_ctxt,
1243 .dt_init_quota_ctxt= osd_init_quota_ctxt,
/*
 * Take the object's oo_sem for reading, with lockdep nesting @role.
 * The asserts enforce that the caller does not already hold the write
 * lock (oo_owner tracks the write-side owner env).
 *
 * NOTE(review): this excerpt is missing a few original lines
 * (presumably the oti_r_locks accounting and closing brace).
 */
1246 static void osd_object_read_lock(const struct lu_env *env,
1247 struct dt_object *dt, unsigned role)
1249 struct osd_object *obj = osd_dt_obj(dt);
1250 struct osd_thread_info *oti = osd_oti_get(env);
1252 LINVRNT(osd_invariant(obj));
/* must not already hold the write lock in this env */
1254 LASSERT(obj->oo_owner != env);
1255 cfs_down_read_nested(&obj->oo_sem, role);
/* no writer may own the lock once we hold it for read */
1257 LASSERT(obj->oo_owner == NULL);
/*
 * Take the object's oo_sem for writing, with lockdep nesting @role,
 * and record this env as the write-side owner in oo_owner.
 *
 * NOTE(review): this excerpt is missing a few original lines
 * (presumably the oti_w_locks accounting and closing brace).
 */
1261 static void osd_object_write_lock(const struct lu_env *env,
1262 struct dt_object *dt, unsigned role)
1264 struct osd_object *obj = osd_dt_obj(dt);
1265 struct osd_thread_info *oti = osd_oti_get(env);
1267 LINVRNT(osd_invariant(obj));
/* write lock is not recursive within one env */
1269 LASSERT(obj->oo_owner != env);
1270 cfs_down_write_nested(&obj->oo_sem, role);
1272 LASSERT(obj->oo_owner == NULL);
1273 obj->oo_owner = env;
/*
 * Drop a read lock taken by osd_object_read_lock(); asserts the
 * per-thread read-lock counter is positive first.
 *
 * NOTE(review): this excerpt is missing a line between the assert and
 * the up_read (presumably the oti_r_locks decrement).
 */
1277 static void osd_object_read_unlock(const struct lu_env *env,
1278 struct dt_object *dt)
1280 struct osd_object *obj = osd_dt_obj(dt);
1281 struct osd_thread_info *oti = osd_oti_get(env);
1283 LINVRNT(osd_invariant(obj));
1285 LASSERT(oti->oti_r_locks > 0);
1287 cfs_up_read(&obj->oo_sem);
/*
 * Drop the write lock: verify this env owns it, clear oo_owner before
 * releasing oo_sem (so readers acquiring immediately after see NULL).
 *
 * NOTE(review): this excerpt is missing a line (presumably the
 * oti_w_locks decrement).
 */
1290 static void osd_object_write_unlock(const struct lu_env *env,
1291 struct dt_object *dt)
1293 struct osd_object *obj = osd_dt_obj(dt);
1294 struct osd_thread_info *oti = osd_oti_get(env);
1296 LINVRNT(osd_invariant(obj));
/* only the owning env may unlock */
1298 LASSERT(obj->oo_owner == env);
1299 LASSERT(oti->oti_w_locks > 0);
/* clear ownership while still holding the semaphore */
1301 obj->oo_owner = NULL;
1302 cfs_up_write(&obj->oo_sem);
1305 static int osd_object_write_locked(const struct lu_env *env,
1306 struct dt_object *dt)
1308 struct osd_object *obj = osd_dt_obj(dt);
1310 LINVRNT(osd_invariant(obj));
1312 return obj->oo_owner == env;
1315 static int capa_is_sane(const struct lu_env *env,
1316 struct osd_device *dev,
1317 struct lustre_capa *capa,
1318 struct lustre_capa_key *keys)
1320 struct osd_thread_info *oti = osd_oti_get(env);
1321 struct lustre_capa *tcapa = &oti->oti_capa;
1322 struct obd_capa *oc;
1326 oc = capa_lookup(dev->od_capa_hash, capa, 0);
1328 if (capa_is_expired(oc)) {
1329 DEBUG_CAPA(D_ERROR, capa, "expired");
1336 if (capa_is_expired_sec(capa)) {
1337 DEBUG_CAPA(D_ERROR, capa, "expired");
1341 cfs_spin_lock(&capa_lock);
1342 for (i = 0; i < 2; i++) {
1343 if (keys[i].lk_keyid == capa->lc_keyid) {
1344 oti->oti_capa_key = keys[i];
1348 cfs_spin_unlock(&capa_lock);
1351 DEBUG_CAPA(D_ERROR, capa, "no matched capa key");
1355 rc = capa_hmac(tcapa->lc_hmac, capa, oti->oti_capa_key.lk_key);
1359 if (memcmp(tcapa->lc_hmac, capa->lc_hmac, sizeof(capa->lc_hmac))) {
1360 DEBUG_CAPA(D_ERROR, capa, "HMAC mismatch");
1364 oc = capa_add(dev->od_capa_hash, capa);
/*
 * Authorize operation @opc on @dt using capability @capa.
 * Short-circuits when capabilities are disabled on the device, when
 * the caller passes BYPASS_CAPA, or when env's md_capainfo says no
 * authentication is required; otherwise checks fid match, opcode
 * support, and full sanity via capa_is_sane().
 */
1370 static int osd_object_auth(const struct lu_env *env, struct dt_object *dt,
1371 struct lustre_capa *capa, __u64 opc)
1373 const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
1374 struct osd_device *dev = osd_dev(dt->do_lu.lo_dev);
1375 struct md_capainfo *ci;
/* capability support disabled on this OSD device */
1378 if (!dev->od_fl_capa)
1381 if (capa == BYPASS_CAPA)
1384 ci = md_capainfo(env);
1388 if (ci->mc_auth == LC_ID_NONE)
/* auth required but no capa supplied by the client */
1392 CERROR("no capability is provided for fid "DFID"\n", PFID(fid));
/* capa must be issued for exactly this object's fid */
1396 if (!lu_fid_eq(fid, &capa->lc_fid)) {
1397 DEBUG_CAPA(D_ERROR, capa, "fid "DFID" mismatch with",
1402 if (!capa_opc_supported(capa, opc)) {
1403 DEBUG_CAPA(D_ERROR, capa, "opc "LPX64" not supported by", opc);
1407 if ((rc = capa_is_sane(env, dev, capa, dev->od_capa_keys))) {
1408 DEBUG_CAPA(D_ERROR, capa, "insane (rc %d)", rc);
/*
 * Convert a 64-bit seconds value into a timespec truncated to the
 * superblock's time granularity.  Returns a pointer to per-thread
 * scratch storage (oti_time) — valid only until the next call on
 * the same env.
 */
1415 static struct timespec *osd_inode_time(const struct lu_env *env,
1416 struct inode *inode, __u64 seconds)
1418 struct osd_thread_info *oti = osd_oti_get(env);
1419 struct timespec *t = &oti->oti_time;
1421 t->tv_sec = seconds;
1423 *t = timespec_trunc(*t, get_sb_time_gran(inode->i_sb));
/*
 * Copy the standard attributes of @inode into @attr and mark all of
 * the copied fields valid in attr->la_valid.
 */
1428 static void osd_inode_getattr(const struct lu_env *env,
1429 struct inode *inode, struct lu_attr *attr)
1431 attr->la_valid |= LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
1432 LA_SIZE | LA_BLOCKS | LA_UID | LA_GID |
1433 LA_FLAGS | LA_NLINK | LA_RDEV | LA_BLKSIZE;
1435 attr->la_atime = LTIME_S(inode->i_atime);
1436 attr->la_mtime = LTIME_S(inode->i_mtime);
1437 attr->la_ctime = LTIME_S(inode->i_ctime);
1438 attr->la_mode = inode->i_mode;
1439 attr->la_size = i_size_read(inode);
1440 attr->la_blocks = inode->i_blocks;
1441 attr->la_uid = inode->i_uid;
1442 attr->la_gid = inode->i_gid;
/* flags come from the ldiskfs inode info, not the VFS inode */
1443 attr->la_flags = LDISKFS_I(inode)->i_flags;
1444 attr->la_nlink = inode->i_nlink;
1445 attr->la_rdev = inode->i_rdev;
1446 attr->la_blksize = ll_inode_blksize(inode);
1447 attr->la_blkbits = inode->i_blkbits;
/*
 * dt_object_operations::do_attr_get — read @dt's attributes into
 * @attr after a CAPA_OPC_META_READ capability check; the inode copy
 * is serialized by the object's oo_guard spinlock.
 */
1450 static int osd_attr_get(const struct lu_env *env,
1451 struct dt_object *dt,
1452 struct lu_attr *attr,
1453 struct lustre_capa *capa)
1455 struct osd_object *obj = osd_dt_obj(dt);
1457 LASSERT(dt_object_exists(dt));
1458 LINVRNT(osd_invariant(obj));
1460 if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
1463 cfs_spin_lock(&obj->oo_guard);
1464 osd_inode_getattr(env, obj->oo_inode, attr);
1465 cfs_spin_unlock(&obj->oo_guard);
/*
 * Declare transaction credits for a future attr_set: the base
 * DTO_ATTR_SET_BASE credits, plus quota-id declarations for both the
 * old and new owner when UID/GID are changing.  Must run before the
 * transaction handle is started (ot_handle still NULL).
 */
1469 static int osd_declare_attr_set(const struct lu_env *env,
1470 struct dt_object *dt,
1471 const struct lu_attr *attr,
1472 struct thandle *handle)
1474 struct osd_thandle *oh;
1475 struct osd_object *obj;
1477 LASSERT(dt != NULL);
1478 LASSERT(handle != NULL);
1480 obj = osd_dt_obj(dt);
1481 LASSERT(osd_invariant(obj));
1483 oh = container_of0(handle, struct osd_thandle, ot_super);
1484 LASSERT(oh->ot_handle == NULL);
1486 OSD_DECLARE_OP(oh, attr_set);
1487 oh->ot_credits += osd_dto_credits_noquota[DTO_ATTR_SET_BASE];
/* uid change: declare quota for both current and new owner */
1489 if (attr && attr->la_valid & LA_UID) {
1491 osd_declare_qid(dt, oh, USRQUOTA, obj->oo_inode->i_uid,
1493 osd_declare_qid(dt, oh, USRQUOTA, attr->la_uid, NULL);
/* gid change: same for group quota */
1495 if (attr && attr->la_valid & LA_GID) {
1497 osd_declare_qid(dt, oh, GRPQUOTA, obj->oo_inode->i_gid,
1499 osd_declare_qid(dt, oh, GRPQUOTA, attr->la_gid, NULL);
/*
 * Apply the fields flagged in attr->la_valid to @inode.  Quota
 * transfer is performed first when the owner changes (with the
 * server context pushed), then times/size/mode/ids/etc. are copied.
 * NOTE(review): extraction elided some guard lines here (e.g. the
 * 'if (bits & LA_MODE)' / LA_UID / LA_GID / LA_RDEV guards before
 * lines 1554-1563) — code preserved verbatim.
 */
1505 static int osd_inode_setattr(const struct lu_env *env,
1506 struct inode *inode, const struct lu_attr *attr)
1510 bits = attr->la_valid;
1512 LASSERT(!(bits & LA_TYPE)); /* Huh? You want too much. */
1514 #ifdef HAVE_QUOTA_SUPPORT
/* owner change: move the quota accounting before touching the inode */
1515 if ((bits & LA_UID && attr->la_uid != inode->i_uid) ||
1516 (bits & LA_GID && attr->la_gid != inode->i_gid)) {
1517 struct osd_ctxt *save = &osd_oti_get(env)->oti_ctxt;
1523 iattr.ia_valid |= ATTR_UID;
1525 iattr.ia_valid |= ATTR_GID;
1526 iattr.ia_uid = attr->la_uid;
1527 iattr.ia_gid = attr->la_gid;
1528 osd_push_ctxt(env, save);
1529 rc = ll_vfs_dq_transfer(inode, &iattr) ? -EDQUOT : 0;
1536 if (bits & LA_ATIME)
1537 inode->i_atime = *osd_inode_time(env, inode, attr->la_atime);
1538 if (bits & LA_CTIME)
1539 inode->i_ctime = *osd_inode_time(env, inode, attr->la_ctime);
1540 if (bits & LA_MTIME)
1541 inode->i_mtime = *osd_inode_time(env, inode, attr->la_mtime);
1542 if (bits & LA_SIZE) {
/* keep ldiskfs on-disk size in sync with the in-core size */
1543 LDISKFS_I(inode)->i_disksize = attr->la_size;
1544 i_size_write(inode, attr->la_size);
1548 /* OSD should not change "i_blocks" which is used by quota.
1549 * "i_blocks" should be changed by ldiskfs only. */
1550 if (bits & LA_BLOCKS)
1551 inode->i_blocks = attr->la_blocks;
/* preserve the file-type bits; only permission bits come from la_mode */
1554 inode->i_mode = (inode->i_mode & S_IFMT) |
1555 (attr->la_mode & ~S_IFMT);
1557 inode->i_uid = attr->la_uid;
1559 inode->i_gid = attr->la_gid;
1560 if (bits & LA_NLINK)
1561 inode->i_nlink = attr->la_nlink;
1563 inode->i_rdev = attr->la_rdev;
1565 if (bits & LA_FLAGS) {
1566 /* always keep S_NOCMTIME */
1567 inode->i_flags = ll_ext_to_inode_flags(attr->la_flags) |
/*
 * dt_object_operations::do_attr_set — authorize META_WRITE, apply
 * the attributes under oo_guard, then dirty the inode through the
 * superblock's dirty_inode so the change reaches the journal.
 */
1573 static int osd_attr_set(const struct lu_env *env,
1574 struct dt_object *dt,
1575 const struct lu_attr *attr,
1576 struct thandle *handle,
1577 struct lustre_capa *capa)
1579 struct osd_object *obj = osd_dt_obj(dt);
1582 LASSERT(handle != NULL);
1583 LASSERT(dt_object_exists(dt));
1584 LASSERT(osd_invariant(obj));
1586 if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
1589 OSD_EXEC_OP(handle, attr_set);
1591 cfs_spin_lock(&obj->oo_guard);
1592 rc = osd_inode_setattr(env, obj->oo_inode, attr);
1593 cfs_spin_unlock(&obj->oo_guard);
/* push the updated inode into the journal via the sb operation */
1596 obj->oo_inode->i_sb->s_op->dirty_inode(obj->oo_inode);
1603 * XXX temporary solution.
/*
 * Pre-create hook called by __osd_object_create() before the
 * type-specific creation routine.  Body elided in this chunk —
 * appears to be a placeholder ("XXX temporary solution" above).
 */
1605 static int osd_create_pre(struct osd_thread_info *info, struct osd_object *obj,
1606 struct lu_attr *attr, struct thandle *th)
/*
 * Post-create hook: finish in-core initialization of the new object
 * and unlock the freshly-allocated inode if it is still I_NEW.
 */
1611 static int osd_create_post(struct osd_thread_info *info, struct osd_object *obj,
1612 struct lu_attr *attr, struct thandle *th)
1614 osd_object_init0(obj);
1615 if (obj->oo_inode && (obj->oo_inode->i_state & I_NEW))
1616 unlock_new_inode(obj->oo_inode);
/*
 * Build a fake child dentry (backed by per-thread storage) for
 * @name under @obj, for passing into ldiskfs/VFS helpers that want
 * a dentry.  Both dentries live in osd_thread_info, so the result
 * is only valid until the next call on the same env.
 */
1620 static struct dentry * osd_child_dentry_get(const struct lu_env *env,
1621 struct osd_object *obj,
1625 struct osd_thread_info *info = osd_oti_get(env);
1626 struct dentry *child_dentry = &info->oti_child_dentry;
1627 struct dentry *obj_dentry = &info->oti_obj_dentry;
1629 obj_dentry->d_inode = obj->oo_inode;
1630 obj_dentry->d_sb = osd_sb(osd_obj2dev(obj));
1631 obj_dentry->d_name.hash = 0;
1633 child_dentry->d_name.hash = 0;
1634 child_dentry->d_parent = obj_dentry;
1635 child_dentry->d_name.name = name;
1636 child_dentry->d_name.len = namelen;
1637 return child_dentry;
/*
 * Allocate the backing ldiskfs inode for a new object of @mode.
 * For directories (with pdirops enabled) an htree lock head is
 * pre-allocated; the inode is created under the parent taken from
 * the allocation hint (or the device's default object area), with
 * quota context pushed around the allocation.  On success the inode
 * is marked S_NOCMTIME and attached to the object; on failure the
 * htree lock head is released again.
 * NOTE(review): extraction elided some lines (returns, ENTRY/exit,
 * pop_ctxt) — code preserved verbatim.
 */
1641 static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
1643 struct dt_allocation_hint *hint,
1647 struct osd_device *osd = osd_obj2dev(obj);
1648 struct osd_thandle *oth;
1649 struct dt_object *parent;
1650 struct inode *inode;
1651 #ifdef HAVE_QUOTA_SUPPORT
1652 struct osd_ctxt *save = &info->oti_ctxt;
1655 LINVRNT(osd_invariant(obj));
1656 LASSERT(obj->oo_inode == NULL);
1657 LASSERT(obj->oo_hl_head == NULL);
/* pdirops: pre-allocate the htree lock head for new directories */
1659 if (S_ISDIR(mode) && ldiskfs_pdo) {
1660 obj->oo_hl_head =ldiskfs_htree_lock_head_alloc(HTREE_HBITS_DEF);
1661 if (obj->oo_hl_head == NULL)
1665 oth = container_of(th, struct osd_thandle, ot_super);
1666 LASSERT(oth->ot_handle->h_transaction != NULL);
/* place the inode near the hinted parent, else the default area */
1668 if (hint && hint->dah_parent)
1669 parent = hint->dah_parent;
1671 parent = osd->od_obj_area;
1673 LASSERT(parent != NULL);
1674 LASSERT(osd_dt_obj(parent)->oo_inode->i_op != NULL);
1676 #ifdef HAVE_QUOTA_SUPPORT
1677 osd_push_ctxt(info->oti_env, save);
1679 inode = ldiskfs_create_inode(oth->ot_handle,
1680 osd_dt_obj(parent)->oo_inode, mode);
1681 #ifdef HAVE_QUOTA_SUPPORT
1684 if (!IS_ERR(inode)) {
1685 /* Do not update file c/mtime in ldiskfs.
1686 * NB: don't need any lock because no contention at this
1688 inode->i_flags |= S_NOCMTIME;
1689 obj->oo_inode = inode;
/* failure path: release the pre-allocated htree lock head */
1692 if (obj->oo_hl_head != NULL) {
1693 ldiskfs_htree_lock_head_free(obj->oo_hl_head);
1694 obj->oo_hl_head = NULL;
1696 result = PTR_ERR(inode);
1698 LINVRNT(osd_invariant(obj));
/*
 * Create a directory object.  In non-interoperability mode
 * (od_iop_mode == 0) the directory body is an IAM variable-key
 * container created directly via iam_lvar_create() with
 * osd_fid_pack-sized records.
 */
1706 static int osd_mkdir(struct osd_thread_info *info, struct osd_object *obj,
1707 struct lu_attr *attr,
1708 struct dt_allocation_hint *hint,
1709 struct dt_object_format *dof,
1713 struct osd_thandle *oth;
1714 struct osd_device *osd = osd_obj2dev(obj);
/* keep only type, permission and sticky bits from la_mode */
1715 __u32 mode = (attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX));
1717 LASSERT(S_ISDIR(attr->la_mode));
1719 oth = container_of(th, struct osd_thandle, ot_super);
1720 LASSERT(oth->ot_handle->h_transaction != NULL);
1721 result = osd_mkfile(info, obj, mode, hint, th);
1722 if (result == 0 && osd->od_iop_mode == 0) {
1723 LASSERT(obj->oo_inode != NULL);
1725 * XXX uh-oh... call low-level iam function directly.
1728 result = iam_lvar_create(obj->oo_inode, OSD_NAME_LEN, 4,
1729 sizeof (struct osd_fid_pack),
/*
 * Create an index object (a regular file holding an IAM container).
 * Chooses variable-key (iam_lvar_create) or fixed-key
 * (iam_lfix_create) layout based on the DT_IND_VARKEY flag of the
 * requested index features.
 */
1735 static int osd_mk_index(struct osd_thread_info *info, struct osd_object *obj,
1736 struct lu_attr *attr,
1737 struct dt_allocation_hint *hint,
1738 struct dt_object_format *dof,
1742 struct osd_thandle *oth;
1743 const struct dt_index_features *feat = dof->u.dof_idx.di_feat;
1745 __u32 mode = (attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX));
1747 LASSERT(S_ISREG(attr->la_mode));
1749 oth = container_of(th, struct osd_thandle, ot_super);
1750 LASSERT(oth->ot_handle->h_transaction != NULL);
1752 result = osd_mkfile(info, obj, mode, hint, th);
1754 LASSERT(obj->oo_inode != NULL);
1755 if (feat->dif_flags & DT_IND_VARKEY)
1756 result = iam_lvar_create(obj->oo_inode,
1757 feat->dif_keysize_max,
1759 feat->dif_recsize_max,
1762 result = iam_lfix_create(obj->oo_inode,
1763 feat->dif_keysize_max,
1765 feat->dif_recsize_max,
/*
 * Create a regular-file object: thin wrapper around osd_mkfile()
 * with la_mode masked to type, permission and sticky bits.
 */
1772 static int osd_mkreg(struct osd_thread_info *info, struct osd_object *obj,
1773 struct lu_attr *attr,
1774 struct dt_allocation_hint *hint,
1775 struct dt_object_format *dof,
1778 LASSERT(S_ISREG(attr->la_mode));
1779 return osd_mkfile(info, obj, (attr->la_mode &
1780 (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
/*
 * Create a symlink object: same wrapper as osd_mkreg() but asserts
 * the mode describes a symlink.
 */
1783 static int osd_mksym(struct osd_thread_info *info, struct osd_object *obj,
1784 struct lu_attr *attr,
1785 struct dt_allocation_hint *hint,
1786 struct dt_object_format *dof,
1789 LASSERT(S_ISLNK(attr->la_mode));
1790 return osd_mkfile(info, obj, (attr->la_mode &
1791 (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
/*
 * Create a special-file object (char/block device, FIFO or socket):
 * allocate the inode via osd_mkfile(), then initialize it as a
 * special inode with the requested device number.
 */
1794 static int osd_mknod(struct osd_thread_info *info, struct osd_object *obj,
1795 struct lu_attr *attr,
1796 struct dt_allocation_hint *hint,
1797 struct dt_object_format *dof,
1800 cfs_umode_t mode = attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX);
1803 LINVRNT(osd_invariant(obj));
1804 LASSERT(obj->oo_inode == NULL);
1805 LASSERT(S_ISCHR(mode) || S_ISBLK(mode) ||
1806 S_ISFIFO(mode) || S_ISSOCK(mode));
1808 result = osd_mkfile(info, obj, mode, hint, th);
1810 LASSERT(obj->oo_inode != NULL);
1811 init_special_inode(obj->oo_inode, mode, attr->la_rdev);
1813 LINVRNT(osd_invariant(obj));
/*
 * osd_obj_type_f: signature shared by the per-type creation
 * routines above (osd_mkdir/osd_mk_index/osd_mkreg/osd_mksym/
 * osd_mknod).
 */
1817 typedef int (*osd_obj_type_f)(struct osd_thread_info *, struct osd_object *,
1819 struct dt_allocation_hint *hint,
1820 struct dt_object_format *dof,
/*
 * Map a dt_format_type to its creation routine.
 * NOTE(review): the switch/case body was elided by extraction —
 * only the DFT_INDEX arm (osd_mk_index) is visible here.
 */
1823 static osd_obj_type_f osd_create_type_f(enum dt_format_type type)
1825 osd_obj_type_f result;
1841 result = osd_mk_index;
/*
 * dt_object_operations::do_ah_init — initialize an allocation hint
 * with the given parent and child mode; all other fields zeroed.
 */
1852 static void osd_ah_init(const struct lu_env *env, struct dt_allocation_hint *ah,
1853 struct dt_object *parent, cfs_umode_t child_mode)
1857 memset(ah, 0, sizeof(*ah));
1858 ah->dah_parent = parent;
1859 ah->dah_mode = child_mode;
1863 * Helper function for osd_object_create()
1865 * \retval 0, on success
/*
 * Helper for osd_object_create(): run the pre-create hook, dispatch
 * to the format-specific creation routine chosen by
 * osd_create_type_f(dof->dof_type), then run the post-create hook.
 * Returns 0 on success.
 */
1867 static int __osd_object_create(struct osd_thread_info *info,
1868 struct osd_object *obj, struct lu_attr *attr,
1869 struct dt_allocation_hint *hint,
1870 struct dt_object_format *dof,
1876 result = osd_create_pre(info, obj, attr, th);
1878 result = osd_create_type_f(dof->dof_type)(info, obj,
1879 attr, hint, dof, th);
1881 result = osd_create_post(info, obj, attr, th);
1887 * Helper function for osd_object_create()
1889 * \retval 0, on success
/*
 * Helper for osd_object_create(): record the fid -> (ino, gen)
 * mapping of the new object in the object index.  The caller's
 * CAP_SYS_RESOURCE bit is forwarded so reserved space may be used.
 * Returns 0 on success.
 */
1891 static int __osd_oi_insert(const struct lu_env *env, struct osd_object *obj,
1892 const struct lu_fid *fid, struct thandle *th)
1894 struct osd_thread_info *info = osd_oti_get(env);
1895 struct osd_inode_id *id = &info->oti_id;
1896 struct osd_device *osd = osd_obj2dev(obj);
1897 struct md_ucred *uc = md_ucred(env);
1899 LASSERT(obj->oo_inode != NULL);
1900 LASSERT(uc != NULL);
1902 id->oii_ino = obj->oo_inode->i_ino;
1903 id->oii_gen = obj->oo_inode->i_generation;
1905 return osd_oi_insert(info, osd_fid2oi(osd, fid), fid, id, th,
1906 uc->mu_cap & CFS_CAP_SYS_RESOURCE_MASK);
/*
 * Declare transaction credits for object creation: base create
 * credits, an OI insert for normal fids, extra inserts plus one
 * directory block for directories, and quota declarations for the
 * new owner.  Must run before the handle is started.
 */
1909 static int osd_declare_object_create(const struct lu_env *env,
1910 struct dt_object *dt,
1911 struct lu_attr *attr,
1912 struct dt_allocation_hint *hint,
1913 struct dt_object_format *dof,
1914 struct thandle *handle)
1916 struct osd_thandle *oh;
1918 LASSERT(handle != NULL);
1920 oh = container_of0(handle, struct osd_thandle, ot_super);
1921 LASSERT(oh->ot_handle == NULL);
1923 OSD_DECLARE_OP(oh, create);
1924 oh->ot_credits += osd_dto_credits_noquota[DTO_OBJECT_CREATE];
1925 /* XXX: So far, only normal fid needs be inserted into the oi,
1926 * things could be changed later. Revise following code then. */
1927 if (fid_is_norm(lu_object_fid(&dt->do_lu))) {
1928 OSD_DECLARE_OP(oh, insert);
1929 oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_INSERT];
1931 /* If this is directory, then we expect . and .. to be inserted as
1932 * well. The one directory block always needs to be created for the
1933 * directory, so we could use DTO_WRITE_BASE here (GDT, block bitmap,
1934 * block), there is no danger of needing a tree for the first block.
1936 if (attr && S_ISDIR(attr->la_mode)) {
/* two inserts: one each for "." and ".." */
1937 OSD_DECLARE_OP(oh, insert);
1938 OSD_DECLARE_OP(oh, insert);
1939 oh->ot_credits += osd_dto_credits_noquota[DTO_WRITE_BASE];
1943 osd_declare_qid(dt, oh, USRQUOTA, attr->la_uid, NULL);
1944 osd_declare_qid(dt, oh, GRPQUOTA, attr->la_gid, NULL);
/*
 * dt_object_operations::do_create — create the on-disk object
 * (inode + format-specific body) and insert its fid into the object
 * index.  Caller must hold the object write lock and a started
 * transaction.
 */
1949 static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
1950 struct lu_attr *attr,
1951 struct dt_allocation_hint *hint,
1952 struct dt_object_format *dof,
1955 const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
1956 struct osd_object *obj = osd_dt_obj(dt);
1957 struct osd_thread_info *info = osd_oti_get(env);
1962 LINVRNT(osd_invariant(obj));
1963 LASSERT(!dt_object_exists(dt));
1964 LASSERT(osd_write_locked(env, obj));
1965 LASSERT(th != NULL);
1967 OSD_EXEC_OP(th, create);
1969 result = __osd_object_create(info, obj, attr, hint, dof, th);
1971 result = __osd_oi_insert(env, obj, fid, th);
/* on success the object must now be visible as existing */
1973 LASSERT(ergo(result == 0, dt_object_exists(dt)));
1974 LASSERT(osd_invariant(obj));
1979 * Called to destroy on-disk representation of the object
1981 * Concurrency: must be locked
/*
 * Declare transaction credits for object destruction: the object
 * delete itself plus the OI entry removal, and quota declarations
 * for the current owner.  Must run before the handle is started.
 */
1983 static int osd_declare_object_destroy(const struct lu_env *env,
1984 struct dt_object *dt,
1987 struct osd_object *obj = osd_dt_obj(dt);
1988 struct inode *inode = obj->oo_inode;
1989 struct osd_thandle *oh;
1992 oh = container_of0(th, struct osd_thandle, ot_super);
1993 LASSERT(oh->ot_handle == NULL);
1996 OSD_DECLARE_OP(oh, destroy);
1997 OSD_DECLARE_OP(oh, delete);
1998 oh->ot_credits += osd_dto_credits_noquota[DTO_OBJECT_DELETE];
1999 oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_DELETE];
2001 osd_declare_qid(dt, oh, USRQUOTA, inode->i_uid, inode);
2002 osd_declare_qid(dt, oh, GRPQUOTA, inode->i_gid, inode);
/*
 * dt_object_operations::do_destroy — remove the object's OI entry
 * and mark the lu_object HEARD_BANSHEE so it leaves the cache.  For
 * directories the (possibly remaining) self link is dropped under
 * oo_guard and the inode dirtied.
 * NOTE(review): extraction elided lines here (e.g. the nlink drop
 * under oo_guard at original line ~2028) — code preserved verbatim.
 */
2007 static int osd_object_destroy(const struct lu_env *env,
2008 struct dt_object *dt,
2011 const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
2012 struct osd_object *obj = osd_dt_obj(dt);
2013 struct inode *inode = obj->oo_inode;
2014 struct osd_device *osd = osd_obj2dev(obj);
2015 struct osd_thandle *oh;
2019 oh = container_of0(th, struct osd_thandle, ot_super);
2020 LASSERT(oh->ot_handle);
2022 LASSERT(!lu_object_is_dying(dt->do_lu.lo_header));
2024 if (S_ISDIR(inode->i_mode)) {
2025 LASSERT(osd_inode_unlinked(inode) ||
2026 inode->i_nlink == 1);
2027 cfs_spin_lock(&obj->oo_guard);
2029 cfs_spin_unlock(&obj->oo_guard);
2030 inode->i_sb->s_op->dirty_inode(inode);
2032 LASSERT(osd_inode_unlinked(inode));
2035 OSD_EXEC_OP(th, destroy);
2037 result = osd_oi_delete(osd_oti_get(env),
2038 osd_fid2oi(osd, fid), fid, th);
2040 /* XXX: add to ext3 orphan list */
2041 /* rc = ext3_orphan_add(handle_t *handle, struct inode *inode) */
2043 /* not needed in the cache anymore */
2044 set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags);
2050 * Helper function for osd_xattr_set()
/*
 * Helper for osd_xattr_set()/osd_ea_fid_set(): translate Lustre
 * LU_XATTR_{REPLACE,CREATE} flags to the kernel XATTR_* flags and
 * call the inode's setxattr operation through a per-thread fake
 * dentry.  Caller must hold the object write lock.
 */
2052 static int __osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
2053 const struct lu_buf *buf, const char *name, int fl)
2055 struct osd_object *obj = osd_dt_obj(dt);
2056 struct inode *inode = obj->oo_inode;
2057 struct osd_thread_info *info = osd_oti_get(env);
2058 struct dentry *dentry = &info->oti_child_dentry;
2062 LASSERT(dt_object_exists(dt));
2063 LASSERT(inode->i_op != NULL && inode->i_op->setxattr != NULL);
2064 LASSERT(osd_write_locked(env, obj));
2066 if (fl & LU_XATTR_REPLACE)
2067 fs_flags |= XATTR_REPLACE;
2069 if (fl & LU_XATTR_CREATE)
2070 fs_flags |= XATTR_CREATE;
2072 dentry->d_inode = inode;
2073 rc = inode->i_op->setxattr(dentry, name, buf->lb_buf,
2074 buf->lb_len, fs_flags);
2079 * Put the fid into lustre_mdt_attrs, and then place the structure
2080 * inode's ea. This fid should not be altered during the life time
2083 * \retval +ve, on success
2084 * \retval -ve, on error
2086 * FIXME: It is good to have/use ldiskfs_xattr_set_handle() here
/*
 * Store the object's fid in its LMA extended attribute: build a
 * lustre_mdt_attrs in per-thread scratch space, swab it to disk
 * order, and set it as XATTR_NAME_LMA with LU_XATTR_CREATE (must
 * not already exist).
 */
2088 static int osd_ea_fid_set(const struct lu_env *env, struct dt_object *dt,
2089 const struct lu_fid *fid)
2091 struct osd_thread_info *info = osd_oti_get(env);
2092 struct lustre_mdt_attrs *mdt_attrs = &info->oti_mdt_attrs;
2094 lustre_lma_init(mdt_attrs, fid);
2095 lustre_lma_swab(mdt_attrs);
2096 return __osd_xattr_set(env, dt,
2097 osd_buf_get(env, mdt_attrs, sizeof *mdt_attrs),
2098 XATTR_NAME_LMA, LU_XATTR_CREATE);
2103 * Helper function to form igif
/*
 * Build an IGIF fid for @inode from its inode number and
 * generation (used when the inode carries no LMA xattr).
 */
2105 static inline void osd_igif_get(const struct lu_env *env, struct inode *inode,
2108 LU_IGIF_BUILD(fid, inode->i_ino, inode->i_generation);
2112 * Helper function to pack the fid, ldiskfs stores fid in packed format.
/*
 * Pack @fid for on-disk storage: byte-swap it to big-endian into
 * @befider, copy into the pack area, and record the packed length
 * (fid size plus the one-byte length field).
 */
2114 void osd_fid_pack(struct osd_fid_pack *pack, const struct dt_rec *fid,
2115 struct lu_fid *befider)
2117 fid_cpu_to_be(befider, (struct lu_fid *)fid);
2118 memcpy(pack->fp_area, befider, sizeof(*befider));
2119 pack->fp_len = sizeof(*befider) + 1;
2123 * ldiskfs supports fid in dirent, it is passed in dentry->d_fsdata.
2124 * lustre 1.8 also uses d_fsdata for passing other info to ldiskfs.
2125 * To have compatilibility with 1.8 ldiskfs driver we need to have
2126 * magic number at start of fid data.
2127 * \ldiskfs_dentry_param is used only to pass fid from osd to ldiskfs.
/*
 * Fill an ldiskfs_dentry_param with the big-endian fid so ldiskfs
 * can store the fid in the directory entry; edp_magic carries
 * LDISKFS_LUFID_MAGIC for compatibility with 1.8 d_fsdata usage
 * (see the comment block above).
 */
2130 void osd_get_ldiskfs_dirent_param(struct ldiskfs_dentry_param *param,
2131 const struct dt_rec *fid)
2133 param->edp_magic = LDISKFS_LUFID_MAGIC;
2134 param->edp_len = sizeof(struct lu_fid) + 1;
2136 fid_cpu_to_be((struct lu_fid *)param->edp_data,
2137 (struct lu_fid *)fid);
/*
 * Unpack an on-disk fid produced by osd_fid_pack(): validate the
 * packed length, copy the fid out and swab it to CPU order.
 * Unexpected lengths are reported via CERROR.
 */
2140 int osd_fid_unpack(struct lu_fid *fid, const struct osd_fid_pack *pack)
2145 switch (pack->fp_len) {
2146 case sizeof *fid + 1:
2147 memcpy(fid, pack->fp_area, sizeof *fid);
2148 fid_be_to_cpu(fid, fid);
2151 CERROR("Unexpected packed fid size: %d\n", pack->fp_len);
2158 * Try to read the fid from inode ea into dt_rec, if return value
2159 * i.e. rc is +ve, then we got fid, otherwise we will have to form igif
2161 * \param fid object fid.
2163 * \retval 0 on success
/*
 * Look up the fid of inode @ino: iget the inode, read its LMA
 * xattr, verify no unsupported incompat LMA features are set, swab
 * and copy out lma_self_fid.  If the inode has no LMA (-ENODATA),
 * fall back to building an IGIF from ino/generation.
 */
2165 static int osd_ea_fid_get(const struct lu_env *env, struct osd_object *obj,
2166 __u32 ino, struct lu_fid *fid)
2168 struct osd_thread_info *info = osd_oti_get(env);
2169 struct lustre_mdt_attrs *mdt_attrs = &info->oti_mdt_attrs;
2170 struct lu_device *ldev = obj->oo_dt.do_lu.lo_dev;
2171 struct dentry *dentry = &info->oti_child_dentry;
2172 struct osd_inode_id *id = &info->oti_id;
2173 struct osd_device *dev;
2174 struct inode *inode;
2178 dev = osd_dev(ldev);
/* generation unknown: let osd_iget accept any generation */
2181 id->oii_gen = OSD_OII_NOGEN;
2183 inode = osd_iget(info, dev, id);
2184 if (IS_ERR(inode)) {
2185 rc = PTR_ERR(inode);
2188 dentry->d_inode = inode;
2190 LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
2191 rc = inode->i_op->getxattr(dentry, XATTR_NAME_LMA, (void *)mdt_attrs,
2194 /* Check LMA compatibility */
2196 (mdt_attrs->lma_incompat & ~cpu_to_le32(LMA_INCOMPAT_SUPP))) {
2197 CWARN("Inode %lx: Unsupported incompat LMA feature(s) %#x\n",
2198 inode->i_ino, le32_to_cpu(mdt_attrs->lma_incompat) &
2199 ~LMA_INCOMPAT_SUPP);
2204 lustre_lma_swab(mdt_attrs);
2205 memcpy(fid, &mdt_attrs->lma_self_fid, sizeof(*fid));
/* no LMA xattr on this inode: synthesize an IGIF fid instead */
2207 } else if (rc == -ENODATA) {
2208 osd_igif_get(env, inode, fid);
2217 * OSD layer object create function for interoperability mode (b11826).
2218 * This is mostly similar to osd_object_create(). Only difference being, fid is
2219 * inserted into inode ea here.
2221 * \retval 0, on success
2222 * \retval -ve, on error
/*
 * Interoperability-mode variant of osd_object_create() (b11826):
 * same creation path, but additionally stores the fid in the
 * inode's LMA EA — except for objects under the osd root, which
 * keep IGIF fids (fid_seq below FID_SEQ_NORMAL).
 */
2224 static int osd_object_ea_create(const struct lu_env *env, struct dt_object *dt,
2225 struct lu_attr *attr,
2226 struct dt_allocation_hint *hint,
2227 struct dt_object_format *dof,
2230 const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
2231 struct osd_object *obj = osd_dt_obj(dt);
2232 struct osd_thread_info *info = osd_oti_get(env);
2237 LASSERT(osd_invariant(obj));
2238 LASSERT(!dt_object_exists(dt));
2239 LASSERT(osd_write_locked(env, obj));
2240 LASSERT(th != NULL);
2242 OSD_EXEC_OP(th, create);
2244 result = __osd_object_create(info, obj, attr, hint, dof, th);
2246 /* objects under osd root shld have igif fid, so dont add fid EA */
2247 if (result == 0 && fid_seq(fid) >= FID_SEQ_NORMAL)
2248 result = osd_ea_fid_set(env, dt, fid);
2251 result = __osd_oi_insert(env, obj, fid, th);
2253 LASSERT(ergo(result == 0, dt_object_exists(dt)));
2254 LINVRNT(osd_invariant(obj));
/*
 * Declare credits for a future nlink increment (one attr-set's
 * worth).  The object may not exist yet at declaration time.
 */
2258 static int osd_declare_object_ref_add(const struct lu_env *env,
2259 struct dt_object *dt,
2260 struct thandle *handle)
2262 struct osd_thandle *oh;
2264 /* it's possible that object doesn't exist yet */
2265 LASSERT(handle != NULL);
2267 oh = container_of0(handle, struct osd_thandle, ot_super);
2268 LASSERT(oh->ot_handle == NULL);
2270 OSD_DECLARE_OP(oh, ref_add);
2271 oh->ot_credits += osd_dto_credits_noquota[DTO_ATTR_SET_BASE];
2277 * Concurrency: @dt is write locked.
/*
 * dt_object_operations::do_ref_add — increment the inode link
 * count under oo_guard and dirty the inode.  For directories the
 * DIR_NLINK handling described in the comment below applies.
 * NOTE(review): extraction elided lines here (the nlink increment
 * and DIR_NLINK feature-set statements) — code preserved verbatim.
 */
2279 static int osd_object_ref_add(const struct lu_env *env,
2280 struct dt_object *dt,
2283 struct osd_object *obj = osd_dt_obj(dt);
2284 struct inode *inode = obj->oo_inode;
2286 LINVRNT(osd_invariant(obj));
2287 LASSERT(dt_object_exists(dt));
2288 LASSERT(osd_write_locked(env, obj));
2289 LASSERT(th != NULL);
2291 OSD_EXEC_OP(th, ref_add);
2294 * DIR_NLINK feature is set for compatibility reasons if:
2295 * 1) nlinks > LDISKFS_LINK_MAX, or
2296 * 2) nlinks == 2, since this indicates i_nlink was previously 1.
2298 * It is easier to always set this flag (rather than check and set),
2299 * since it has less overhead, and the superblock will be dirtied
2300 * at some point. Both e2fsprogs and any Lustre-supported ldiskfs
2301 * do not actually care whether this flag is set or not.
2303 cfs_spin_lock(&obj->oo_guard);
2305 if (S_ISDIR(inode->i_mode) && inode->i_nlink > 1) {
2306 if (inode->i_nlink >= LDISKFS_LINK_MAX ||
2307 inode->i_nlink == 2)
2310 LASSERT(inode->i_nlink < LDISKFS_LINK_MAX);
2311 cfs_spin_unlock(&obj->oo_guard);
2312 inode->i_sb->s_op->dirty_inode(inode);
2313 LINVRNT(osd_invariant(obj));
/*
 * Declare credits for a future nlink decrement (one attr-set's
 * worth).  Unlike ref_add, the object must already exist.
 */
2318 static int osd_declare_object_ref_del(const struct lu_env *env,
2319 struct dt_object *dt,
2320 struct thandle *handle)
2322 struct osd_thandle *oh;
2324 LASSERT(dt_object_exists(dt));
2325 LASSERT(handle != NULL);
2327 oh = container_of0(handle, struct osd_thandle, ot_super);
2328 LASSERT(oh->ot_handle == NULL);
2330 OSD_DECLARE_OP(oh, ref_del);
2331 oh->ot_credits += osd_dto_credits_noquota[DTO_ATTR_SET_BASE];
2337 * Concurrency: @dt is write locked.
/*
 * dt_object_operations::do_ref_del — decrement the inode link
 * count under oo_guard and dirty the inode; guards directories that
 * have overflowed LDISKFS_LINK_MAX (nlink pinned at 1) from being
 * dropped to zero and deleted prematurely.
 * NOTE(review): extraction elided lines (the decrement itself and
 * the early-unlock branch) — code preserved verbatim.
 */
2339 static int osd_object_ref_del(const struct lu_env *env,
2340 struct dt_object *dt,
2343 struct osd_object *obj = osd_dt_obj(dt);
2344 struct inode *inode = obj->oo_inode;
2346 LINVRNT(osd_invariant(obj));
2347 LASSERT(dt_object_exists(dt));
2348 LASSERT(osd_write_locked(env, obj));
2349 LASSERT(th != NULL);
2351 OSD_EXEC_OP(th, ref_del);
2353 cfs_spin_lock(&obj->oo_guard);
2354 LASSERT(inode->i_nlink > 0);
2356 /* If this is/was a many-subdir directory (nlink > LDISKFS_LINK_MAX)
2357 * then the nlink count is 1. Don't let it be set to 0 or the directory
2358 * inode will be deleted incorrectly. */
2359 if (S_ISDIR(inode->i_mode) && inode->i_nlink == 0)
2361 cfs_spin_unlock(&obj->oo_guard);
2362 inode->i_sb->s_op->dirty_inode(inode);
2363 LINVRNT(osd_invariant(obj));
2369 * Get the 64-bit version for an inode.
/*
 * Read the object's 64-bit version from the ldiskfs inode
 * (i_fs_version) into *ver.
 */
2371 static int osd_object_version_get(const struct lu_env *env,
2372 struct dt_object *dt, dt_obj_version_t *ver)
2374 struct inode *inode = osd_dt_obj(dt)->oo_inode;
2376 CDEBUG(D_INODE, "Get version "LPX64" for inode %lu\n",
2377 LDISKFS_I(inode)->i_fs_version, inode->i_ino);
2378 *ver = LDISKFS_I(inode)->i_fs_version;
2383 * Concurrency: @dt is read locked.
/*
 * dt_object_operations::do_xattr_get — read the named xattr via
 * the inode's getxattr operation.  XATTR_NAME_VERSION is a pseudo
 * xattr handled specially: it returns the inode version rather
 * than hitting xattr storage.  Requires META_READ capability.
 */
2385 static int osd_xattr_get(const struct lu_env *env,
2386 struct dt_object *dt,
2389 struct lustre_capa *capa)
2391 struct osd_object *obj = osd_dt_obj(dt);
2392 struct inode *inode = obj->oo_inode;
2393 struct osd_thread_info *info = osd_oti_get(env);
2394 struct dentry *dentry = &info->oti_obj_dentry;
2396 /* version get is not real XATTR but uses xattr API */
2397 if (strcmp(name, XATTR_NAME_VERSION) == 0) {
2398 /* for version we are just using xattr API but change inode
2400 LASSERT(buf->lb_len == sizeof(dt_obj_version_t));
2401 osd_object_version_get(env, dt, buf->lb_buf);
2402 return sizeof(dt_obj_version_t);
2405 LASSERT(dt_object_exists(dt));
2406 LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
2407 LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
2409 if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
2412 dentry->d_inode = inode;
2413 return inode->i_op->getxattr(dentry, name, buf->lb_buf, buf->lb_len);
/*
 * Declare credits for a future xattr set; the VERSION pseudo-xattr
 * needs none (it only updates the in-core inode version).
 */
2417 static int osd_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
2418 const struct lu_buf *buf, const char *name,
2419 int fl, struct thandle *handle)
2421 struct osd_thandle *oh;
2423 LASSERT(handle != NULL);
2425 if (strcmp(name, XATTR_NAME_VERSION) == 0) {
2426 /* no credits for version */
2430 oh = container_of0(handle, struct osd_thandle, ot_super);
2431 LASSERT(oh->ot_handle == NULL);
2433 OSD_DECLARE_OP(oh, xattr_set);
2434 oh->ot_credits += osd_dto_credits_noquota[DTO_XATTR_SET];
2440 * Set the 64-bit version for object
/*
 * Write the object's 64-bit version into the ldiskfs inode
 * (i_fs_version) and dirty the inode so the update is persisted.
 */
2442 static void osd_object_version_set(const struct lu_env *env,
2443 struct dt_object *dt,
2444 dt_obj_version_t *new_version)
2446 struct inode *inode = osd_dt_obj(dt)->oo_inode;
2448 CDEBUG(D_INODE, "Set version "LPX64" (old "LPX64") for inode %lu\n",
2449 *new_version, LDISKFS_I(inode)->i_fs_version, inode->i_ino);
2451 LDISKFS_I(inode)->i_fs_version = *new_version;
2452 /** Version is set after all inode operations are finished,
2453 * so we should mark it dirty here */
2454 inode->i_sb->s_op->dirty_inode(inode);
2458 * Concurrency: @dt is write locked.
/*
 * dt_object_operations::do_xattr_set — handle the VERSION pseudo
 * xattr inline, otherwise authorize META_WRITE and delegate to
 * __osd_xattr_set().
 */
2460 static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
2461 const struct lu_buf *buf, const char *name, int fl,
2462 struct thandle *handle, struct lustre_capa *capa)
2464 LASSERT(handle != NULL);
2466 /* version set is not real XATTR */
2467 if (strcmp(name, XATTR_NAME_VERSION) == 0) {
2468 /* for version we are just using xattr API but change inode
2470 LASSERT(buf->lb_len == sizeof(dt_obj_version_t));
2471 osd_object_version_set(env, dt, buf->lb_buf);
2472 return sizeof(dt_obj_version_t);
2475 if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
2478 OSD_EXEC_OP(handle, xattr_set);
2479 return __osd_xattr_set(env, dt, buf, name, fl);
2483 * Concurrency: @dt is read locked.
/*
 * dt_object_operations::do_xattr_list — list the inode's xattr
 * names into @buf via listxattr, after a META_READ capability
 * check.  Caller must hold at least the read lock.
 */
2485 static int osd_xattr_list(const struct lu_env *env,
2486 struct dt_object *dt,
2488 struct lustre_capa *capa)
2490 struct osd_object *obj = osd_dt_obj(dt);
2491 struct inode *inode = obj->oo_inode;
2492 struct osd_thread_info *info = osd_oti_get(env);
2493 struct dentry *dentry = &info->oti_obj_dentry;
2495 LASSERT(dt_object_exists(dt));
2496 LASSERT(inode->i_op != NULL && inode->i_op->listxattr != NULL);
2497 LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
2499 if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
2502 dentry->d_inode = inode;
2503 return inode->i_op->listxattr(dentry, buf->lb_buf, buf->lb_len);
/*
 * Declare credits for a future xattr removal (accounted the same
 * as an xattr set).
 */
2506 static int osd_declare_xattr_del(const struct lu_env *env,
2507 struct dt_object *dt,
2509 struct thandle *handle)
2511 struct osd_thandle *oh;
2513 LASSERT(dt_object_exists(dt));
2514 LASSERT(handle != NULL);
2516 oh = container_of0(handle, struct osd_thandle, ot_super);
2517 LASSERT(oh->ot_handle == NULL);
2519 OSD_DECLARE_OP(oh, xattr_set);
2520 oh->ot_credits += osd_dto_credits_noquota[DTO_XATTR_SET];
2526 * Concurrency: @dt is write locked.
/*
 * dt_object_operations::do_xattr_del — remove the named xattr via
 * the inode's removexattr operation, after a META_WRITE capability
 * check.  Caller must hold the write lock.
 */
2528 static int osd_xattr_del(const struct lu_env *env,
2529 struct dt_object *dt,
2531 struct thandle *handle,
2532 struct lustre_capa *capa)
2534 struct osd_object *obj = osd_dt_obj(dt);
2535 struct inode *inode = obj->oo_inode;
2536 struct osd_thread_info *info = osd_oti_get(env);
2537 struct dentry *dentry = &info->oti_obj_dentry;
2540 LASSERT(dt_object_exists(dt));
2541 LASSERT(inode->i_op != NULL && inode->i_op->removexattr != NULL);
2542 LASSERT(osd_write_locked(env, obj));
2543 LASSERT(handle != NULL);
2545 if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
2548 OSD_EXEC_OP(handle, xattr_set);
2550 dentry->d_inode = inode;
2551 rc = inode->i_op->removexattr(dentry, name);
/*
 * dt_object_operations::do_capa_get — issue (or renew) a
 * capability for @dt.  Validates the old capa on renewal, encodes
 * the owner according to the env's mc_auth mode (plain uid/gid or
 * encrypted via capa_encrypt_id), fills in fid/flags/timeout,
 * reuses a cached unexpired capa when possible, otherwise signs a
 * fresh one with the current (od_capa_keys[1]) key and caches it.
 * NOTE(review): extraction elided some lines (returns, case labels)
 * — code preserved verbatim.
 */
2555 static struct obd_capa *osd_capa_get(const struct lu_env *env,
2556 struct dt_object *dt,
2557 struct lustre_capa *old,
2560 struct osd_thread_info *info = osd_oti_get(env);
2561 const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
2562 struct osd_object *obj = osd_dt_obj(dt);
2563 struct osd_device *dev = osd_obj2dev(obj);
2564 struct lustre_capa_key *key = &info->oti_capa_key;
2565 struct lustre_capa *capa = &info->oti_capa;
2566 struct obd_capa *oc;
2567 struct md_capainfo *ci;
2571 if (!dev->od_fl_capa)
2572 RETURN(ERR_PTR(-ENOENT));
2574 LASSERT(dt_object_exists(dt));
2575 LINVRNT(osd_invariant(obj));
2577 /* renewal sanity check */
2578 if (old && osd_object_auth(env, dt, old, opc))
2579 RETURN(ERR_PTR(-EACCES));
2581 ci = md_capainfo(env);
2583 RETURN(ERR_PTR(-ENOENT));
2585 switch (ci->mc_auth) {
/* plain mode: store uid/gid directly in the capa */
2589 capa->lc_uid = obj->oo_inode->i_uid;
2590 capa->lc_gid = obj->oo_inode->i_gid;
2591 capa->lc_flags = LC_ID_PLAIN;
2593 case LC_ID_CONVERT: {
/* convert mode: encrypt (uid, rnd, gid, rnd) with the capa key */
2596 s[0] = obj->oo_inode->i_uid;
2597 cfs_get_random_bytes(&(s[1]), sizeof(__u32));
2598 s[2] = obj->oo_inode->i_gid;
2599 cfs_get_random_bytes(&(s[3]), sizeof(__u32));
2600 rc = capa_encrypt_id(d, s, key->lk_key, CAPA_HMAC_KEY_MAX_LEN);
2602 RETURN(ERR_PTR(rc));
2604 capa->lc_uid = ((__u64)d[1] << 32) | d[0];
2605 capa->lc_gid = ((__u64)d[3] << 32) | d[2];
2606 capa->lc_flags = LC_ID_CONVERT;
2610 RETURN(ERR_PTR(-EINVAL));
2613 capa->lc_fid = *fid;
/* HMAC algorithm is carried in the top byte of lc_flags */
2615 capa->lc_flags |= dev->od_capa_alg << 24;
2616 capa->lc_timeout = dev->od_capa_timeout;
2617 capa->lc_expiry = 0;
/* reuse a cached, still-valid capa if one exists */
2619 oc = capa_lookup(dev->od_capa_hash, capa, 1);
2621 LASSERT(!capa_is_expired(oc));
2625 cfs_spin_lock(&capa_lock);
2626 *key = dev->od_capa_keys[1];
2627 cfs_spin_unlock(&capa_lock);
2629 capa->lc_keyid = key->lk_keyid;
2630 capa->lc_expiry = cfs_time_current_sec() + dev->od_capa_timeout;
2632 rc = capa_hmac(capa->lc_hmac, capa, key->lk_key);
2634 DEBUG_CAPA(D_ERROR, capa, "HMAC failed: %d for", rc);
2635 RETURN(ERR_PTR(rc));
2638 oc = capa_add(dev->od_capa_hash, capa);
/*
 * dt_object_operations::do_object_sync — fsync the object's inode
 * by building a per-thread fake file/dentry pair and calling the
 * file operations' fsync under the inode mutex.
 */
2642 static int osd_object_sync(const struct lu_env *env, struct dt_object *dt)
2645 struct osd_object *obj = osd_dt_obj(dt);
2646 struct inode *inode = obj->oo_inode;
2647 struct osd_thread_info *info = osd_oti_get(env);
2648 struct dentry *dentry = &info->oti_obj_dentry;
2649 struct file *file = &info->oti_file;
2652 dentry->d_inode = inode;
2653 file->f_dentry = dentry;
2654 file->f_mapping = inode->i_mapping;
2655 file->f_op = inode->i_fop;
2656 LOCK_INODE_MUTEX(inode);
2657 rc = file->f_op->fsync(file, dentry, 0);
2658 UNLOCK_INODE_MUTEX(inode);
/*
 * dt_object_operations::do_data_get — expose the object's backing
 * inode as an opaque pointer in *data.
 */
2662 static int osd_data_get(const struct lu_env *env, struct dt_object *dt,
2665 struct osd_object *obj = osd_dt_obj(dt);
2668 *data = (void *)obj->oo_inode;
/*
 * Check whether the object's existing IAM container is compatible
 * with the requested index features: the root object only matches
 * dt_directory_features; directories match on record size; generic
 * indices must satisfy the key/record size ranges and must not
 * request variable keys/records or non-unique entries.
 */
2676 static int osd_iam_index_probe(const struct lu_env *env, struct osd_object *o,
2677 const struct dt_index_features *feat)
2679 struct iam_descr *descr;
2681 if (osd_object_is_root(o))
2682 return feat == &dt_directory_features;
2684 LASSERT(o->oo_dir != NULL);
2686 descr = o->oo_dir->od_container.ic_descr;
2687 if (feat == &dt_directory_features) {
2688 if (descr->id_rec_size == sizeof(struct osd_fid_pack))
2694 feat->dif_keysize_min <= descr->id_key_size &&
2695 descr->id_key_size <= feat->dif_keysize_max &&
2696 feat->dif_recsize_min <= descr->id_rec_size &&
2697 descr->id_rec_size <= feat->dif_recsize_max &&
2698 !(feat->dif_flags & (DT_IND_VARKEY |
2699 DT_IND_VARREC | DT_IND_NONUNQ)) &&
2700 ergo(feat->dif_flags & DT_IND_UPDATE,
2701 1 /* XXX check that object (and file system) is
/*
 * Initialize the IAM container embedded in \a dir for \a obj and, on
 * success, install the IAM index operations on the object.  On error
 * the container is torn down again with iam_container_fini().
 */
static int osd_iam_container_init(const struct lu_env *env,
                                  struct osd_object *obj,
                                  struct osd_directory *dir)
        struct iam_container *bag = &dir->od_container;
        result = iam_container_init(bag, &dir->od_descr, obj->oo_inode);
        result = iam_container_setup(bag);
        /* when the device runs in iop (interoperability) mode, cache the
         * index root block's buffer head -- see osd_obj_ea_ops (b11826) */
        if (osd_obj2dev(obj)->od_iop_mode) {
                u32 ptr = bag->ic_descr->id_ops->id_root_ptr(bag);
                bag->ic_root_bh = ldiskfs_bread(NULL, obj->oo_inode,
        obj->oo_dt.do_index_ops = &osd_index_iam_ops;
        /* error path: undo iam_container_init() */
        iam_container_fini(bag);
/*
 * Concurrency: no external locking is necessary.
 *
 * ->do_index_try: select the index operations vector for \a dt.  The
 * root object and (in iop mode) plain directories use the ldiskfs "ea"
 * directory format; everything else uses an IAM container, which is
 * allocated under oo_guard and initialized under oo_ext_idx_sem.
 */
static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
                         const struct dt_index_features *feat)
        struct osd_object *obj = osd_dt_obj(dt);
        struct osd_device *osd = osd_obj2dev(obj);
        LINVRNT(osd_invariant(obj));
        LASSERT(dt_object_exists(dt));
        if (osd_object_is_root(obj)) {
                dt->do_index_ops = &osd_index_ea_ops;
        } else if (feat == &dt_directory_features && osd->od_iop_mode) {
                dt->do_index_ops = &osd_index_ea_ops;
                if (S_ISDIR(obj->oo_inode->i_mode))
        } else if (!osd_has_index(obj)) {
                struct osd_directory *dir;
                /* first stage: allocate container data under the spinlock */
                cfs_spin_lock(&obj->oo_guard);
                if (obj->oo_dir == NULL)
                /*
                 * Concurrent thread allocated container data.
                 */
                cfs_spin_unlock(&obj->oo_guard);
                /*
                 * Now, that we have container data, serialize its
                 * initialization with oo_ext_idx_sem.
                 */
                cfs_down_write(&obj->oo_ext_idx_sem);
                /*
                 * recheck under lock.
                 */
                if (!osd_has_index(obj))
                        result = osd_iam_container_init(env, obj, dir);
                cfs_up_write(&obj->oo_ext_idx_sem);
        /* for IAM indices, verify the container matches requested features */
        if (result == 0 && ea_dir == 0) {
                if (!osd_iam_index_probe(env, obj, feat))
        LINVRNT(osd_invariant(obj));
/*
 * dt_object_operations vector for the regular (IAM-capable) mode.
 * Compare osd_obj_ea_ops below, which differs only in ->do_create.
 */
static const struct dt_object_operations osd_obj_ops = {
        .do_read_lock         = osd_object_read_lock,
        .do_write_lock        = osd_object_write_lock,
        .do_read_unlock       = osd_object_read_unlock,
        .do_write_unlock      = osd_object_write_unlock,
        .do_write_locked      = osd_object_write_locked,
        .do_attr_get          = osd_attr_get,
        .do_declare_attr_set  = osd_declare_attr_set,
        .do_attr_set          = osd_attr_set,
        .do_ah_init           = osd_ah_init,
        .do_declare_create    = osd_declare_object_create,
        .do_create            = osd_object_create,
        .do_declare_destroy   = osd_declare_object_destroy,
        .do_destroy           = osd_object_destroy,
        .do_index_try         = osd_index_try,
        .do_declare_ref_add   = osd_declare_object_ref_add,
        .do_ref_add           = osd_object_ref_add,
        .do_declare_ref_del   = osd_declare_object_ref_del,
        .do_ref_del           = osd_object_ref_del,
        .do_xattr_get         = osd_xattr_get,
        .do_declare_xattr_set = osd_declare_xattr_set,
        .do_xattr_set         = osd_xattr_set,
        .do_declare_xattr_del = osd_declare_xattr_del,
        .do_xattr_del         = osd_xattr_del,
        .do_xattr_list        = osd_xattr_list,
        .do_capa_get          = osd_capa_get,
        .do_object_sync       = osd_object_sync,
        .do_data_get          = osd_data_get,
/*
 * dt_object_operations for interoperability mode
 * (i.e. to run 2.0 mds on 1.8 disk) (b11826)
 *
 * Identical to osd_obj_ops except ->do_create, which points at
 * osd_object_ea_create.
 */
static const struct dt_object_operations osd_obj_ea_ops = {
        .do_read_lock         = osd_object_read_lock,
        .do_write_lock        = osd_object_write_lock,
        .do_read_unlock       = osd_object_read_unlock,
        .do_write_unlock      = osd_object_write_unlock,
        .do_write_locked      = osd_object_write_locked,
        .do_attr_get          = osd_attr_get,
        .do_declare_attr_set  = osd_declare_attr_set,
        .do_attr_set          = osd_attr_set,
        .do_ah_init           = osd_ah_init,
        .do_declare_create    = osd_declare_object_create,
        .do_create            = osd_object_ea_create,
        .do_declare_destroy   = osd_declare_object_destroy,
        .do_destroy           = osd_object_destroy,
        .do_index_try         = osd_index_try,
        .do_declare_ref_add   = osd_declare_object_ref_add,
        .do_ref_add           = osd_object_ref_add,
        .do_declare_ref_del   = osd_declare_object_ref_del,
        .do_ref_del           = osd_object_ref_del,
        .do_xattr_get         = osd_xattr_get,
        .do_declare_xattr_set = osd_declare_xattr_set,
        .do_xattr_set         = osd_xattr_set,
        .do_declare_xattr_del = osd_declare_xattr_del,
        .do_xattr_del         = osd_xattr_del,
        .do_xattr_list        = osd_xattr_list,
        .do_capa_get          = osd_capa_get,
        .do_object_sync       = osd_object_sync,
        .do_data_get          = osd_data_get,
/*
 * XXX: Another layering violation for now.
 *
 * We don't want to use ->f_op->read methods, because generic file write
 *   - serializes on ->i_sem, and
 *   - does a lot of extra work like balance_dirty_pages(),
 * which doesn't work for globally shared files like /last-received.
 */
/* Read a fast symlink target straight from the inode body (i_data).
 * The caller (osd_read) guarantees buflen <= sizeof(ei->i_data). */
static int osd_ldiskfs_readlink(struct inode *inode, char *buffer, int buflen)
        struct ldiskfs_inode_info *ei = LDISKFS_I(inode);
        memcpy(buffer, (char*)ei->i_data, buflen);
/*
 * Read up to \a size bytes from \a inode at *offs using direct
 * buffer-head access (ldiskfs_bread), bypassing the generic file read
 * path -- see the layering-violation note above osd_ldiskfs_readlink().
 * NOTE(review): loop head/tail lines are not visible in this view; the
 * visible body covers one block-sized chunk (csize bytes).
 */
static int osd_ldiskfs_read(struct inode *inode, void *buf, int size,
        struct buffer_head *bh;
        unsigned long block;
        /* prevent reading after eof */
        spin_lock(&inode->i_lock);
        if (i_size_read(inode) < *offs + size) {
                /* clamp the read to the current file size */
                size = i_size_read(inode) - *offs;
                spin_unlock(&inode->i_lock);
                CDEBUG(D_EXT2, "size %llu is too short to read @%llu\n",
                       i_size_read(inode), *offs);
        } else if (size == 0) {
        spin_unlock(&inode->i_lock);
        blocksize = 1 << inode->i_blkbits;
        /* split the offset into a block number and an offset in-block */
        block = *offs >> inode->i_blkbits;
        boffs = *offs & (blocksize - 1);
        csize = min(blocksize - boffs, size);
        bh = ldiskfs_bread(NULL, inode, block, 0, &err);
        CERROR("can't read block: %d\n", err);
        memcpy(buf, bh->b_data + boffs, csize);
/*
 * ->dbo_read: read object body.  Small symlink targets live inside the
 * inode body itself; everything else goes through osd_ldiskfs_read().
 */
static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt,
                        struct lu_buf *buf, loff_t *pos,
                        struct lustre_capa *capa)
        struct osd_object *obj   = osd_dt_obj(dt);
        struct inode      *inode = obj->oo_inode;
        /* capability check for body-read access */
        if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
        /* Read small symlink from inode body as we need to maintain correct
         * on-disk symlinks for ldiskfs.
         */
        if (S_ISLNK(obj->oo_dt.do_lu.lo_header->loh_attr) &&
            (buf->lb_len <= sizeof (LDISKFS_I(inode)->i_data)))
                rc = osd_ldiskfs_readlink(inode, buf->lb_buf, buf->lb_len);
        rc = osd_ldiskfs_read(inode, buf->lb_buf, buf->lb_len, pos);
/* Store a fast symlink target in the inode body (i_data), update both
 * the in-core and on-disk sizes, and mark the inode dirty. */
static int osd_ldiskfs_writelink(struct inode *inode, char *buffer, int buflen)
        memcpy((char*)&LDISKFS_I(inode)->i_data, (char *)buffer,
        LDISKFS_I(inode)->i_disksize = buflen;
        i_size_write(inode, buflen);
        inode->i_sb->s_op->dirty_inode(inode);
/*
 * Write \a bufsize bytes from \a buf into \a inode at *offs inside the
 * running journal transaction \a handle, one block-sized chunk at a
 * time, then update the in-core and on-disk sizes if the write grew
 * the file.
 */
static int osd_ldiskfs_write_record(struct inode *inode, void *buf, int bufsize,
                                    loff_t *offs, handle_t *handle)
        struct buffer_head *bh       = NULL;
        loff_t              offset   = *offs;
        loff_t              new_size = i_size_read(inode);
        unsigned long       block;
        int                 blocksize = 1 << inode->i_blkbits;
        int                 dirty_inode = 0;
        while (bufsize > 0) {
                block = offset >> inode->i_blkbits;
                boffs = offset & (blocksize - 1);
                size = min(blocksize - boffs, bufsize);
                /* create flag == 1: allocate the block if not yet mapped */
                bh = ldiskfs_bread(handle, inode, block, 1, &err);
                CERROR("can't read/create block: %d\n", err);
                /* journal the modification of this metadata buffer */
                err = ldiskfs_journal_get_write_access(handle, bh);
                CERROR("journal_get_write_access() returned error %d\n",
                LASSERTF(boffs + size <= bh->b_size,
                         "boffs %d size %d bh->b_size %lu",
                         boffs, size, (unsigned long)bh->b_size);
                memcpy(bh->b_data + boffs, buf, size);
                err = ldiskfs_journal_dirty_metadata(handle, bh);
                /* track the new end of file */
                if (offset + size > new_size)
                        new_size = offset + size;
        /* correct in-core and on-disk sizes */
        if (new_size > i_size_read(inode)) {
                spin_lock(&inode->i_lock);
                /* recheck under i_lock to avoid racing size updates */
                if (new_size > i_size_read(inode))
                        i_size_write(inode, new_size);
                if (i_size_read(inode) > LDISKFS_I(inode)->i_disksize) {
                        LDISKFS_I(inode)->i_disksize = i_size_read(inode);
                spin_unlock(&inode->i_lock);
        inode->i_sb->s_op->dirty_inode(inode);
/*
 * ->dbo_declare_write: reserve journal credits and quota usage for a
 * subsequent osd_write().  Must run before the transaction starts
 * (ot_handle still NULL).
 */
static ssize_t osd_declare_write(const struct lu_env *env, struct dt_object *dt,
                                 const loff_t size, loff_t pos,
                                 struct thandle *handle)
        struct osd_thandle *oh;
        LASSERT(handle != NULL);
        oh = container_of0(handle, struct osd_thandle, ot_super);
        LASSERT(oh->ot_handle == NULL);
        /* XXX: size == 0 or INT_MAX indicating a catalog header update or
         * llog write, see comment in mdd_declare_llog_record().
         *
         * This hack should be removed in 2.3
         */
        if (size == DECLARE_LLOG_REWRITE)
        else if (size == DECLARE_LLOG_WRITE)
        credits = osd_dto_credits_noquota[DTO_WRITE_BLOCK];
        OSD_DECLARE_OP(oh, write);
        oh->ot_credits += credits;
        /* an object without an inode yet cannot be charged to quota */
        if (osd_dt_obj(dt)->oo_inode == NULL)
        osd_declare_qid(dt, oh, USRQUOTA, osd_dt_obj(dt)->oo_inode->i_uid,
                        osd_dt_obj(dt)->oo_inode);
        osd_declare_qid(dt, oh, GRPQUOTA, osd_dt_obj(dt)->oo_inode->i_gid,
                        osd_dt_obj(dt)->oo_inode);
/*
 * ->dbo_write: write object body inside transaction \a handle.  Small
 * symlink targets are stored directly in the inode body; other data is
 * written through osd_ldiskfs_write_record().
 */
static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
                         const struct lu_buf *buf, loff_t *pos,
                         struct thandle *handle, struct lustre_capa *capa,
        struct osd_object  *obj   = osd_dt_obj(dt);
        struct inode       *inode = obj->oo_inode;
        struct osd_thandle *oh;
#ifdef HAVE_QUOTA_SUPPORT
        cfs_cap_t           save  = cfs_curproc_cap_pack();
        LASSERT(handle != NULL);
        /* capability check for body-write access */
        if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_WRITE))
        oh = container_of(handle, struct osd_thandle, ot_super);
        LASSERT(oh->ot_handle->h_transaction != NULL);
#ifdef HAVE_QUOTA_SUPPORT
        /* raise/lower CFS_CAP_SYS_RESOURCE -- presumably selected by the
         * ignore_quota flag; the controlling condition line is not
         * visible here, confirm against the full source */
        cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
        cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
        /* Write small symlink to inode body as we need to maintain correct
         * on-disk symlinks for ldiskfs.
         */
        if(S_ISLNK(obj->oo_dt.do_lu.lo_header->loh_attr) &&
           (buf->lb_len < sizeof (LDISKFS_I(inode)->i_data)))
                result = osd_ldiskfs_writelink(inode, buf->lb_buf, buf->lb_len);
        result = osd_ldiskfs_write_record(inode, buf->lb_buf,
#ifdef HAVE_QUOTA_SUPPORT
        cfs_curproc_cap_unpack(save);
        /* success: report number of bytes written */
        result = buf->lb_len;
/*
 * in some cases we may need declare methods for objects being created
 * e.g., when we create symlink
 */
static const struct dt_body_operations osd_body_ops_new = {
        .dbo_declare_write = osd_declare_write,
/* Full body operations for existing objects: read, declare-write, write. */
static const struct dt_body_operations osd_body_ops = {
        .dbo_read          = osd_read,
        .dbo_declare_write = osd_declare_write,
        .dbo_write         = osd_write
/*
 * ->dio_declare_delete for IAM indices: reserve journal credits for a
 * later osd_index_iam_delete().
 */
static int osd_index_declare_iam_delete(const struct lu_env *env,
                                        struct dt_object *dt,
                                        const struct dt_key *key,
                                        struct thandle *handle)
        struct osd_thandle *oh;
        oh = container_of0(handle, struct osd_thandle, ot_super);
        /* must be called before the transaction is started */
        LASSERT(oh->ot_handle == NULL);
        OSD_DECLARE_OP(oh, delete);
        oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_DELETE];
/**
 * delete a (key, value) pair from index \a dt specified by \a key
 *
 * \param  dt      osd index object
 * \param  key     key for index
 * \param  rec     record reference
 * \param  handle  transaction handler
 *
 * \retval  0   success
 * \retval -ve  failure
 */
static int osd_index_iam_delete(const struct lu_env *env, struct dt_object *dt,
                                const struct dt_key *key, struct thandle *handle,
                                struct lustre_capa *capa)
        struct osd_object     *obj = osd_dt_obj(dt);
        struct osd_thandle    *oh;
        struct iam_path_descr *ipd;
        struct iam_container  *bag = &obj->oo_dir->od_container;
        LINVRNT(osd_invariant(obj));
        LASSERT(dt_object_exists(dt));
        LASSERT(bag->ic_object == obj->oo_inode);
        LASSERT(handle != NULL);
        if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
        OSD_EXEC_OP(handle, delete);
        /* per-thread IAM path descriptor for this container */
        ipd = osd_idx_ipd_get(env, bag);
        if (unlikely(ipd == NULL))
        oh = container_of0(handle, struct osd_thandle, ot_super);
        LASSERT(oh->ot_handle != NULL);
        LASSERT(oh->ot_handle->h_transaction != NULL);
        rc = iam_delete(oh->ot_handle, bag, (const struct iam_key *)key, ipd);
        osd_ipd_put(env, bag, ipd);
        LINVRNT(osd_invariant(obj));
/*
 * ->dio_declare_delete for "ea" (ldiskfs directory) indices: reserve
 * journal credits and quota for a later osd_index_ea_delete().
 */
static int osd_index_declare_ea_delete(const struct lu_env *env,
                                       struct dt_object *dt,
                                       const struct dt_key *key,
                                       struct thandle *handle)
        struct osd_thandle *oh;
        LASSERT(dt_object_exists(dt));
        LASSERT(handle != NULL);
        oh = container_of0(handle, struct osd_thandle, ot_super);
        /* must be called before the transaction is started */
        LASSERT(oh->ot_handle == NULL);
        OSD_DECLARE_OP(oh, delete);
        oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_DELETE];
        LASSERT(osd_dt_obj(dt)->oo_inode);
        osd_declare_qid(dt, oh, USRQUOTA, osd_dt_obj(dt)->oo_inode->i_uid,
                        osd_dt_obj(dt)->oo_inode);
        osd_declare_qid(dt, oh, GRPQUOTA, osd_dt_obj(dt)->oo_inode->i_gid,
                        osd_dt_obj(dt)->oo_inode);
/*
 * Extract the packed fid stored after the name inside a ldiskfs dirent
 * (LDISKFS_DIRENT_LUFID feature) and unpack it into \a fid.
 * NOTE(review): second parameter line is not visible here; fid is
 * presumably a struct dt_rec * -- confirm against the full source.
 */
static inline int osd_get_fid_from_dentry(struct ldiskfs_dir_entry_2 *de,
        struct osd_fid_pack *rec;
        if (de->file_type & LDISKFS_DIRENT_LUFID) {
                /* packed fid sits right after the NUL-terminated name */
                rec = (struct osd_fid_pack *) (de->name + de->name_len + 1);
                rc = osd_fid_unpack((struct lu_fid *)fid, rec);
/**
 * Index delete function for interoperability mode (b11826).
 * It will remove the directory entry added by osd_index_ea_insert().
 * This entry is needed to maintain name->fid mapping.
 *
 * \param key,  key i.e. file entry to be deleted
 *
 * \retval   0, on success
 * \retval -ve, on error
 */
static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt,
                               const struct dt_key *key, struct thandle *handle,
                               struct lustre_capa *capa)
        struct osd_object          *obj = osd_dt_obj(dt);
        struct inode               *dir = obj->oo_inode;
        struct dentry              *dentry;
        struct osd_thandle         *oh;
        struct ldiskfs_dir_entry_2 *de;
        struct buffer_head         *bh;
        struct htree_lock          *hlock = NULL;
        LINVRNT(osd_invariant(obj));
        LASSERT(dt_object_exists(dt));
        LASSERT(handle != NULL);
        OSD_EXEC_OP(handle, delete);
        oh = container_of(handle, struct osd_thandle, ot_super);
        LASSERT(oh->ot_handle != NULL);
        LASSERT(oh->ot_handle->h_transaction != NULL);
        if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
        dentry = osd_child_dentry_get(env, obj,
                                      (char *)key, strlen((char *)key));
        /* take the htree lock when available, otherwise fall back to the
         * coarse per-object rwsem */
        if (obj->oo_hl_head != NULL) {
                hlock = osd_oti_get(env)->oti_hlock;
                ldiskfs_htree_lock(hlock, obj->oo_hl_head,
                                   dir, LDISKFS_HLOCK_DEL);
                cfs_down_write(&obj->oo_ext_idx_sem);
        bh = osd_ldiskfs_find_entry(dir, dentry, &de, hlock);
        rc = ldiskfs_delete_entry(oh->ot_handle,
        ldiskfs_htree_unlock(hlock);
        cfs_up_write(&obj->oo_ext_idx_sem);
        LASSERT(osd_invariant(obj));
/**
 * Lookup index for \a key and copy record to \a rec.
 *
 * \param  dt   osd index object
 * \param  key  key for index
 * \param  rec  record reference
 *
 * \retval +ve  success : exact mach
 * \retval  0   return record with key not greater than \a key
 * \retval -ve  failure
 */
static int osd_index_iam_lookup(const struct lu_env *env, struct dt_object *dt,
                                struct dt_rec *rec, const struct dt_key *key,
                                struct lustre_capa *capa)
        struct osd_object      *obj = osd_dt_obj(dt);
        struct iam_path_descr  *ipd;
        struct iam_container   *bag = &obj->oo_dir->od_container;
        struct osd_thread_info *oti = osd_oti_get(env);
        struct iam_iterator    *it  = &oti->oti_idx_it;
        struct iam_rec         *iam_rec;
        LASSERT(osd_invariant(obj));
        LASSERT(dt_object_exists(dt));
        LASSERT(bag->ic_object == obj->oo_inode);
        if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
        ipd = osd_idx_ipd_get(env, bag);
        /* got ipd now we can start iterator. */
        iam_it_init(it, bag, 0, ipd);
        rc = iam_it_get(it, (struct iam_key *)key);
        /* directories store a packed fid: copy into the scratch buffer
         * first, then unpack into the caller's record */
        if (S_ISDIR(obj->oo_inode->i_mode))
                iam_rec = (struct iam_rec *)oti->oti_ldp;
        iam_rec = (struct iam_rec *) rec;
        iam_reccpy(&it->ii_path.ip_leaf, (struct iam_rec *)iam_rec);
        if (S_ISDIR(obj->oo_inode->i_mode))
                osd_fid_unpack((struct lu_fid *) rec,
                               (struct osd_fid_pack *)iam_rec);
        osd_ipd_put(env, bag, ipd);
        LINVRNT(osd_invariant(obj));
/*
 * ->dio_declare_insert for IAM indices: reserve journal credits for a
 * later osd_index_iam_insert().
 */
static int osd_index_declare_iam_insert(const struct lu_env *env,
                                        struct dt_object *dt,
                                        const struct dt_rec *rec,
                                        const struct dt_key *key,
                                        struct thandle *handle)
        struct osd_thandle *oh;
        LASSERT(dt_object_exists(dt));
        LASSERT(handle != NULL);
        oh = container_of0(handle, struct osd_thandle, ot_super);
        /* must be called before the transaction is started */
        LASSERT(oh->ot_handle == NULL);
        OSD_DECLARE_OP(oh, insert);
        oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_INSERT];
/**
 * Inserts (key, value) pair in \a dt index object.
 *
 * \param  dt   osd index object
 * \param  key  key for index
 * \param  rec  record reference
 * \param  th   transaction handler
 *
 * \retval  0   success
 * \retval -ve  failure
 */
static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt,
                                const struct dt_rec *rec, const struct dt_key *key,
                                struct thandle *th, struct lustre_capa *capa,
        struct osd_object      *obj = osd_dt_obj(dt);
        struct iam_path_descr  *ipd;
        struct osd_thandle     *oh;
        struct iam_container   *bag = &obj->oo_dir->od_container;
#ifdef HAVE_QUOTA_SUPPORT
        cfs_cap_t               save = cfs_curproc_cap_pack();
        struct osd_thread_info *oti = osd_oti_get(env);
        struct iam_rec         *iam_rec = (struct iam_rec *)oti->oti_ldp;
        LINVRNT(osd_invariant(obj));
        LASSERT(dt_object_exists(dt));
        LASSERT(bag->ic_object == obj->oo_inode);
        LASSERT(th != NULL);
        if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
        OSD_EXEC_OP(th, insert);
        ipd = osd_idx_ipd_get(env, bag);
        if (unlikely(ipd == NULL))
        oh = container_of0(th, struct osd_thandle, ot_super);
        LASSERT(oh->ot_handle != NULL);
        LASSERT(oh->ot_handle->h_transaction != NULL);
#ifdef HAVE_QUOTA_SUPPORT
        /* raise/lower CFS_CAP_SYS_RESOURCE -- presumably selected by the
         * ignore_quota flag; the controlling line is not visible here */
        cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
        cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
        /* directories store the fid packed; others insert the record as-is */
        if (S_ISDIR(obj->oo_inode->i_mode))
                osd_fid_pack((struct osd_fid_pack *)iam_rec, rec, &oti->oti_fid);
        iam_rec = (struct iam_rec *) rec;
        rc = iam_insert(oh->ot_handle, bag, (const struct iam_key *)key,
#ifdef HAVE_QUOTA_SUPPORT
        cfs_curproc_cap_unpack(save);
        osd_ipd_put(env, bag, ipd);
        LINVRNT(osd_invariant(obj));
/**
 * Calls ldiskfs_add_entry() to add directory entry
 * into the directory. This is required for
 * interoperability mode (b11826)
 *
 * \retval   0, on success
 * \retval -ve, on error
 */
static int __osd_ea_add_rec(struct osd_thread_info *info,
                            struct osd_object *pobj,
                            struct inode *cinode,
                            const struct dt_rec *fid,
                            struct htree_lock *hlock,
        struct ldiskfs_dentry_param *ldp;
        struct dentry               *child;
        struct osd_thandle          *oth;
        oth = container_of(th, struct osd_thandle, ot_super);
        LASSERT(oth->ot_handle != NULL);
        LASSERT(oth->ot_handle->h_transaction != NULL);
        child = osd_child_dentry_get(info->oti_env, pobj, name, strlen(name));
        /* for igif/normal fids, stash the fid in d_fsdata so ldiskfs can
         * record it in the dirent (fid-in-dirent feature) */
        if (fid_is_igif((struct lu_fid *)fid) ||
            fid_is_norm((struct lu_fid *)fid)) {
                ldp = (struct ldiskfs_dentry_param *)info->oti_ldp;
                osd_get_ldiskfs_dirent_param(ldp, fid);
                child->d_fsdata = (void*) ldp;
        child->d_fsdata = NULL;
        rc = osd_ldiskfs_add_entry(oth->ot_handle, child, cinode, hlock);
/**
 * Calls ldiskfs_add_dot_dotdot() to add dot and dotdot entries
 * into the directory. Also sets flags into osd object to
 * indicate dot and dotdot are created. This is required for
 * interoperability mode (b11826)
 *
 * \param dir  directory for dot and dotdot fixup.
 * \param obj  child object for linking
 *
 * \retval   0, on success
 * \retval -ve, on error
 */
static int osd_add_dot_dotdot(struct osd_thread_info *info,
                              struct osd_object *dir,
                              struct inode *parent_dir, const char *name,
                              const struct dt_rec *dot_fid,
                              const struct dt_rec *dot_dot_fid,
        struct inode                *inode = dir->oo_inode;
        struct ldiskfs_dentry_param *dot_ldp;
        struct ldiskfs_dentry_param *dot_dot_ldp;
        struct osd_thandle          *oth;
        oth = container_of(th, struct osd_thandle, ot_super);
        LASSERT(oth->ot_handle->h_transaction != NULL);
        LASSERT(S_ISDIR(dir->oo_inode->i_mode));
        if (strcmp(name, dot) == 0) {
                /* "." already handled; creating it twice is a no-op */
                if (dir->oo_compat_dot_created) {
                LASSERT(inode == parent_dir);
                dir->oo_compat_dot_created = 1;
        } else if(strcmp(name, dotdot) == 0) {
                dot_ldp = (struct ldiskfs_dentry_param *)info->oti_ldp;
                dot_dot_ldp = (struct ldiskfs_dentry_param *)info->oti_ldp2;
                /* ".." can only be added after "." exists */
                if (!dir->oo_compat_dot_created)
                if (fid_seq((struct lu_fid *)dot_fid) >= FID_SEQ_NORMAL) {
                        osd_get_ldiskfs_dirent_param(dot_ldp, dot_fid);
                        osd_get_ldiskfs_dirent_param(dot_dot_ldp, dot_dot_fid);
                /* in case of rename, dotdot is already created */
                if (dir->oo_compat_dotdot_created) {
                        return __osd_ea_add_rec(info, dir, parent_dir, name,
                                                dot_dot_fid, NULL, th);
                result = ldiskfs_add_dot_dotdot(oth->ot_handle, parent_dir,
                                                inode, dot_ldp, dot_dot_ldp);
                dir->oo_compat_dotdot_created = 1;
/**
 * Dispatch entry addition for "ea" directories:
 * It will call the appropriate osd_add* function and return the
 * value, return by respective functions.
 */
static int osd_ea_add_rec(const struct lu_env *env,
                          struct osd_object *pobj,
                          struct inode *cinode,
                          const struct dt_rec *fid,
        struct osd_thread_info *info = osd_oti_get(env);
        struct htree_lock      *hlock;
        hlock = pobj->oo_hl_head != NULL ? info->oti_hlock : NULL;
        /* "." or ".." get the dot/dotdot fixup path, everything else is a
         * plain directory entry */
        if (name[0] == '.' && (name[1] == '\0' || (name[1] == '.' &&
                if (hlock != NULL) {
                        ldiskfs_htree_lock(hlock, pobj->oo_hl_head,
                        cfs_down_write(&pobj->oo_ext_idx_sem);
                rc = osd_add_dot_dotdot(info, pobj, cinode, name,
                       (struct dt_rec *)lu_object_fid(&pobj->oo_dt.do_lu),
                if (hlock != NULL) {
                        ldiskfs_htree_lock(hlock, pobj->oo_hl_head,
                                           pobj->oo_inode, LDISKFS_HLOCK_ADD);
                        cfs_down_write(&pobj->oo_ext_idx_sem);
                rc = __osd_ea_add_rec(info, pobj, cinode, name, fid,
        ldiskfs_htree_unlock(hlock);
        cfs_up_write(&pobj->oo_ext_idx_sem);
/**
 * Calls ->lookup() to find dentry. From dentry get inode and
 * read inode's ea to get fid. This is required for interoperability
 * mode (b11826).
 *
 * \retval   0, on success
 * \retval -ve, on error
 */
static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
                             struct dt_rec *rec, const struct dt_key *key)
        struct inode               *dir = obj->oo_inode;
        struct dentry              *dentry;
        struct ldiskfs_dir_entry_2 *de;
        struct buffer_head         *bh;
        struct lu_fid              *fid = (struct lu_fid *) rec;
        struct htree_lock          *hlock = NULL;
        LASSERT(dir->i_op != NULL && dir->i_op->lookup != NULL);
        dentry = osd_child_dentry_get(env, obj,
                                      (char *)key, strlen((char *)key));
        /* take the htree lock when available, otherwise the per-object
         * read semaphore */
        if (obj->oo_hl_head != NULL) {
                hlock = osd_oti_get(env)->oti_hlock;
                ldiskfs_htree_lock(hlock, obj->oo_hl_head,
                                   dir, LDISKFS_HLOCK_LOOKUP);
                cfs_down_read(&obj->oo_ext_idx_sem);
        bh = osd_ldiskfs_find_entry(dir, dentry, &de, hlock);
        ino = le32_to_cpu(de->inode);
        /* first try the fid packed into the dirent itself */
        rc = osd_get_fid_from_dentry(de, rec);
        /* done with de, release bh */
        /* fall back to reading the fid from the inode's EA */
        rc = osd_ea_fid_get(env, obj, ino, fid);
        ldiskfs_htree_unlock(hlock);
        cfs_up_read(&obj->oo_ext_idx_sem);
/**
 * Find the osd object for given fid.
 *
 * \param fid  need to find the osd object having this fid
 *
 * \retval osd_object  on success
 * \retval -ve         on error
 */
struct osd_object *osd_object_find(const struct lu_env *env,
                                   struct dt_object *dt,
                                   const struct lu_fid *fid)
        struct lu_device  *ludev = dt->do_lu.lo_dev;
        struct osd_object *child = NULL;
        struct lu_object  *luch;
        struct lu_object  *lo;
        luch = lu_object_find(env, ludev, fid, NULL);
        if (!IS_ERR(luch)) {
                if (lu_object_exists(luch)) {
                        /* locate the osd slice in the compound lu_object */
                        lo = lu_object_locate(luch->lo_header, ludev->ld_type);
                        child = osd_obj(lo);
                        LU_OBJECT_DEBUG(D_ERROR, env, luch,
                                        "lu_object can't be located"
                                        ""DFID"\n", PFID(fid));
                        if (child == NULL) {
                                /* drop reference taken by lu_object_find() */
                                lu_object_put(env, luch);
                                CERROR("Unable to get osd_object\n");
                                child = ERR_PTR(-ENOENT);
                        LU_OBJECT_DEBUG(D_ERROR, env, luch,
                                        "lu_object does not exists "DFID"\n",
                        child = ERR_PTR(-ENOENT);
        child = (void *)luch;
/**
 * Put the osd object once done with it.
 * Releases the reference taken by osd_object_find().
 *
 * \param obj  osd object that needs to be put
 */
static inline void osd_object_put(const struct lu_env *env,
                                  struct osd_object *obj)
        lu_object_put(env, &obj->oo_dt.do_lu);
/*
 * ->dio_declare_insert for "ea" (ldiskfs directory) indices: reserve
 * journal credits and quota for a later osd_index_ea_insert().
 */
static int osd_index_declare_ea_insert(const struct lu_env *env,
                                       struct dt_object *dt,
                                       const struct dt_rec *rec,
                                       const struct dt_key *key,
                                       struct thandle *handle)
        struct osd_thandle *oh;
        LASSERT(dt_object_exists(dt));
        LASSERT(handle != NULL);
        oh = container_of0(handle, struct osd_thandle, ot_super);
        /* must be called before the transaction is started */
        LASSERT(oh->ot_handle == NULL);
        OSD_DECLARE_OP(oh, insert);
        oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_INSERT];
        LASSERT(osd_dt_obj(dt)->oo_inode);
        osd_declare_qid(dt, oh, USRQUOTA, osd_dt_obj(dt)->oo_inode->i_uid,
                        osd_dt_obj(dt)->oo_inode);
        osd_declare_qid(dt, oh, GRPQUOTA, osd_dt_obj(dt)->oo_inode->i_gid,
                        osd_dt_obj(dt)->oo_inode);
/**
 * Index add function for interoperability mode (b11826).
 * It will add the directory entry. This entry is needed to
 * maintain name->fid mapping.
 *
 * \param key  it is key i.e. file entry to be inserted
 * \param rec  it is value of given key i.e. fid
 *
 * \retval   0, on success
 * \retval -ve, on error
 */
static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt,
                               const struct dt_rec *rec,
                               const struct dt_key *key, struct thandle *th,
                               struct lustre_capa *capa, int ignore_quota)
        struct osd_object *obj  = osd_dt_obj(dt);
        struct lu_fid     *fid  = (struct lu_fid *) rec;
        const char        *name = (const char *)key;
        struct osd_object *child;
#ifdef HAVE_QUOTA_SUPPORT
        cfs_cap_t          save = cfs_curproc_cap_pack();
        LASSERT(osd_invariant(obj));
        LASSERT(dt_object_exists(dt));
        LASSERT(th != NULL);
        if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
        /* resolve the target fid to get the child inode to link */
        child = osd_object_find(env, dt, fid);
        if (!IS_ERR(child)) {
#ifdef HAVE_QUOTA_SUPPORT
                /* raise/lower CFS_CAP_SYS_RESOURCE -- presumably based on
                 * ignore_quota; the condition line is not visible here */
                cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
                cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
                rc = osd_ea_add_rec(env, obj, child->oo_inode, name, rec, th);
#ifdef HAVE_QUOTA_SUPPORT
                cfs_curproc_cap_unpack(save);
                osd_object_put(env, child);
        rc = PTR_ERR(child);
        LASSERT(osd_invariant(obj));
/**
 * Initialize osd Iterator for given osd index object.
 *
 * \param dt  osd index object
 */
static struct dt_it *osd_it_iam_init(const struct lu_env *env,
                                     struct dt_object *dt,
                                     struct lustre_capa *capa)
        struct osd_it_iam      *it;
        struct osd_thread_info *oti = osd_oti_get(env);
        struct osd_object      *obj = osd_dt_obj(dt);
        struct lu_object       *lo  = &dt->do_lu;
        struct iam_path_descr  *ipd;
        struct iam_container   *bag = &obj->oo_dir->od_container;
        LASSERT(lu_object_exists(lo));
        if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
                return ERR_PTR(-EACCES);
        ipd = osd_it_ipd_get(env, bag);
        if (likely(ipd != NULL)) {
                iam_it_init(&it->oi_it, bag, IAM_IT_MOVE, ipd);
                return (struct dt_it *)it;
        /* no path descriptor available */
        return ERR_PTR(-ENOMEM);
/**
 * free given Iterator.
 * Finalizes the IAM iterator, releases the path descriptor and drops
 * the object reference held by the iterator.
 */
static void osd_it_iam_fini(const struct lu_env *env, struct dt_it *di)
        struct osd_it_iam *it  = (struct osd_it_iam *)di;
        struct osd_object *obj = it->oi_obj;
        iam_it_fini(&it->oi_it);
        osd_ipd_put(env, &obj->oo_dir->od_container, it->oi_ipd);
        lu_object_put(env, &obj->oo_dt.do_lu);
/**
 * Move Iterator to record specified by \a key
 *
 * \param di   osd iterator
 * \param key  key for index
 *
 * \retval +ve  di points to record with least key not larger than key
 * \retval  0   di points to exact matched key
 * \retval -ve  failure
 */
static int osd_it_iam_get(const struct lu_env *env,
                          struct dt_it *di, const struct dt_key *key)
        struct osd_it_iam *it = (struct osd_it_iam *)di;
        return iam_it_get(&it->oi_it, (const struct iam_key *)key);
/**
 * Release iterator position (undo osd_it_iam_get()).
 *
 * \param di  osd iterator
 */
static void osd_it_iam_put(const struct lu_env *env, struct dt_it *di)
        struct osd_it_iam *it = (struct osd_it_iam *)di;
        iam_it_put(&it->oi_it);
/**
 * Move iterator by one record
 *
 * \param di  osd iterator
 *
 * \retval +1   end of container reached
 * \retval -ve  failure
 */
static int osd_it_iam_next(const struct lu_env *env, struct dt_it *di)
        struct osd_it_iam *it = (struct osd_it_iam *)di;
        return iam_it_next(&it->oi_it);
/**
 * Return pointer to the key under iterator.
 */
static struct dt_key *osd_it_iam_key(const struct lu_env *env,
                                     const struct dt_it *di)
        struct osd_it_iam *it = (struct osd_it_iam *)di;
        return (struct dt_key *)iam_it_key_get(&it->oi_it);
/**
 * Return size of key under iterator (in bytes)
 */
static int osd_it_iam_key_size(const struct lu_env *env, const struct dt_it *di)
        struct osd_it_iam *it = (struct osd_it_iam *)di;
        return iam_it_key_size(&it->oi_it);
/*
 * Append optional lustre attributes (currently the file type, when
 * LUDA_TYPE is requested) after the name inside \a ent, and convert
 * lde_attrs to little-endian.
 * NOTE(review): remaining parameter lines (attr, len, type presumably)
 * are not visible in this view -- confirm against the full source.
 */
static inline void osd_it_append_attrs(struct lu_dirent*ent,
        struct luda_type *lt;
        const unsigned    align = sizeof(struct luda_type) - 1;
        /* check if file type is required */
        if (attr & LUDA_TYPE) {
                /* round the name length up so luda_type is aligned */
                len = (len + align) & ~align;
                lt = (void *) ent->lde_name + len;
                lt->lt_type = cpu_to_le16(CFS_DTTOIF(type));
                ent->lde_attrs |= LUDA_TYPE;
        ent->lde_attrs = cpu_to_le32(ent->lde_attrs);
/**
 * build lu direct from backend fs dirent.
 * All multi-byte fields are stored little-endian on the wire.
 * NOTE(review): parameter lines (fid, offset, name, namelen, attr,
 * type presumably) are not visible in this view.
 */
static inline void osd_it_pack_dirent(struct lu_dirent *ent,
        fid_cpu_to_le(&ent->lde_fid, fid);
        ent->lde_attrs = LUDA_FID;
        ent->lde_hash = cpu_to_le64(offset);
        ent->lde_reclen = cpu_to_le16(lu_dirent_calc_size(namelen, attr));
        strncpy(ent->lde_name, name, namelen);
        ent->lde_namelen = cpu_to_le16(namelen);
        /* append lustre attributes */
        osd_it_append_attrs(ent, attr, namelen, type);
/**
 * Return pointer to the record under iterator.
 * Unpacks the fid stored in the IAM record and builds a lu_dirent in
 * the caller-supplied buffer \a dtrec.
 */
static int osd_it_iam_rec(const struct lu_env *env,
                          const struct dt_it *di,
                          struct dt_rec *dtrec,
        struct osd_it_iam         *it   = (struct osd_it_iam *)di;
        struct osd_thread_info    *info = osd_oti_get(env);
        struct lu_fid             *fid  = &info->oti_fid;
        const struct osd_fid_pack *rec;
        struct lu_dirent          *lde  = (struct lu_dirent *)dtrec;
        name = (char *)iam_it_key_get(&it->oi_it);
        RETURN(PTR_ERR(name));
        namelen = iam_it_key_size(&it->oi_it);
        rec = (const struct osd_fid_pack *) iam_it_rec_get(&it->oi_it);
        RETURN(PTR_ERR(rec));
        rc = osd_fid_unpack(fid, rec);
        hash = iam_it_store(&it->oi_it);
        /* IAM does not store object type in IAM index (dir) */
        osd_it_pack_dirent(lde, fid, hash, name, namelen,
/**
 * Returns cookie for current Iterator position.
 */
static __u64 osd_it_iam_store(const struct lu_env *env, const struct dt_it *di)
        struct osd_it_iam *it = (struct osd_it_iam *)di;
        return iam_it_store(&it->oi_it);
/**
 * Restore iterator from cookie.
 *
 * \param di    osd iterator
 * \param hash  Iterator location cookie
 *
 * \retval +ve  di points to record with least key not larger than key.
 * \retval  0   di points to exact matched key
 * \retval -ve  failure
 */
static int osd_it_iam_load(const struct lu_env *env,
                           const struct dt_it *di, __u64 hash)
        struct osd_it_iam *it = (struct osd_it_iam *)di;
        return iam_it_load(&it->oi_it, hash);
/* Index and iterator operations for IAM-based (native) directory indices */
4033 static const struct dt_index_operations osd_index_iam_ops = {
4034 .dio_lookup = osd_index_iam_lookup,
4035 .dio_declare_insert = osd_index_declare_iam_insert,
4036 .dio_insert = osd_index_iam_insert,
4037 .dio_declare_delete = osd_index_declare_iam_delete,
4038 .dio_delete = osd_index_iam_delete,
4040 .init = osd_it_iam_init,
4041 .fini = osd_it_iam_fini,
4042 .get = osd_it_iam_get,
4043 .put = osd_it_iam_put,
4044 .next = osd_it_iam_next,
4045 .key = osd_it_iam_key,
4046 .key_size = osd_it_iam_key_size,
4047 .rec = osd_it_iam_rec,
4048 .store = osd_it_iam_store,
4049 .load = osd_it_iam_load
/*
 * ->init() for the EA (ldiskfs-compat) directory iterator: set up the
 * per-thread iterator context and a minimal pseudo "struct file" so the
 * inode's ->readdir() can be driven directly.
 */
4054 * Creates or initializes iterator context.
4056 * \retval struct osd_it_ea, iterator structure on success
4059 static struct dt_it *osd_it_ea_init(const struct lu_env *env,
4060 struct dt_object *dt,
4062 struct lustre_capa *capa)
4064 struct osd_object *obj = osd_dt_obj(dt);
4065 struct osd_thread_info *info = osd_oti_get(env);
/* iterator lives in per-thread info: one active EA iterator per thread */
4066 struct osd_it_ea *it = &info->oti_it_ea;
4067 struct lu_object *lo = &dt->do_lu;
4068 struct dentry *obj_dentry = &info->oti_it_dentry;
4070 LASSERT(lu_object_exists(lo));
/* fake dentry pointing at the directory inode, for ->readdir() */
4072 obj_dentry->d_inode = obj->oo_inode;
4073 obj_dentry->d_sb = osd_sb(osd_obj2dev(obj));
4074 obj_dentry->d_name.hash = 0;
4076 it->oie_rd_dirent = 0;
4077 it->oie_it_dirent = 0;
4078 it->oie_dirent = NULL;
4079 it->oie_buf = info->oti_it_ea_buf;
4081 it->oie_file.f_pos = 0;
4082 it->oie_file.f_dentry = obj_dentry;
/* hash width requested by the caller selects 32- vs 64-bit dir cookies */
4083 if (attr & LUDA_64BITHASH)
4084 it->oie_file.f_flags = O_64BITHASH;
4086 it->oie_file.f_flags = O_32BITHASH;
4087 it->oie_file.f_mapping = obj->oo_inode->i_mapping;
4088 it->oie_file.f_op = obj->oo_inode->i_fop;
4089 it->oie_file.private_data = NULL;
4091 RETURN((struct dt_it *) it);
/*
 * ->fini(): release the ->readdir() state via the fop release hook and
 * drop the object reference taken when the iterator was initialized.
 */
4095 * Destroy or finishes iterator context.
4097 * \param di iterator structure to be destroyed
4099 static void osd_it_ea_fini(const struct lu_env *env, struct dt_it *di)
4101 struct osd_it_ea *it = (struct osd_it_ea *)di;
4102 struct osd_object *obj = it->oie_obj;
4103 struct inode *inode = obj->oo_inode;
4106 it->oie_file.f_op->release(inode, &it->oie_file);
4107 lu_object_put(env, &obj->oo_dt.do_lu);
/*
 * ->get(): only rewind (key == "") is supported; reset file position and
 * the in-memory dirent cache so the next ->next() refills from the start.
 */
4112 * It position the iterator at given key, so that next lookup continues from
4113 * that key Or it is similar to dio_it->load() but based on a key,
4114 * rather than file position.
4116 * As a special convention, osd_it_ea_get(env, di, "") has to rewind iterator
4119 * TODO: Presently return +1 considering it is only used by mdd_dir_is_empty().
4121 static int osd_it_ea_get(const struct lu_env *env,
4122 struct dt_it *di, const struct dt_key *key)
4124 struct osd_it_ea *it = (struct osd_it_ea *)di;
/* only the empty key (rewind) is implemented */
4127 LASSERT(((const char *)key)[0] == '\0');
4128 it->oie_file.f_pos = 0;
4129 it->oie_rd_dirent = 0;
4130 it->oie_it_dirent = 0;
4131 it->oie_dirent = NULL;
/* ->put(): no-op — the EA iterator holds no per-get state to release */
4139 static void osd_it_ea_put(const struct lu_env *env, struct dt_it *di)
/*
 * filldir_t callback passed to ->readdir(): copies one directory entry
 * into the iterator's in-memory buffer (oie_buf), including the packed
 * FID when the ldiskfs dirent carries one (LDISKFS_DIRENT_LUFID).
 */
4144 * It is called internally by ->readdir(). It fills the
4145 * iterator's in-memory data structure with required
4146 * information i.e. name, namelen, rec_size etc.
4148 * \param buf in which information to be filled in.
4149 * \param name name of the file in given dir
4151 * \retval 0 on success
4152 * \retval 1 on buffer full
/* 'buf' is really the osd_it_ea iterator, smuggled through the VFS API */
4154 static int osd_ldiskfs_filldir(char *buf, const char *name, int namelen,
4155 loff_t offset, __u64 ino,
4158 struct osd_it_ea *it = (struct osd_it_ea *)buf;
4159 struct osd_it_ea_dirent *ent = it->oie_dirent;
4160 struct lu_fid *fid = &ent->oied_fid;
4161 struct osd_fid_pack *rec;
4164 /* this should never happen */
4165 if (unlikely(namelen == 0 || namelen > LDISKFS_NAME_LEN)) {
4166 CERROR("ldiskfs return invalid namelen %d\n", namelen);
/* stop filling when the next entry would overflow oie_buf */
4170 if ((void *) ent - it->oie_buf + sizeof(*ent) + namelen >
/* FID, if present, is stored just past the NUL-terminated name */
4174 if (d_type & LDISKFS_DIRENT_LUFID) {
4175 rec = (struct osd_fid_pack*) (name + namelen + 1);
4177 if (osd_fid_unpack(fid, rec) != 0)
/* strip the LUFID marker so oied_type holds only the real d_type */
4180 d_type &= ~LDISKFS_DIRENT_LUFID;
4185 ent->oied_ino = ino;
4186 ent->oied_off = offset;
4187 ent->oied_namelen = namelen;
4188 ent->oied_type = d_type;
4190 memcpy(ent->oied_name, name, namelen);
/* advance write cursor to the next rounded entry slot */
4192 it->oie_rd_dirent++;
4193 it->oie_dirent = (void *) ent + cfs_size_round(sizeof(*ent) + namelen);
/*
 * Refill the iterator's in-memory dirent buffer by driving the inode's
 * ->readdir() under the directory htree lock (or the fallback rw-sem).
 */
4198 * Calls ->readdir() to load a directory entry at a time
4199 * and stores it in the iterator's in-memory data structure.
4201 * \param di iterator's in memory structure
4203 * \retval 0 on success
4204 * \retval -ve on error
4206 static int osd_ldiskfs_it_fill(const struct lu_env *env,
4207 const struct dt_it *di)
4209 struct osd_it_ea *it = (struct osd_it_ea *)di;
4210 struct osd_object *obj = it->oie_obj;
4211 struct inode *inode = obj->oo_inode;
4212 struct htree_lock *hlock = NULL;
/* reset buffer cursor before refilling */
4216 it->oie_dirent = it->oie_buf;
4217 it->oie_rd_dirent = 0;
/* prefer the shared htree READDIR lock; fall back to the ext-idx rwsem */
4219 if (obj->oo_hl_head != NULL) {
4220 hlock = osd_oti_get(env)->oti_hlock;
4221 ldiskfs_htree_lock(hlock, obj->oo_hl_head,
4222 inode, LDISKFS_HLOCK_READDIR);
4224 cfs_down_read(&obj->oo_ext_idx_sem);
4227 result = inode->i_fop->readdir(&it->oie_file, it,
4228 (filldir_t) osd_ldiskfs_filldir);
4231 ldiskfs_htree_unlock(hlock);
4233 cfs_up_read(&obj->oo_ext_idx_sem);
/* nothing read: NOTE(review) EOF/error handling is on elided lines */
4235 if (it->oie_rd_dirent == 0) {
4238 it->oie_dirent = it->oie_buf;
4239 it->oie_it_dirent = 1;
/*
 * ->next(): advance within the cached dirent buffer when possible;
 * otherwise refill it via osd_ldiskfs_it_fill(), unless EOF was reached.
 */
4246 * It calls osd_ldiskfs_it_fill() which will use ->readdir()
4247 * to load a directory entry at a time and store it in
4248 * iterator's in-memory data structure.
4250 * \param di iterator's in memory structure
4252 * \retval +ve iterator reached to end
4253 * \retval 0 iterator not reached to end
4254 * \retval -ve on error
4256 static int osd_it_ea_next(const struct lu_env *env, struct dt_it *di)
4258 struct osd_it_ea *it = (struct osd_it_ea *)di;
/* still have cached entries: step to the next rounded record */
4263 if (it->oie_it_dirent < it->oie_rd_dirent) {
4265 (void *) it->oie_dirent +
4266 cfs_size_round(sizeof(struct osd_it_ea_dirent) +
4267 it->oie_dirent->oied_namelen);
4268 it->oie_it_dirent++;
/* htree EOF cookie means the directory is exhausted */
4271 if (it->oie_file.f_pos == LDISKFS_HTREE_EOF)
4274 rc = osd_ldiskfs_it_fill(env, di);
/* ->key(): the key of a directory entry is its name */
4281 * Returns the key at current position from iterator's in memory structure.
4283 * \param di iterator's in memory structure
4285 * \retval key i.e. struct dt_key on success
4287 static struct dt_key *osd_it_ea_key(const struct lu_env *env,
4288 const struct dt_it *di)
4290 struct osd_it_ea *it = (struct osd_it_ea *)di;
4292 RETURN((struct dt_key *)it->oie_dirent->oied_name);
/* ->key_size(): length of the current entry's name */
4296 * Returns the key's size at current position from iterator's in memory structure.
4298 * \param di iterator's in memory structure
4300 * \retval key_size i.e. struct dt_key on success
4302 static int osd_it_ea_key_size(const struct lu_env *env, const struct dt_it *di)
4304 struct osd_it_ea *it = (struct osd_it_ea *)di;
4306 RETURN(it->oie_dirent->oied_namelen);
/*
 * ->rec(): produce a lu_dirent for the current entry; when the cached FID
 * is not valid (entry had no embedded FID), look it up from the OI by ino.
 */
4311 * Returns the value (i.e. fid/igif) at current position from iterator's
4312 * in memory structure.
4314 * \param di struct osd_it_ea, iterator's in memory structure
4315 * \param attr attr requested for dirent.
4316 * \param lde lustre dirent
4318 * \retval 0 no error and \param lde has correct lustre dirent.
4319 * \retval -ve on error
4321 static inline int osd_it_ea_rec(const struct lu_env *env,
4322 const struct dt_it *di,
4323 struct dt_rec *dtrec,
4326 struct osd_it_ea *it = (struct osd_it_ea *)di;
4327 struct osd_object *obj = it->oie_obj;
4328 struct lu_fid *fid = &it->oie_dirent->oied_fid;
4329 struct lu_dirent *lde = (struct lu_dirent *)dtrec;
/* FID absent/garbage in dirent — recover it from the inode (OI lookup) */
4334 if (!fid_is_sane(fid))
4335 rc = osd_ea_fid_get(env, obj, it->oie_dirent->oied_ino, fid);
4338 osd_it_pack_dirent(lde, fid, it->oie_dirent->oied_off,
4339 it->oie_dirent->oied_name,
4340 it->oie_dirent->oied_namelen,
4341 it->oie_dirent->oied_type,
/* ->store(): the cookie is the readdir offset of the current entry */
4347 * Returns a cookie for current position of the iterator head, so that
4348 * user can use this cookie to load/start the iterator next time.
4350 * \param di iterator's in memory structure
4352 * \retval cookie for current position, on success
4354 static __u64 osd_it_ea_store(const struct lu_env *env, const struct dt_it *di)
4356 struct osd_it_ea *it = (struct osd_it_ea *)di;
4358 RETURN(it->oie_dirent->oied_off);
/*
 * ->load(): seek the pseudo-file to the given hash cookie, then refill the
 * in-memory dirent buffer from that position.
 */
4362 * It calls osd_ldiskfs_it_fill() which will use ->readdir()
4363 * to load a directory entry at a time and store it
4364 * in iterator's in-memory data structure.
4366 * \param di struct osd_it_ea, iterator's in memory structure
4368 * \retval +ve on success
4369 * \retval -ve on error
4371 static int osd_it_ea_load(const struct lu_env *env,
4372 const struct dt_it *di, __u64 hash)
4374 struct osd_it_ea *it = (struct osd_it_ea *)di;
4378 it->oie_file.f_pos = hash;
4380 rc = osd_ldiskfs_it_fill(env, di);
/*
 * ->dio_lookup() for EA (compat) directories: capability-check, then
 * delegate the by-name record lookup to osd_ea_lookup_rec().
 */
4388 * Index lookup function for interoperability mode (b11826).
4390 * \param key, key i.e. file name to be searched
4392 * \retval +ve, on success
4393 * \retval -ve, on error
4395 static int osd_index_ea_lookup(const struct lu_env *env, struct dt_object *dt,
4396 struct dt_rec *rec, const struct dt_key *key,
4397 struct lustre_capa *capa)
4399 struct osd_object *obj = osd_dt_obj(dt);
/* only valid on directories */
4404 LASSERT(S_ISDIR(obj->oo_inode->i_mode));
4405 LINVRNT(osd_invariant(obj));
4407 if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
4410 rc = osd_ea_lookup_rec(env, obj, rec, key);
/* Index/iterator ops for ldiskfs-native (EA) directories — interop mode */
4418 * Index and Iterator operations for interoperability
4419 * mode (i.e. to run 2.0 mds on 1.8 disk) (b11826)
4421 static const struct dt_index_operations osd_index_ea_ops = {
4422 .dio_lookup = osd_index_ea_lookup,
4423 .dio_declare_insert = osd_index_declare_ea_insert,
4424 .dio_insert = osd_index_ea_insert,
4425 .dio_declare_delete = osd_index_declare_ea_delete,
4426 .dio_delete = osd_index_ea_delete,
4428 .init = osd_it_ea_init,
4429 .fini = osd_it_ea_fini,
4430 .get = osd_it_ea_get,
4431 .put = osd_it_ea_put,
4432 .next = osd_it_ea_next,
4433 .key = osd_it_ea_key,
4434 .key_size = osd_it_ea_key_size,
4435 .rec = osd_it_ea_rec,
4436 .store = osd_it_ea_store,
4437 .load = osd_it_ea_load
/*
 * lu_context key constructor: allocate per-thread osd_thread_info plus its
 * readdir buffer and htree lock; unwinds allocations on any failure.
 */
4441 static void *osd_key_init(const struct lu_context *ctx,
4442 struct lu_context_key *key)
4444 struct osd_thread_info *info;
4446 OBD_ALLOC_PTR(info);
4448 return ERR_PTR(-ENOMEM);
/* buffer consumed by osd_ldiskfs_filldir() during readdir */
4450 OBD_ALLOC(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
4451 if (info->oti_it_ea_buf == NULL)
4454 info->oti_env = container_of(ctx, struct lu_env, le_ctx);
4456 info->oti_hlock = ldiskfs_htree_lock_alloc();
4457 if (info->oti_hlock == NULL)
/* error unwind: free the readdir buffer allocated above */
4463 OBD_FREE(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
4466 return ERR_PTR(-ENOMEM);
/* lu_context key destructor: release resources allocated in osd_key_init() */
4469 static void osd_key_fini(const struct lu_context *ctx,
4470 struct lu_context_key *key, void* data)
4472 struct osd_thread_info *info = data;
4474 if (info->oti_hlock != NULL)
4475 ldiskfs_htree_lock_free(info->oti_hlock);
4476 OBD_FREE(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
/* context exit hook: assert no locks or transactions leak out of a request */
4480 static void osd_key_exit(const struct lu_context *ctx,
4481 struct lu_context_key *key, void *data)
4483 struct osd_thread_info *info = data;
4485 LASSERT(info->oti_r_locks == 0);
4486 LASSERT(info->oti_w_locks == 0);
4487 LASSERT(info->oti_txns == 0);
4490 /* type constructor/destructor: osd_type_init, osd_type_fini */
4491 LU_TYPE_INIT_FINI(osd, &osd_key);
/* per-thread context key; registered for both MD and DT thread contexts */
4493 static struct lu_context_key osd_key = {
4494 .lct_tags = LCT_DT_THREAD | LCT_MD_THREAD,
4495 .lct_init = osd_key_init,
4496 .lct_fini = osd_key_fini,
4497 .lct_exit = osd_key_exit
/* ->ldto_device_init(): only registers the device's /proc entries */
4501 static int osd_device_init(const struct lu_env *env, struct lu_device *d,
4502 const char *name, struct lu_device *next)
4504 return osd_procfs_init(osd_dev(d), name);
/* Release the remote-object dir reference and tear down the OI table */
4507 static int osd_shutdown(const struct lu_env *env, struct osd_device *o)
4509 struct osd_thread_info *info = osd_oti_get(env);
4511 if (o->od_obj_area != NULL) {
4512 lu_object_put(env, &o->od_obj_area->do_lu);
4513 o->od_obj_area = NULL;
4515 if (o->od_oi_table != NULL)
4516 osd_oi_fini(info, &o->od_oi_table, o->od_oi_count);
/*
 * Attach the OSD to an already-mounted ldiskfs backend: fetch the
 * lustre_mount_info recorded by the mount path and note IAM-dir mode.
 */
4521 static int osd_mount(const struct lu_env *env,
4522 struct osd_device *o, struct lustre_cfg *cfg)
4524 struct lustre_mount_info *lmi;
/* cfg string 0 is the backend device/service name */
4525 const char *dev = lustre_cfg_string(cfg, 0);
4526 struct lustre_disk_data *ldd;
4527 struct lustre_sb_info *lsi;
/* refuse double mount */
4530 if (o->od_mount != NULL) {
4531 CERROR("Already mounted (%s)\n", dev);
/* takes a reference on the mount; released in osd_device_fini() */
4536 lmi = server_get_mount(dev);
4538 CERROR("Cannot get mount info for %s!\n", dev);
4542 LASSERT(lmi != NULL);
4543 /* save lustre_mount_info in dt_device */
4546 lsi = s2lsi(lmi->lmi_sb);
/* on-disk flag selects IAM directory format (pre-2.x interop feature) */
4549 if (ldd->ldd_flags & LDD_F_IAM_DIR) {
4551 LCONSOLE_WARN("OSD: IAM mode enabled\n");
4555 o->od_obj_area = NULL;
/*
 * ->ldto_device_fini(): flush dcache and sync the backend, remove /proc
 * entries, then drop the mount reference taken in osd_mount().
 */
4559 static struct lu_device *osd_device_fini(const struct lu_env *env,
4560 struct lu_device *d)
4565 shrink_dcache_sb(osd_sb(osd_dev(d)));
4566 osd_sync(env, lu2dt_dev(d));
4568 rc = osd_procfs_fini(osd_dev(d));
4570 CERROR("proc fini error %d \n", rc);
4571 RETURN (ERR_PTR(rc));
/* balance server_get_mount() from osd_mount() */
4574 if (osd_dev(d)->od_mount)
4575 server_put_mount(osd_dev(d)->od_mount->lmi_name,
4576 osd_dev(d)->od_mount->lmi_mnt);
4577 osd_dev(d)->od_mount = NULL;
/*
 * ->ldto_device_alloc(): allocate and initialize an osd_device, wiring up
 * lu/dt ops, the osfs lock/age cache and the capability hash.
 * Returns the embedded lu_device or ERR_PTR on failure.
 */
4582 static struct lu_device *osd_device_alloc(const struct lu_env *env,
4583 struct lu_device_type *t,
4584 struct lustre_cfg *cfg)
4586 struct lu_device *l;
4587 struct osd_device *o;
4593 result = dt_device_init(&o->od_dt_dev, t);
4596 l->ld_ops = &osd_lu_ops;
4597 o->od_dt_dev.dd_ops = &osd_dt_ops;
4598 cfs_spin_lock_init(&o->od_osfs_lock);
/* start with a stale osfs age so the first statfs always refreshes */
4599 o->od_osfs_age = cfs_time_shift_64(-1000);
4600 o->od_capa_hash = init_capa_hash();
/* capa hash failure: undo dt_device_init before returning ENOMEM */
4601 if (o->od_capa_hash == NULL) {
4602 dt_device_fini(&o->od_dt_dev);
4603 l = ERR_PTR(-ENOMEM);
4606 l = ERR_PTR(result);
4611 l = ERR_PTR(-ENOMEM);
/* ->ldto_device_free(): release capa hash and dt_device state */
4615 static struct lu_device *osd_device_free(const struct lu_env *env,
4616 struct lu_device *d)
4618 struct osd_device *o = osd_dev(d);
4621 cleanup_capa_hash(o->od_capa_hash);
4622 dt_device_fini(&o->od_dt_dev);
/* ->ldo_process_config(): dispatch setup/cleanup lcfg commands */
4627 static int osd_process_config(const struct lu_env *env,
4628 struct lu_device *d, struct lustre_cfg *cfg)
4630 struct osd_device *o = osd_dev(d);
4634 switch(cfg->lcfg_command) {
4636 err = osd_mount(env, o, cfg);
4639 err = osd_shutdown(env, o);
/* ->ldo_recovery_complete(): nothing to do at the OSD layer */
4648 static int osd_recovery_complete(const struct lu_env *env,
4649 struct lu_device *d)
/*
 * ->ldo_prepare(): bring the device to a usable state in three steps —
 * (1) init the object index, (2) create/verify local objects,
 * (3) open the remote-object directory and cache it in od_obj_area.
 */
4654 static int osd_prepare(const struct lu_env *env,
4655 struct lu_device *pdev,
4656 struct lu_device *dev)
4658 struct osd_device *osd = osd_dev(dev);
4659 struct lustre_sb_info *lsi;
4660 struct lustre_disk_data *ldd;
4661 struct lustre_mount_info *lmi;
4662 struct osd_thread_info *oti = osd_oti_get(env);
4663 struct dt_object *d;
4667 /* 1. initialize oi before any file create or file open */
4668 result = osd_oi_init(oti, &osd->od_oi_table,
4669 &osd->od_dt_dev, lu2md_dev(pdev));
/* osd_oi_init() returns the number of OI files on success */
4673 LASSERT(result > 0);
4674 osd->od_oi_count = result;
4676 lmi = osd->od_mount;
4677 lsi = s2lsi(lmi->lmi_sb);
4680 /* 2. setup local objects */
4681 result = llo_local_objects_setup(env, lu2md_dev(pdev), lu2dt_dev(dev));
4685 /* 3. open remote object dir */
4686 d = dt_store_open(env, lu2dt_dev(dev), "",
4687 remote_obj_dir, &oti->oti_fid);
4689 osd->od_obj_area = d;
/* open failed: record the error and leave od_obj_area unset */
4692 result = PTR_ERR(d);
4693 osd->od_obj_area = NULL;
/* lu_object lifecycle operations for OSD objects */
4700 static const struct lu_object_operations osd_lu_obj_ops = {
4701 .loo_object_init = osd_object_init,
4702 .loo_object_delete = osd_object_delete,
4703 .loo_object_release = osd_object_release,
4704 .loo_object_free = osd_object_free,
4705 .loo_object_print = osd_object_print,
4706 .loo_object_invariant = osd_object_invariant
/* lu_device operations: object allocation, config and recovery hooks */
4709 static const struct lu_device_operations osd_lu_ops = {
4710 .ldo_object_alloc = osd_object_alloc,
4711 .ldo_process_config = osd_process_config,
4712 .ldo_recovery_complete = osd_recovery_complete,
4713 .ldo_prepare = osd_prepare,
/* device-type lifecycle hooks (type init/start and device alloc/init) */
4716 static const struct lu_device_type_operations osd_device_type_ops = {
4717 .ldto_init = osd_type_init,
4718 .ldto_fini = osd_type_fini,
4720 .ldto_start = osd_type_start,
4721 .ldto_stop = osd_type_stop,
4723 .ldto_device_alloc = osd_device_alloc,
4724 .ldto_device_free = osd_device_free,
4726 .ldto_device_init = osd_device_init,
4727 .ldto_device_fini = osd_device_fini
/* lu_device_type registered with the class subsystem as LUSTRE_OSD_NAME */
4730 static struct lu_device_type osd_device_type = {
4731 .ldt_tags = LU_DEVICE_DT,
4732 .ldt_name = LUSTRE_OSD_NAME,
4733 .ldt_ops = &osd_device_type_ops,
4734 .ldt_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
/* minimal obd_ops: only module ownership, kept for lprocfs registration */
4738 * lprocfs legacy support.
4740 static struct obd_ops osd_obd_device_ops = {
4741 .o_owner = THIS_MODULE
/* descriptor of the local "remote object dir", created as a directory */
4744 static struct lu_local_obj_desc llod_osd_rem_obj_dir = {
4745 .llod_name = remote_obj_dir,
4746 .llod_oid = OSD_REM_OBJ_DIR_OID,
4748 .llod_feat = &dt_directory_features,
/* module entry: register the local object and the OSD device type */
4751 static int __init osd_mod_init(void)
4753 struct lprocfs_static_vars lvars;
4756 llo_local_obj_register(&llod_osd_rem_obj_dir);
4757 lprocfs_osd_init_vars(&lvars);
4758 return class_register_type(&osd_obd_device_ops, NULL, lvars.module_vars,
4759 LUSTRE_OSD_NAME, &osd_device_type);
/* module exit: undo the registrations done in osd_mod_init() */
4762 static void __exit osd_mod_exit(void)
4764 llo_local_obj_unregister(&llod_osd_rem_obj_dir);
4765 class_unregister_type(LUSTRE_OSD_NAME);
/* module metadata and init/exit wiring */
4768 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4769 MODULE_DESCRIPTION("Lustre Object Storage Device ("LUSTRE_OSD_NAME")");
4770 MODULE_LICENSE("GPL");
4772 cfs_module(osd, "0.0.2", osd_mod_init, osd_mod_exit);