1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * lustre/osd/osd_handler.c
5 * Top-level entry points into osd module
7 * Copyright (c) 2006 Cluster File Systems, Inc.
8 * Author: Nikita Danilov <nikita@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
30 # define EXPORT_SYMTAB
32 #define DEBUG_SUBSYSTEM S_MDS
34 #include <linux/module.h>
36 /* LUSTRE_VERSION_CODE */
37 #include <lustre_ver.h>
38 /* prerequisite for linux/xattr.h */
39 #include <linux/types.h>
40 /* prerequisite for linux/xattr.h */
42 /* XATTR_{REPLACE,CREATE} */
43 #include <linux/xattr.h>
45 * XXX temporary stuff: direct access to ldiskfs/jdb. Interface between osd
46 * and file system is not yet specified.
48 /* handle_t, journal_start(), journal_stop() */
49 #include <linux/jbd.h>
51 #include <linux/ldiskfs_fs.h>
52 #include <linux/ldiskfs_jbd.h>
57 * struct OBD_{ALLOC,FREE}*()
60 #include <obd_support.h>
61 /* struct ptlrpc_thread */
62 #include <lustre_net.h>
63 /* LUSTRE_OSD0_NAME */
65 /* class_register_type(), class_unregister_type(), class_get_type() */
66 #include <obd_class.h>
67 #include <lustre_disk.h>
70 #include <lustre_fid.h>
71 #include <linux/lustre_iam.h>
73 #include "osd_internal.h"
/*
 * Per-object OSD state. NOTE(review): the opening "struct ... {" line is not
 * visible here; the field prefix and the users below suggest struct
 * osd_object -- confirm.
 */
/* Embedded generic dt object; osd_object_alloc() initialises it. */
77 struct dt_object oo_dt;
79 * Inode for file system object represented by this osd_object. This
80 * inode is pinned for the whole duration of lu_object life.
82 struct inode *oo_inode;
/* Taken by the read/write lock methods with down_read()/down_write(). */
83 struct rw_semaphore oo_sem;
/* iam container and descriptor, set up by osd_index_try(). */
84 struct iam_container oo_container;
85 struct iam_descr oo_descr;
/* iam path descriptor; released by osd_index_fini() when non-NULL. */
86 struct iam_path_descr *oo_ipd;
/* Context checked by osd_write_locked(); reset to NULL on write unlock. */
87 const struct lu_context *oo_owner;
/*
 * Per-device OSD state. NOTE(review): the opening "struct ... {" line is not
 * visible here; presumably struct osd_device -- confirm.
 */
/* Embedded generic dt device. */
95 struct dt_device od_dt_dev;
96 /* information about underlying file system */
97 struct lustre_mount_info *od_mount;
101 * XXX temporary stuff for object index: directory where every object
102 * is named by its fid.
/* osd_mkfile() and osd_mknod() create new objects under this dentry. */
104 struct dentry *od_obj_area;
106 /* Thread context for transaction commit callback.
107 * Currently, OSD is based on ext3/JBD. Transaction commit in ext3/JBD
108 * is serialized, that is there is no more than one transaction commit
109 * at a time (JBD journal_commit_transaction() is serialized).
110 * This means that it's enough to have _one_ lu_context.
/* Passed to dt_txn_hook_commit() from osd_trans_commit_cb(). */
112 struct lu_context od_ctx_for_commit;
115 static int osd_root_get (const struct lu_context *ctxt,
116 struct dt_device *dev, struct lu_fid *f);
117 static int osd_statfs (const struct lu_context *ctxt,
118 struct dt_device *dev, struct kstatfs *sfs);
120 static int lu_device_is_osd (const struct lu_device *d);
121 static void osd_mod_exit (void) __exit;
122 static int osd_mod_init (void) __init;
123 static int osd_type_init (struct lu_device_type *t);
124 static void osd_type_fini (struct lu_device_type *t);
125 static int osd_object_init (const struct lu_context *ctxt,
126 struct lu_object *l);
127 static void osd_object_release(const struct lu_context *ctxt,
128 struct lu_object *l);
129 static int osd_object_print (const struct lu_context *ctx, void *cookie,
130 lu_printer_t p, const struct lu_object *o);
131 static void osd_device_free (const struct lu_context *ctx,
132 struct lu_device *m);
133 static void *osd_key_init (const struct lu_context *ctx,
134 struct lu_context_key *key);
135 static void osd_key_fini (const struct lu_context *ctx,
136 struct lu_context_key *key, void *data);
137 static void osd_key_exit (const struct lu_context *ctx,
138 struct lu_context_key *key, void *data);
139 static int osd_has_index (const struct osd_object *obj);
140 static void osd_object_init0 (struct osd_object *obj);
141 static int osd_device_init (const struct lu_context *ctx,
142 struct lu_device *d, struct lu_device *);
143 static int osd_fid_lookup (const struct lu_context *ctx,
144 struct osd_object *obj,
145 const struct lu_fid *fid);
146 static int osd_inode_getattr (const struct lu_context *ctx,
147 struct inode *inode, struct lu_attr *attr);
148 static int osd_inode_setattr (const struct lu_context *ctx,
149 struct inode *inode, const struct lu_attr *attr);
150 static int osd_param_is_sane (const struct osd_device *dev,
151 const struct txn_param *param);
152 static int osd_index_lookup (const struct lu_context *ctxt,
153 struct dt_object *dt,
154 struct dt_rec *rec, const struct dt_key *key);
155 static int osd_index_insert (const struct lu_context *ctxt,
156 struct dt_object *dt,
157 const struct dt_rec *rec,
158 const struct dt_key *key,
159 struct thandle *handle);
160 static int osd_index_delete (const struct lu_context *ctxt,
161 struct dt_object *dt, const struct dt_key *key,
162 struct thandle *handle);
163 static int osd_index_probe (const struct lu_context *ctxt,
164 struct osd_object *o,
165 const struct dt_index_features *feat);
166 static int osd_index_try (const struct lu_context *ctx,
167 struct dt_object *dt,
168 const struct dt_index_features *feat);
169 static void osd_index_fini (struct osd_object *o);
171 static void osd_it_fini (const struct lu_context *ctx, struct dt_it *di);
172 static int osd_it_get (const struct lu_context *ctx,
173 struct dt_it *di, const struct dt_key *key);
174 static void osd_it_put (const struct lu_context *ctx, struct dt_it *di);
175 static int osd_it_next (const struct lu_context *ctx, struct dt_it *di);
176 static int osd_it_key_size (const struct lu_context *ctx,
177 const struct dt_it *di);
178 static void osd_conf_get (const struct lu_context *ctx,
179 const struct dt_device *dev,
180 struct dt_device_param *param);
181 static int osd_read_locked (const struct lu_context *ctx,
182 struct osd_object *o);
183 static int osd_write_locked (const struct lu_context *ctx,
184 struct osd_object *o);
186 static struct osd_object *osd_obj (const struct lu_object *o);
187 static struct osd_device *osd_dev (const struct lu_device *d);
188 static struct osd_device *osd_dt_dev (const struct dt_device *d);
189 static struct osd_object *osd_dt_obj (const struct dt_object *d);
190 static struct osd_device *osd_obj2dev (const struct osd_object *o);
191 static struct lu_device *osd2lu_dev (struct osd_device *osd);
192 static struct lu_device *osd_device_fini (const struct lu_context *ctx,
193 struct lu_device *d);
194 static struct lu_device *osd_device_alloc (const struct lu_context *ctx,
195 struct lu_device_type *t,
196 struct lustre_cfg *cfg);
197 static struct lu_object *osd_object_alloc (const struct lu_context *ctx,
198 const struct lu_object_header *hdr,
199 struct lu_device *d);
200 static struct inode *osd_iget (struct osd_thread_info *info,
201 struct osd_device *dev,
202 const struct osd_inode_id *id);
203 static struct super_block *osd_sb (const struct osd_device *dev);
204 static struct dt_it *osd_it_init (const struct lu_context *ctx,
205 struct dt_object *dt);
206 static struct dt_key *osd_it_key (const struct lu_context *ctx,
207 const struct dt_it *di);
208 static struct dt_rec *osd_it_rec (const struct lu_context *ctx,
209 const struct dt_it *di);
210 static struct timespec *osd_inode_time (const struct lu_context *ctx,
213 static journal_t *osd_journal (const struct osd_device *dev);
215 static struct lu_device_type_operations osd_device_type_ops;
216 static struct lu_device_type osd_device_type;
217 static struct lu_object_operations osd_lu_obj_ops;
218 static struct obd_ops osd_obd_device_ops;
219 static struct lprocfs_vars lprocfs_osd_module_vars[];
220 static struct lprocfs_vars lprocfs_osd_obd_vars[];
221 static struct lu_device_operations osd_lu_ops;
222 static struct lu_context_key osd_key;
223 static struct dt_object_operations osd_obj_ops;
224 static struct dt_body_operations osd_body_ops;
225 static struct dt_index_operations osd_index_ops;
226 static struct dt_index_operations osd_index_compat_ops;
/*
 * OSD transaction handle fields. NOTE(review): the struct header and the
 * ot_handle member used by osd_trans_stop() are not visible here.
 */
/* Generic handle given to callers; mapped back with container_of0(). */
229 struct thandle ot_super;
/* Journal callback record registered in osd_trans_start(). */
231 struct journal_callback ot_jcb;
235 * Invariants, assertions.
/*
 * Switch for the object invariant check. It is defined as 0, so the
 * function form under "#if" is compiled out.
 */
238 #define OSD_INVARIANT_CHECKS (0)
240 #if OSD_INVARIANT_CHECKS
/*
 * Invariant: if the object has an inode, that inode belongs to this
 * device's superblock and has a positive reference count; if the iam
 * container has an object, it is the same inode.
 */
241 static int osd_invariant(const struct osd_object *obj)
245 ergo(obj->oo_inode != NULL,
246 obj->oo_inode->i_sb == osd_sb(osd_obj2dev(obj)) &&
247 atomic_read(&obj->oo_inode->i_count) > 0) &&
248 ergo(obj->oo_container.ic_object != NULL,
249 obj->oo_container.ic_object == obj->oo_inode);
/*
 * Constant-true replacement. NOTE(review): presumably the #else branch of
 * the conditional above; the #else/#endif lines are not visible -- confirm.
 */
252 #define osd_invariant(obj) (1)
255 static int osd_read_locked(const struct lu_context *ctx, struct osd_object *o)
257 struct osd_thread_info *oti = lu_context_key_get(ctx, &osd_key);
259 return oti->oti_r_locks > 0;
262 static int osd_write_locked(const struct lu_context *ctx, struct osd_object *o)
264 struct osd_thread_info *oti = lu_context_key_get(ctx, &osd_key);
266 return oti->oti_w_locks > 0 && o->oo_owner == ctx;
269 /* helper to push us into KERNEL_DS context */
/*
 * Prepare the per-context scratch file and dentry (kept in osd_thread_info)
 * so that they describe @inode: the file is wired to the dentry, to the
 * inode's mapping and file operations, and opened read/write.
 * NOTE(review): the handling of @seg and the return statement are not
 * visible here -- confirm against the full source.
 */
270 static struct file *osd_rw_init(const struct lu_context *ctxt,
271 struct inode *inode, mm_segment_t *seg)
273 struct osd_thread_info *info = lu_context_key_get(ctxt, &osd_key);
274 struct dentry *dentry = &info->oti_dentry;
275 struct file *file = &info->oti_file;
277 file->f_dentry = dentry;
278 file->f_mapping = inode->i_mapping;
279 file->f_op = inode->i_fop;
280 file->f_mode = FMODE_WRITE|FMODE_READ;
281 dentry->d_inode = inode;
288 /* helper to pop us from KERNEL_DS context */
/*
 * Counterpart of osd_rw_init(); takes the segment saved in @seg.
 * NOTE(review): the body is not visible here.
 */
289 static void osd_rw_fini(mm_segment_t *seg)
/*
 * Fill @f with the fid of the file system root: the inode of the
 * superblock's root dentry is looked up and its inode number and generation
 * are packed with lu_igif_build().
 * NOTE(review): the declaration of inode and the return value are not
 * visible here.
 */
294 static int osd_root_get(const struct lu_context *ctx,
295 struct dt_device *dev, struct lu_fid *f)
299 inode = osd_sb(osd_dt_dev(dev))->s_root->d_inode;
300 lu_igif_build(f, inode->i_ino, inode->i_generation);
305 * OSD object methods.
/*
 * Set up a new osd object for device @d: the embedded dt_object is
 * initialised with a NULL header, the dt and lu operation vectors are
 * installed and the object semaphore is initialised.
 * NOTE(review): the allocation of mo, the failure path and the return
 * statement are not visible here.
 */
308 static struct lu_object *osd_object_alloc(const struct lu_context *ctx,
309 const struct lu_object_header *hdr,
312 struct osd_object *mo;
318 l = &mo->oo_dt.do_lu;
319 dt_object_init(&mo->oo_dt, NULL, d);
320 mo->oo_dt.do_ops = &osd_obj_ops;
321 l->lo_ops = &osd_lu_obj_ops;
322 init_rwsem(&mo->oo_sem);
328 static void osd_object_init0(struct osd_object *obj)
330 LASSERT(obj->oo_inode != NULL);
331 obj->oo_dt.do_body_ops = &osd_body_ops;
332 obj->oo_dt.do_lu.lo_header->loh_attr |=
333 (LOHA_EXISTS | (obj->oo_inode->i_mode & S_IFMT));
/*
 * lu_object init method: look the object up by its fid and, when the lookup
 * left an inode in the object, complete set-up with osd_object_init0().
 * NOTE(review): the check on result and the return are not visible here.
 */
336 static int osd_object_init(const struct lu_context *ctxt, struct lu_object *l)
338 struct osd_object *obj = osd_obj(l);
341 LASSERT(osd_invariant(obj));
343 result = osd_fid_lookup(ctxt, obj, lu_object_fid(l));
345 if (obj->oo_inode != NULL)
346 osd_object_init0(obj);
348 LASSERT(osd_invariant(obj));
/*
 * lu_object free method: finalises the embedded dt_object.
 * NOTE(review): releasing the osd_object memory itself is not visible here.
 */
352 static void osd_object_free(const struct lu_context *ctx, struct lu_object *l)
354 struct osd_object *obj = osd_obj(l);
356 LASSERT(osd_invariant(obj));
358 dt_object_fini(&obj->oo_dt);
/*
 * Release index state of @o: free the iam path descriptor through the
 * descriptor's id_ipd_free method when one was allocated, and finalise the
 * iam container when it is attached to the object's inode.
 */
362 static void osd_index_fini(struct osd_object *o)
364 struct iam_container *bag;
366 bag = &o->oo_container;
367 if (o->oo_ipd != NULL) {
368 LASSERT(bag->ic_descr->id_ops->id_ipd_free != NULL);
369 bag->ic_descr->id_ops->id_ipd_free(&o->oo_container, o->oo_ipd);
371 if (o->oo_inode != NULL) {
372 if (o->oo_container.ic_object == o->oo_inode)
373 iam_container_fini(&o->oo_container);
/*
 * lu_object delete method: when the object has an inode, the inode pointer
 * is cleared.
 * NOTE(review): the statements that drop the index state and the inode
 * reference are not visible here -- confirm.
 */
377 static void osd_object_delete(const struct lu_context *ctx, struct lu_object *l)
379 struct osd_object *obj = osd_obj(l);
381 LASSERT(osd_invariant(obj));
384 if (obj->oo_inode != NULL) {
386 obj->oo_inode = NULL;
390 static int osd_inode_unlinked(const struct inode *inode)
392 return inode->i_nlink == !!S_ISDIR(inode->i_mode);
395 static void osd_object_release(const struct lu_context *ctxt,
398 struct osd_object *o = osd_obj(l);
400 LASSERT(!lu_object_is_dying(l->lo_header));
401 if (o->oo_inode != NULL && osd_inode_unlinked(o->oo_inode))
402 set_bit(LU_OBJECT_HEARD_BANSHEE, &l->lo_header->loh_flags);
/*
 * lu_object print method: emits one line through printer @p with the inode
 * number and generation (0 when there is no inode) and the name of the iam
 * descriptor, or "plain" when the object has no descriptor.
 * NOTE(review): the first arguments matching the two %p conversions are not
 * visible here.
 */
405 static int osd_object_print(const struct lu_context *ctx, void *cookie,
406 lu_printer_t p, const struct lu_object *l)
408 struct osd_object *o = osd_obj(l);
411 d = o->oo_container.ic_descr;
412 return (*p)(ctx, cookie, LUSTRE_OSD0_NAME"-object@%p(i:%p:%lu/%u)[%s]",
414 o->oo_inode ? o->oo_inode->i_ino : 0UL,
415 o->oo_inode ? o->oo_inode->i_generation : 0,
416 d ? d->id_ops->id_name : "plain");
/*
 * dt_device statfs method: clears @sfs and lets the superblock's own
 * ->statfs() fill it in.
 * NOTE(review): the declaration of result and the return are not visible.
 */
419 static int osd_statfs(const struct lu_context *ctx,
420 struct dt_device *d, struct kstatfs *sfs)
422 struct osd_device *osd = osd_dt_dev(d);
423 struct super_block *sb = osd_sb(osd);
428 memset(sfs, 0, sizeof *sfs);
429 result = sb->s_op->statfs(sb, sfs);
434 static void osd_conf_get(const struct lu_context *ctx,
435 const struct dt_device *dev,
436 struct dt_device_param *param)
439 * XXX should be taken from not-yet-existing fs abstraction layer.
441 param->ddp_max_name_len = LDISKFS_NAME_LEN;
442 param->ddp_max_nlink = LDISKFS_LINK_MAX;
443 param->ddp_block_shift = osd_sb(osd_dt_dev(dev))->s_blocksize_bits;
450 static int osd_param_is_sane(const struct osd_device *dev,
451 const struct txn_param *param)
453 return param->tp_credits <= osd_journal(dev)->j_max_transaction_buffers;
/*
 * Journal commit callback registered by osd_trans_start(). Recovers the osd
 * handle from @jcb, reports a commit error, runs the commit hooks with the
 * device's dedicated commit context, drops the device reference and tears
 * down the handle's own lu_context.
 * NOTE(review): the conditions guarding the CERROR and the hook call, and
 * the release of the handle memory, are not visible here.
 */
456 static void osd_trans_commit_cb(struct journal_callback *jcb, int error)
458 struct osd_thandle *oh = container_of0(jcb, struct osd_thandle, ot_jcb);
459 struct thandle *th = &oh->ot_super;
460 struct dt_device *dev = th->th_dev;
462 LASSERT(dev != NULL);
465 CERROR("transaction @0x%p commit error: %d\n", th, error);
467 /* This dd_ctx_for_commit is only for commit usage.
468 * see "struct dt_device"
470 dt_txn_hook_commit(&osd_dt_dev(dev)->od_ctx_for_commit, th);
/* pairs with lu_device_get() in osd_trans_start() */
473 lu_device_put(&dev->dd_lu_dev);
476 lu_context_exit(&th->th_ctx);
477 lu_context_fini(&th->th_ctx);
/*
 * dt_device trans_start method. Runs the start hooks, validates the
 * requested credits with osd_param_is_sane(), allocates an osd handle with
 * GFP_NOFS and starts a journal transaction with p->tp_credits. It then
 * takes a device reference, initialises and enters the handle's lu_context
 * and registers osd_trans_commit_cb() on the journal handle.
 *
 * Two error values are visible: ERR_PTR(-ENOMEM), and ERR_PTR(-EINVAL)
 * after logging "Invalid transaction parameters". The hook result is also
 * returned via ERR_PTR.
 * NOTE(review): the remaining parameters, several declarations and the
 * branch conditions are not visible here -- confirm against the full source.
 */
481 static struct thandle *osd_trans_start(const struct lu_context *ctx,
485 struct osd_device *dev = osd_dt_dev(d);
487 struct osd_thandle *oh;
489 struct osd_thread_info *oti = lu_context_key_get(ctx, &osd_key);
494 hook_res = dt_txn_hook_start(ctx, d, p);
496 RETURN(ERR_PTR(hook_res));
498 if (osd_param_is_sane(dev, p)) {
499 OBD_ALLOC_GFP(oh, sizeof *oh, GFP_NOFS);
502 * XXX temporary stuff. Some abstraction layer should
506 jh = journal_start(osd_journal(dev), p->tp_credits);
/* reference dropped again in osd_trans_commit_cb() */
511 lu_device_get(&d->dd_lu_dev);
512 /* add commit callback */
513 lu_context_init(&th->th_ctx, LCT_TX_HANDLE);
514 lu_context_enter(&th->th_ctx);
515 journal_callback_set(jh, osd_trans_commit_cb,
516 (struct journal_callback *)&oh->ot_jcb);
/* the context must not have an open transaction or hold object locks */
517 LASSERT(oti->oti_txns == 0);
518 LASSERT(oti->oti_r_locks == 0);
519 LASSERT(oti->oti_w_locks == 0);
526 th = ERR_PTR(-ENOMEM);
528 CERROR("Invalid transaction parameters\n");
529 th = ERR_PTR(-EINVAL);
/*
 * dt_device trans_stop method. When the osd handle still owns a journal
 * handle, the stop hooks are run, ot_handle is cleared and the journal
 * transaction is stopped. Failures of either step are only logged.
 * NOTE(review): the conditions guarding the two CERROR calls and the update
 * of oti_txns are not visible here.
 */
535 static void osd_trans_stop(const struct lu_context *ctx, struct thandle *th)
538 struct osd_thandle *oh;
539 struct osd_thread_info *oti = lu_context_key_get(ctx, &osd_key);
543 oh = container_of0(th, struct osd_thandle, ot_super);
544 if (oh->ot_handle != NULL) {
545 handle_t *hdl = oh->ot_handle;
547 * XXX temporary stuff. Some abstraction layer should be used.
549 result = dt_txn_hook_stop(ctx, th);
551 CERROR("Failure in transaction hook: %d\n", result);
/* clear the pointer before journal_stop(); hdl keeps the value */
554 oh->ot_handle = NULL;
555 result = journal_stop(hdl);
557 CERROR("Failure to stop transaction: %d\n", result);
/* exactly one transaction open and no object locks held at this point */
559 LASSERT(oti->oti_txns == 1);
560 LASSERT(oti->oti_r_locks == 0);
561 LASSERT(oti->oti_w_locks == 0);
/*
 * Force a commit on the device's superblock through ldiskfs_force_commit(),
 * after logging a D_HA debug message.
 * NOTE(review): the second parameter line (which declares d) is not visible.
 */
567 static void osd_sync(const struct lu_context *ctx,
570 struct osd_device *osd = osd_dt_dev(d);
573 CDEBUG(D_HA, "syncing OSD %s\n", LUSTRE_OSD0_NAME);
574 ldiskfs_force_commit(osd_sb(osd));
/*
 * Switch the backing block device to read-only through lvfs_set_rdonly().
 * Before that a transaction is started and stopped on the device.
 * NOTE(review): the initialiser of param, the role of @sync and the error
 * check on th are not visible here -- confirm against the full source.
 */
578 static void osd_ro(const struct lu_context *ctx,
579 struct dt_device *d, int sync)
582 struct txn_param param = {
587 CERROR("*** setting device %s read-only ***\n", LUSTRE_OSD0_NAME);
/* pass the address of the local txn_param declared above */
589 th = osd_trans_start(ctx, d, &param);
591 osd_trans_stop(ctx, th);
596 lvfs_set_rdonly(lvfs_sbdev(osd_sb(osd_dt_dev(d))));
/*
 * dt_device method table of the OSD.
 * NOTE(review): further entries and the closing brace are not visible here.
 */
601 static struct dt_device_operations osd_dt_ops = {
602 .dt_root_get = osd_root_get,
603 .dt_statfs = osd_statfs,
604 .dt_trans_start = osd_trans_start,
605 .dt_trans_stop = osd_trans_stop,
606 .dt_conf_get = osd_conf_get,
/*
 * Take the object semaphore for reading. The caller must not already be the
 * write owner of the object, and once the semaphore is held no write owner
 * may be recorded.
 * NOTE(review): the update of the per-context read-lock counter is not
 * visible here.
 */
611 static void osd_object_read_lock(const struct lu_context *ctx,
612 struct dt_object *dt)
614 struct osd_object *obj = osd_dt_obj(dt);
615 struct osd_thread_info *oti = lu_context_key_get(ctx, &osd_key);
617 LASSERT(osd_invariant(obj));
619 LASSERT(obj->oo_owner != ctx);
620 down_read(&obj->oo_sem);
621 LASSERT(obj->oo_owner == NULL);
/*
 * Take the object semaphore for writing. Recursive write locking by the
 * same context is asserted against, and a transaction must be open in the
 * calling context.
 * NOTE(review): recording ctx as owner and the write-lock counter update
 * are not visible here.
 */
625 static void osd_object_write_lock(const struct lu_context *ctx,
626 struct dt_object *dt)
628 struct osd_object *obj = osd_dt_obj(dt);
629 struct osd_thread_info *oti = lu_context_key_get(ctx, &osd_key);
631 LASSERT(osd_invariant(obj));
633 LASSERT(obj->oo_owner != ctx);
634 down_write(&obj->oo_sem);
635 LASSERT(obj->oo_owner == NULL);
637 * Write lock assumes transaction.
639 LASSERT(oti->oti_txns > 0);
/*
 * Release a read lock on the object; the context must hold at least one
 * read lock.
 * NOTE(review): the decrement of the read-lock counter is not visible here.
 */
644 static void osd_object_read_unlock(const struct lu_context *ctx,
645 struct dt_object *dt)
647 struct osd_object *obj = osd_dt_obj(dt);
648 struct osd_thread_info *oti = lu_context_key_get(ctx, &osd_key);
650 LASSERT(osd_invariant(obj));
651 LASSERT(oti->oti_r_locks > 0);
653 up_read(&obj->oo_sem);
/*
 * Release the write lock on the object. The caller must be the recorded
 * owner; ownership is cleared before the semaphore is released.
 * NOTE(review): the decrement of the write-lock counter is not visible here.
 */
656 static void osd_object_write_unlock(const struct lu_context *ctx,
657 struct dt_object *dt)
659 struct osd_object *obj = osd_dt_obj(dt);
660 struct osd_thread_info *oti = lu_context_key_get(ctx, &osd_key);
662 LASSERT(osd_invariant(obj));
663 LASSERT(obj->oo_owner == ctx);
664 LASSERT(oti->oti_w_locks > 0);
666 obj->oo_owner = NULL;
667 up_write(&obj->oo_sem);
670 static int osd_attr_get(const struct lu_context *ctxt, struct dt_object *dt,
671 struct lu_attr *attr)
673 struct osd_object *obj = osd_dt_obj(dt);
674 LASSERT(dt_object_exists(dt));
675 LASSERT(osd_invariant(obj));
676 LASSERT(osd_read_locked(ctxt, obj) || osd_write_locked(ctxt, obj));
678 return osd_inode_getattr(ctxt, obj->oo_inode, attr);
681 static int osd_attr_set(const struct lu_context *ctxt,
682 struct dt_object *dt,
683 const struct lu_attr *attr,
684 struct thandle *handle)
686 struct osd_object *obj = osd_dt_obj(dt);
687 LASSERT(dt_object_exists(dt));
688 LASSERT(osd_invariant(obj));
689 LASSERT(osd_write_locked(ctxt, obj));
691 return osd_inode_setattr(ctxt, obj->oo_inode, attr);
/*
 * Convert @seconds into a timespec stored in the per-context scratch field
 * oti_time, truncated to the time granularity of the inode's superblock.
 * The returned pointer refers to that scratch field, so the value has to be
 * copied before the next call in the same context.
 * NOTE(review): the statements that store @seconds into *t and the return
 * are not visible here.
 */
694 static struct timespec *osd_inode_time(const struct lu_context *ctx,
695 struct inode *inode, __u64 seconds)
697 struct osd_thread_info *oti = lu_context_key_get(ctx, &osd_key);
698 struct timespec *t = &oti->oti_time;
702 *t = timespec_trunc(*t, get_sb_time_gran(inode->i_sb));
/*
 * Copy the attributes selected by attr->la_valid from @attr into @inode and
 * mark the inode dirty. Changing the file type (LA_TYPE) is asserted
 * against; for the mode only the non-type bits are taken from @attr.
 * NOTE(review): only the LA_BLOCKS, LA_BLKSIZE and LA_FLAGS guards are
 * visible here; the other assignments are presumably guarded by their own
 * "bits & LA_..." tests -- confirm against the full source.
 */
706 static int osd_inode_setattr(const struct lu_context *ctx,
707 struct inode *inode, const struct lu_attr *attr)
712 bits = attr->la_valid;
714 LASSERT(!(bits & LA_TYPE)); /* Huh? You want too much. */
717 inode->i_atime = *osd_inode_time(ctx, inode, attr->la_atime);
719 inode->i_ctime = *osd_inode_time(ctx, inode, attr->la_ctime);
721 inode->i_mtime = *osd_inode_time(ctx, inode, attr->la_mtime);
723 inode->i_size = attr->la_size;
724 if (bits & LA_BLOCKS)
725 inode->i_blocks = attr->la_blocks;
/* keep the existing file type, replace everything else */
727 inode->i_mode = (inode->i_mode & S_IFMT) |
728 (attr->la_mode & ~S_IFMT);
730 inode->i_uid = attr->la_uid;
732 inode->i_gid = attr->la_gid;
734 inode->i_nlink = attr->la_nlink;
736 inode->i_rdev = attr->la_rdev;
737 if (bits & LA_BLKSIZE)
738 inode->i_blksize = attr->la_blksize;
740 if (bits & LA_FLAGS) {
742 * Horrible ext3 legacy. Flags are better to be handled in
745 struct ldiskfs_inode_info *li = LDISKFS_I(inode);
/* only the user-modifiable flag bits are replaced */
747 li->i_flags = (li->i_flags & ~LDISKFS_FL_USER_MODIFIABLE) |
748 (attr->la_flags & LDISKFS_FL_USER_MODIFIABLE);
750 mark_inode_dirty(inode);
757 * XXX temporary solution.
/*
 * Hook called by osd_object_create() before the type-specific creation.
 * NOTE(review): the body is not visible here.
 */
760 static int osd_create_pre(struct osd_thread_info *info, struct osd_object *obj,
761 struct lu_attr *attr, struct thandle *th)
/*
 * Hook called by osd_object_create() after the type-specific creation: the
 * object must have an inode by now, and osd_object_init0() completes its
 * set-up.
 * NOTE(review): the return statement is not visible here.
 */
766 static int osd_create_post(struct osd_thread_info *info, struct osd_object *obj,
767 struct lu_attr *attr, struct thandle *th)
769 LASSERT(obj->oo_inode != NULL);
771 osd_object_init0(obj);
775 static void osd_fid_build_name(const struct lu_fid *fid, char *name)
777 static const char *qfmt = LPX64":%lx:%lx";
779 sprintf(name, qfmt, fid_seq(fid), fid_oid(fid), fid_ver(fid));
/*
 * Create the backing inode for @obj through the ->create method of the
 * object-area directory. The new entry is named after the object's fid
 * (see osd_fid_build_name()), using the per-context name buffer and qstr.
 * On success the inode of the new dentry becomes obj->oo_inode and an extra
 * reference is taken with igrab().
 * NOTE(review): the result check, the release of the dentry, the
 * allocation-failure path and the return are not visible here.
 */
782 static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
783 umode_t mode, struct thandle *th)
786 struct osd_device *osd = osd_obj2dev(obj);
790 * XXX temporary solution.
792 struct dentry *dentry;
794 LASSERT(osd_invariant(obj));
795 LASSERT(obj->oo_inode == NULL);
796 LASSERT(osd->od_obj_area != NULL);
798 dir = osd->od_obj_area->d_inode;
799 LASSERT(dir->i_op != NULL && dir->i_op->create != NULL);
801 osd_fid_build_name(lu_object_fid(&obj->oo_dt.do_lu), info->oti_name);
802 info->oti_str.name = info->oti_name;
803 info->oti_str.len = strlen(info->oti_name);
805 dentry = d_alloc(osd->od_obj_area, &info->oti_str);
806 if (dentry != NULL) {
807 result = dir->i_op->create(dir, dentry, mode, NULL);
809 LASSERT(dentry->d_inode != NULL);
810 obj->oo_inode = dentry->d_inode;
811 igrab(obj->oo_inode);
816 LASSERT(osd_invariant(obj));
821 extern int iam_lvar_create(struct inode *obj, int keysize, int ptrsize,
822 int recsize, handle_t *handle);
/*
 * Create a directory object: the inode is made with osd_mkfile() using the
 * type, permission and sticky bits of attr->la_mode, and an iam "lvar"
 * index is then created in it with records the size of struct lu_fid.
 * NOTE(review): the result check between the two steps, the last argument
 * of iam_lvar_create() and the return are not visible here.
 */
828 static int osd_mkdir(struct osd_thread_info *info, struct osd_object *obj,
829 struct lu_attr *attr, struct thandle *th)
832 struct osd_thandle *oth;
834 oth = container_of0(th, struct osd_thandle, ot_super);
835 LASSERT(S_ISDIR(attr->la_mode));
836 result = osd_mkfile(info, obj, (attr->la_mode &
837 (S_IFMT | S_IRWXUGO | S_ISVTX)), th);
839 LASSERT(obj->oo_inode != NULL);
841 * XXX uh-oh... call low-level iam function directly.
843 result = iam_lvar_create(obj->oo_inode, OSD_NAME_LEN, 4,
844 sizeof (struct lu_fid),
850 static int osd_mkreg(struct osd_thread_info *info, struct osd_object *obj,
851 struct lu_attr *attr, struct thandle *th)
853 LASSERT(S_ISREG(attr->la_mode));
854 return osd_mkfile(info, obj, (attr->la_mode &
855 (S_IFMT | S_IRWXUGO | S_ISVTX)), th);
858 static int osd_mksym(struct osd_thread_info *info, struct osd_object *obj,
859 struct lu_attr *attr, struct thandle *th)
861 LASSERT(S_ISLNK(attr->la_mode));
862 return osd_mkfile(info, obj, (attr->la_mode &
863 (S_IFMT | S_IRWXUGO | S_ISVTX)), th);
/*
 * Create the backing inode for a special-file object through the ->mknod
 * method of the object-area directory, passing the type, permission and
 * sticky bits of attr->la_mode together with attr->la_rdev. The new entry
 * is named after the object's fid. On success the inode of the new dentry
 * becomes obj->oo_inode and an extra reference is taken with igrab().
 * NOTE(review): the result check, the release of the dentry, the
 * allocation-failure path and the return are not visible here.
 */
866 static int osd_mknod(struct osd_thread_info *info, struct osd_object *obj,
867 struct lu_attr *attr, struct thandle *th)
870 struct osd_device *osd = osd_obj2dev(obj);
872 umode_t mode = attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX);
875 * XXX temporary solution.
877 struct dentry *dentry;
879 LASSERT(osd_invariant(obj));
880 LASSERT(obj->oo_inode == NULL);
881 LASSERT(osd->od_obj_area != NULL);
883 dir = osd->od_obj_area->d_inode;
/* ->mknod is the method called below, so that is the one that must exist */
884 LASSERT(dir->i_op != NULL && dir->i_op->mknod != NULL);
886 osd_fid_build_name(lu_object_fid(&obj->oo_dt.do_lu), info->oti_name);
887 info->oti_str.name = info->oti_name;
888 info->oti_str.len = strlen(info->oti_name);
890 dentry = d_alloc(osd->od_obj_area, &info->oti_str);
891 if (dentry != NULL) {
892 result = dir->i_op->mknod(dir, dentry, mode, attr->la_rdev);
894 LASSERT(dentry->d_inode != NULL);
895 obj->oo_inode = dentry->d_inode;
896 igrab(obj->oo_inode);
901 LASSERT(osd_invariant(obj));
/*
 * Signature shared by the type-specific creation helpers (osd_mkdir(),
 * osd_mkreg(), osd_mksym(), osd_mknod()).
 */
905 typedef int (*osd_obj_type_f)(struct osd_thread_info *, struct osd_object *,
906 struct lu_attr *, struct thandle *);
/*
 * Pick the creation helper for a file type; osd_object_create() calls it
 * with attr->la_mode & S_IFMT.
 * NOTE(review): the selection logic itself is not visible here.
 */
908 static osd_obj_type_f osd_create_type_f(__u32 mode)
910 osd_obj_type_f result;
/*
 * dt_object create method. The object must not exist yet and must be
 * write-locked by the caller. Creation runs in three steps --
 * osd_create_pre(), the type-specific helper chosen from attr->la_mode, and
 * osd_create_post() -- after which the new inode's number and generation
 * are inserted into the object index under the object's fid, with the index
 * write lock held.
 * NOTE(review): the result checks between the steps and the return are not
 * visible here.
 */
935 static int osd_object_create(const struct lu_context *ctx, struct dt_object *dt,
936 struct lu_attr *attr, struct thandle *th)
938 const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
939 struct osd_object *obj = osd_dt_obj(dt);
940 struct osd_device *osd = osd_obj2dev(obj);
941 struct osd_thread_info *info = lu_context_key_get(ctx, &osd_key);
946 LASSERT(osd_invariant(obj));
947 LASSERT(!dt_object_exists(dt));
948 LASSERT(osd_write_locked(ctx, obj));
951 * XXX missing: permission checks.
955 * XXX missing: sanity checks (valid ->la_mode, etc.)
959 * XXX missing: Quote handling.
962 result = osd_create_pre(info, obj, attr, th);
964 result = osd_create_type_f(attr->la_mode & S_IFMT)(info, obj,
967 result = osd_create_post(info, obj, attr, th);
/* per-context scratch inode id, filled in for the index insert */
970 struct osd_inode_id *id = &info->oti_id;
972 LASSERT(obj->oo_inode != NULL);
974 id->oii_ino = obj->oo_inode->i_ino;
975 id->oii_gen = obj->oo_inode->i_generation;
977 osd_oi_write_lock(&osd->od_oi);
978 result = osd_oi_insert(info, &osd->od_oi, fid, id, th);
979 osd_oi_write_unlock(&osd->od_oi);
/* success implies that the object now exists */
982 LASSERT(ergo(result == 0, dt_object_exists(dt)));
983 LASSERT(osd_invariant(obj));
/*
 * dt_object ref_add method, for an existing, write-locked object. While the
 * link count is below LDISKFS_LINK_MAX the inode is marked dirty; otherwise
 * "Overflowed nlink" is logged.
 * NOTE(review): the increment of i_nlink itself is not visible here.
 */
987 static void osd_object_ref_add(const struct lu_context *ctxt,
988 struct dt_object *dt, struct thandle *th)
990 struct osd_object *obj = osd_dt_obj(dt);
991 struct inode *inode = obj->oo_inode;
993 LASSERT(osd_invariant(obj));
994 LASSERT(dt_object_exists(dt));
995 LASSERT(osd_write_locked(ctxt, obj));
997 if (inode->i_nlink < LDISKFS_LINK_MAX) {
999 mark_inode_dirty(inode);
1001 LU_OBJECT_DEBUG(D_ERROR, ctxt, &dt->do_lu,
1002 "Overflowed nlink\n");
1003 LASSERT(osd_invariant(obj));
/*
 * dt_object ref_del method, for an existing, write-locked object. While the
 * link count is positive the inode is marked dirty; otherwise
 * "Underflowed nlink" is logged.
 * NOTE(review): the decrement of i_nlink itself is not visible here.
 */
1006 static void osd_object_ref_del(const struct lu_context *ctxt,
1007 struct dt_object *dt, struct thandle *th)
1009 struct osd_object *obj = osd_dt_obj(dt);
1010 struct inode *inode = obj->oo_inode;
1012 LASSERT(osd_invariant(obj));
1013 LASSERT(dt_object_exists(dt));
1014 LASSERT(osd_write_locked(ctxt, obj));
1016 if (inode->i_nlink > 0) {
1018 mark_inode_dirty(inode);
1020 LU_OBJECT_DEBUG(D_ERROR, ctxt, &dt->do_lu,
1021 "Underflowed nlink\n");
1022 LASSERT(osd_invariant(obj));
1025 static int osd_xattr_get(const struct lu_context *ctxt, struct dt_object *dt,
1026 void *buf, int size, const char *name)
1028 struct osd_object *obj = osd_dt_obj(dt);
1029 struct inode *inode = obj->oo_inode;
1030 struct osd_thread_info *info = lu_context_key_get(ctxt, &osd_key);
1031 struct dentry *dentry = &info->oti_dentry;
1033 LASSERT(dt_object_exists(dt));
1034 LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
1035 LASSERT(osd_read_locked(ctxt, obj) || osd_write_locked(ctxt, obj));
1037 dentry->d_inode = inode;
1038 return inode->i_op->getxattr(dentry, name, buf, size);
/*
 * dt_object xattr_set method: store extended attribute @name through the
 * inode's ->setxattr method, using the per-context scratch dentry. The
 * LU_XATTR_REPLACE and LU_XATTR_CREATE bits of @fl are translated into
 * XATTR_REPLACE and XATTR_CREATE. The object must exist and be write-locked.
 * NOTE(review): the declaration and initial value of fs_flags are not
 * visible here.
 */
1041 static int osd_xattr_set(const struct lu_context *ctxt, struct dt_object *dt,
1042 const void *buf, int size, const char *name, int fl,
1043 struct thandle *handle)
1047 struct osd_object *obj = osd_dt_obj(dt);
1048 struct inode *inode = obj->oo_inode;
1049 struct osd_thread_info *info = lu_context_key_get(ctxt, &osd_key);
1050 struct dentry *dentry = &info->oti_dentry;
1052 LASSERT(dt_object_exists(dt));
1053 LASSERT(inode->i_op != NULL && inode->i_op->setxattr != NULL);
1054 LASSERT(osd_write_locked(ctxt, obj));
1056 dentry->d_inode = inode;
1059 if (fl & LU_XATTR_REPLACE)
1060 fs_flags |= XATTR_REPLACE;
1062 if (fl & LU_XATTR_CREATE)
1063 fs_flags |= XATTR_CREATE;
1065 return inode->i_op->setxattr(dentry, name, buf, size, fs_flags);
1068 static int osd_xattr_list(const struct lu_context *ctxt, struct dt_object *dt,
1069 void *buf, int size)
1071 struct osd_object *obj = osd_dt_obj(dt);
1072 struct inode *inode = obj->oo_inode;
1073 struct osd_thread_info *info = lu_context_key_get(ctxt, &osd_key);
1074 struct dentry *dentry = &info->oti_dentry;
1076 LASSERT(dt_object_exists(dt));
1077 LASSERT(inode->i_op != NULL && inode->i_op->listxattr != NULL);
1078 LASSERT(osd_read_locked(ctxt, obj) || osd_write_locked(ctxt, obj));
1080 dentry->d_inode = inode;
1081 return inode->i_op->listxattr(dentry, buf, size);
1084 static int osd_xattr_del(const struct lu_context *ctxt, struct dt_object *dt,
1085 const char *name, struct thandle *handle)
1087 struct osd_object *obj = osd_dt_obj(dt);
1088 struct inode *inode = obj->oo_inode;
1089 struct osd_thread_info *info = lu_context_key_get(ctxt, &osd_key);
1090 struct dentry *dentry = &info->oti_dentry;
1092 LASSERT(dt_object_exists(dt));
1093 LASSERT(inode->i_op != NULL && inode->i_op->removexattr != NULL);
1094 LASSERT(osd_write_locked(ctxt, obj));
1096 dentry->d_inode = inode;
1097 return inode->i_op->removexattr(dentry, name);
/*
 * Fill one buffer of @nob bytes at @area with lu_dirent records taken from
 * iterator @it. Each record carries the fid, the hash returned by
 * iops->store(), and the name; its length is rounded up to a multiple of 4.
 * Iteration continues while iops->next() returns 0. When a record does not
 * fit, the length of the previous record (*last) is enlarged instead.
 * NOTE(review): many control-flow lines (loop head, branch bodies, updates
 * of *start / *end / *last and nob, the return) are not visible here.
 * NOTE(review): nob is an int compared against a sizeof expression in the
 * LASSERT below, so a negative nob would be compared as unsigned -- confirm
 * callers never pass a buffer smaller than the page header.
 */
1100 static int osd_dir_page_build(const struct lu_context *ctx, int first,
1101 void *area, int nob,
1102 struct dt_it_ops *iops, struct dt_it *it,
1103 __u32 *start, __u32 *end, __u32 hash_end,
1104 struct lu_dirent **last)
1107 struct osd_thread_info *info = lu_context_key_get(ctx, &osd_key);
1108 struct lu_fid *fid = &info->oti_fid;
1109 struct lu_dirent *ent;
/* skip the space taken by the page header */
1112 area += sizeof (struct lu_dirpage);
1113 nob -= sizeof (struct lu_dirpage);
1117 LASSERT(nob > sizeof *ent);
1127 name = (char *)iops->key(ctx, it);
1128 len = iops->key_size(ctx, it);
1130 *fid = *(struct lu_fid *)iops->rec(ctx, it);
/* entry header plus name, rounded up to a 4-byte boundary */
1133 recsize = (sizeof *ent + len + 3) & ~3;
1134 hash = iops->store(ctx, it);
1135 if (hash > hash_end) {
1137 if (first && ent == area)
1142 CDEBUG(D_INODE, "%p %p %d "DFID": %#8.8x (%d)\"%*.*s\"\n",
1143 area, ent, nob, PFID(fid), hash, len, len, len, name);
1144 if (nob >= recsize) {
1145 ent->lde_fid = *fid;
1146 ent->lde_hash = hash;
1147 ent->lde_namelen = cpu_to_le16(len);
1148 ent->lde_reclen = cpu_to_le16(recsize);
1149 memcpy(ent->lde_name, name, len);
1150 if (first && ent == area)
1153 ent = (void *)ent + recsize;
1155 result = iops->next(ctx, it);
1158 * record doesn't fit into page, enlarge previous one.
1160 LASSERT(*last != NULL);
1161 (*last)->lde_reclen =
1162 cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
1166 } while (result == 0);
/*
 * dt_object readpage method: fill the pages of @rdpg with directory entries
 * of an indexed object. The object must exist, have an index and be locked
 * by the caller. rp_count must be a multiple of the inode's block size,
 * otherwise an error is logged. An iterator is created from the object's
 * index operations and loaded at rdpg->rp_hash, and each page is filled
 * with osd_dir_page_build(). The hash range covered is finally written into
 * the lu_dirpage header of the first page.
 * NOTE(review): return values, the conditions on rc, the kunmap of the data
 * pages and several declarations are not visible here. iops->put() shows
 * twice in the visible lines; whether both run on one path cannot be
 * determined from here -- confirm against the full source.
 */
1170 static int osd_readpage(const struct lu_context *ctxt,
1171 struct dt_object *dt, const struct lu_rdpg *rdpg)
1174 struct osd_object *obj = osd_dt_obj(dt);
1175 struct dt_it_ops *iops;
1180 LASSERT(dt_object_exists(dt));
1181 LASSERT(osd_invariant(obj));
1182 LASSERT(osd_has_index(obj));
1183 LASSERT(osd_read_locked(ctxt, obj) || osd_write_locked(ctxt, obj));
1185 LASSERT(rdpg->rp_pages != NULL);
1187 if (rdpg->rp_count <= 0)
/* the mask test assumes i_blksize is a power of two */
1190 if (rdpg->rp_count & (obj->oo_inode->i_blksize - 1)) {
1191 CERROR("size %u is not multiple of blocksize %lu\n",
1192 rdpg->rp_count, obj->oo_inode->i_blksize);
1197 * iterating through directory and fill pages from @rdpg
1199 iops = &dt->do_index_ops->dio_it;
1200 it = iops->init(ctxt, dt);
1204 * XXX position iterator at rdpg->rp_hash
1206 rc = iops->load(ctxt, it, rdpg->rp_hash);
1208 struct page *pg; /* no, Richard, it _is_ initialized */
1209 struct lu_dirent *last;
/* one page per iteration; "!i" marks the first page for the builder */
1213 for (i = 0, rc = 0, nob = rdpg->rp_count;
1214 rc == 0 && nob > 0; i++, nob -= CFS_PAGE_SIZE) {
1215 LASSERT(i < rdpg->rp_npages);
1216 pg = rdpg->rp_pages[i];
1217 rc = osd_dir_page_build(ctxt, !i, kmap(pg),
1218 min_t(int, nob, CFS_PAGE_SIZE),
1220 &hash_start, &hash_end,
1221 rdpg->rp_hash_end, &last);
/* a zero record length terminates the entry list */
1222 if (rc != 0 || i == rdpg->rp_npages - 1)
1223 last->lde_reclen = 0;
1226 iops->put(ctxt, it);
1235 struct lu_dirpage *dp;
1237 dp = kmap(rdpg->rp_pages[0]);
1238 dp->ldp_hash_start = hash_start;
1239 dp->ldp_hash_end = hash_end;
1240 kunmap(rdpg->rp_pages[0]);
1244 iops->put(ctxt, it);
1245 iops->fini(ctxt, it);
/*
 * dt_object method table; installed on every object by osd_object_alloc().
 * NOTE(review): the closing brace is not visible here.
 */
1250 static struct dt_object_operations osd_obj_ops = {
1251 .do_read_lock = osd_object_read_lock,
1252 .do_write_lock = osd_object_write_lock,
1253 .do_read_unlock = osd_object_read_unlock,
1254 .do_write_unlock = osd_object_write_unlock,
1255 .do_attr_get = osd_attr_get,
1256 .do_attr_set = osd_attr_set,
1257 .do_create = osd_object_create,
1258 .do_index_try = osd_index_try,
1259 .do_ref_add = osd_object_ref_add,
1260 .do_ref_del = osd_object_ref_del,
1261 .do_xattr_get = osd_xattr_get,
1262 .do_xattr_set = osd_xattr_set,
1263 .do_xattr_del = osd_xattr_del,
1264 .do_xattr_list = osd_xattr_list,
1265 .do_readpage = osd_readpage,
/*
 * dt_body read method: read up to @count bytes at *@pos from the object's
 * inode into @buf by calling the inode's file ->read method directly on the
 * scratch file prepared by osd_rw_init().
 * NOTE(review): the local declarations, the matching osd_rw_fini() call and
 * the return are not visible here.
 */
1272 static ssize_t osd_read(const struct lu_context *ctxt, struct dt_object *dt,
1273 void *buf, size_t count, loff_t *pos)
1275 struct inode *inode = osd_dt_obj(dt)->oo_inode;
1280 file = osd_rw_init(ctxt, inode, &seg);
1282 * We'd like to use vfs_read() here, but it messes with
1283 * dnotify_parent() and locks.
1285 result = file->f_op->read(file, buf, count, pos);
/*
 * Body write: hands @buf, @count and @pos to the ->write() method of the file
 * that osd_rw_init() produces for the inode backing @dt.
 * @handle is not referenced on any of the visible lines.
 * NOTE(review): declarations, cleanup and the return statement are not
 * visible here; presumably the ->write() result is returned -- confirm.
 */
1290 static ssize_t osd_write(const struct lu_context *ctxt, struct dt_object *dt,
1291 const void *buf, size_t count, loff_t *pos,
1292 struct thandle *handle)
1294 struct inode *inode = osd_dt_obj(dt)->oo_inode;
1299 file = osd_rw_init(ctxt, inode, &seg);
1300 result = file->f_op->write(file, buf, count, pos);
/* dt_body_operations table: byte-stream read and write entry points. */
1305 static struct dt_body_operations osd_body_ops = {
1306 .dbo_read = osd_read,
1307 .dbo_write = osd_write
/*
 * Decide whether object @o can be used as an index with feature set @feat.
 *
 * When @feat is dt_directory_features the visible test accepts the file
 * system root inode, a container described by iam_htree_compat_param, or a
 * descriptor whose record size equals sizeof(struct lu_fid) (the rest of
 * that clause is not visible here).
 *
 * The remaining visible lines require the descriptor's key and record sizes
 * to lie within the bounds given in @feat and require that none of
 * DT_IND_VARKEY, DT_IND_VARREC or DT_IND_NONUNQ is set in @feat->dif_flags.
 *
 * NOTE(review): the statement joining the two halves is not visible;
 * presumably the size and flag checks form the non-directory branch --
 * confirm against the full source.
 */
1314 static int osd_index_probe(const struct lu_context *ctxt, struct osd_object *o,
1315 const struct dt_index_features *feat)
1317 struct iam_descr *descr;
1319 descr = o->oo_container.ic_descr;
1320 if (feat == &dt_directory_features)
1321 return osd_sb(osd_obj2dev(o))->s_root->d_inode == o->oo_inode ||
1322 descr == &iam_htree_compat_param ||
1323 (descr->id_rec_size == sizeof(struct lu_fid) &&
1325 * XXX check that index looks like directory.
1331 feat->dif_keysize_min <= descr->id_key_size &&
1332 descr->id_key_size <= feat->dif_keysize_max &&
1333 feat->dif_recsize_min <= descr->id_rec_size &&
1334 descr->id_rec_size <= feat->dif_recsize_max &&
1335 !(feat->dif_flags & (DT_IND_VARKEY |
1336 DT_IND_VARREC | DT_IND_NONUNQ)) &&
1337 ergo(feat->dif_flags & DT_IND_UPDATE,
1338 1 /* XXX check that object (and file system) is
/*
 * Prepare @dt for use as an index with features @feat.
 *
 * Visible steps: the file system root directory gets osd_index_compat_ops;
 * an object that has no index operations yet gets its iam container
 * initialised and set up, an iam_path_descr allocated through the
 * descriptor's id_ipd_alloc() method, and osd_index_ops installed; after
 * that osd_index_probe() is consulted with @feat.
 *
 * NOTE(review): the result checks between these calls, the storing of the
 * allocated ipd and the returned value are on lines not visible here --
 * confirm against the full source.
 */
1342 static int osd_index_try(const struct lu_context *ctx, struct dt_object *dt,
1343 const struct dt_index_features *feat)
1346 struct osd_object *obj = osd_dt_obj(dt);
1348 LASSERT(osd_invariant(obj));
1349 LASSERT(dt_object_exists(dt));
1351 if (osd_sb(osd_obj2dev(obj))->s_root->d_inode == obj->oo_inode) {
1352 dt->do_index_ops = &osd_index_compat_ops;
1354 } else if (!osd_has_index(obj)) {
1355 struct iam_container *bag;
1357 bag = &obj->oo_container;
1358 result = iam_container_init(bag, &obj->oo_descr, obj->oo_inode);
1360 result = iam_container_setup(bag);
1362 struct iam_path_descr *ipd;
1364 LASSERT(obj->oo_ipd == NULL);
1365 ipd = bag->ic_descr->id_ops->id_ipd_alloc(bag);
1368 dt->do_index_ops = &osd_index_ops;
1377 if (osd_index_probe(ctx, obj, feat))
1382 LASSERT(osd_invariant(obj));
/*
 * Delete @key from the iam container of @dt.
 *
 * The osd_thandle containing @handle is recovered with container_of0() and
 * its ot_handle, which must be non-NULL, is passed to iam_delete() together
 * with the object's container and path descriptor. The object must exist,
 * its container must refer to its own inode, and oo_ipd must be set.
 * NOTE(review): the return statement is not visible; presumably the
 * iam_delete() result in rc is returned -- confirm.
 */
1387 static int osd_index_delete(const struct lu_context *ctxt, struct dt_object *dt,
1388 const struct dt_key *key, struct thandle *handle)
1390 struct osd_object *obj = osd_dt_obj(dt);
1391 struct osd_thandle *oh;
1396 LASSERT(osd_invariant(obj));
1397 LASSERT(dt_object_exists(dt));
1398 LASSERT(obj->oo_container.ic_object == obj->oo_inode);
1399 LASSERT(obj->oo_ipd != NULL);
1401 oh = container_of0(handle, struct osd_thandle, ot_super);
1402 LASSERT(oh->ot_handle != NULL);
1404 rc = iam_delete(oh->ot_handle, &obj->oo_container,
1405 (const struct iam_key *)key, obj->oo_ipd);
1407 LASSERT(osd_invariant(obj));
/*
 * Look @key up in the iam container of @dt; iam_lookup() receives @rec, cast
 * to struct iam_rec, as the place for the record. The same preconditions as
 * for the other index methods are asserted: the object exists, its container
 * refers to its own inode, and oo_ipd is set.
 * NOTE(review): the return statement is not visible; presumably the
 * iam_lookup() result in rc is returned -- confirm.
 */
1411 static int osd_index_lookup(const struct lu_context *ctxt, struct dt_object *dt,
1412 struct dt_rec *rec, const struct dt_key *key)
1414 struct osd_object *obj = osd_dt_obj(dt);
1419 LASSERT(osd_invariant(obj));
1420 LASSERT(dt_object_exists(dt));
1421 LASSERT(obj->oo_container.ic_object == obj->oo_inode);
1422 LASSERT(obj->oo_ipd != NULL);
1424 rc = iam_lookup(&obj->oo_container, (const struct iam_key *)key,
1425 (struct iam_rec *)rec, obj->oo_ipd);
1427 LASSERT(osd_invariant(obj));
/*
 * Insert the pair (@key, @rec) into the iam container of @dt.
 *
 * The osd_thandle containing th is recovered with container_of0() and its
 * ot_handle, which must be non-NULL, is passed to iam_insert().
 * NOTE(review): the end of the parameter list (where th is declared) and
 * the return statement are not visible here; presumably the iam_insert()
 * result in rc is returned -- confirm.
 */
1433 static int osd_index_insert(const struct lu_context *ctx, struct dt_object *dt,
1434 const struct dt_rec *rec, const struct dt_key *key,
1437 struct osd_object *obj = osd_dt_obj(dt);
1439 struct osd_thandle *oh;
1444 LASSERT(osd_invariant(obj));
1445 LASSERT(dt_object_exists(dt));
1446 LASSERT(obj->oo_container.ic_object == obj->oo_inode);
1447 LASSERT(obj->oo_ipd != NULL);
1449 oh = container_of0(th, struct osd_thandle, ot_super);
1450 LASSERT(oh->ot_handle != NULL);
1451 rc = iam_insert(oh->ot_handle, &obj->oo_container,
1452 (const struct iam_key *)key,
1453 (struct iam_rec *)rec, obj->oo_ipd);
1455 LASSERT(osd_invariant(obj));
1460 * Iterator operations.
/*
 * Iterator state members: the object being iterated and the iam iterator
 * embedded by value. The osd_it_*() functions below reach both through a
 * struct osd_it pointer.
 * NOTE(review): the line opening the structure definition is not visible
 * here; these are presumably the members of struct osd_it -- confirm.
 */
1463 struct osd_object *oi_obj;
1464 struct iam_iterator oi_it;
/*
 * Create an iterator over the index object @dt. The object must exist and
 * have oo_ipd set. The embedded iam iterator is initialised over the
 * object's container with IAM_IT_MOVE, and the osd iterator is returned cast
 * to struct dt_it.
 * NOTE(review): how "it" is obtained and whether a reference on the object
 * is taken here are on lines not visible; osd_it_fini() drops a reference,
 * so one is presumably acquired -- confirm.
 */
1467 static struct dt_it *osd_it_init(const struct lu_context *ctx,
1468 struct dt_object *dt)
1471 struct osd_object *obj = osd_dt_obj(dt);
1472 struct lu_object *lo = &dt->do_lu;
1474 LASSERT(lu_object_exists(lo));
1475 LASSERT(obj->oo_ipd != NULL);
1481 iam_it_init(&it->oi_it,
1482 &obj->oo_container, IAM_IT_MOVE, obj->oo_ipd);
1484 return (struct dt_it *)it;
/*
 * Tear down iterator @di: finalise the embedded iam iterator, then drop the
 * reference on the iterated object with lu_object_put().
 * NOTE(review): releasing the iterator's own memory, if done, is on a line
 * not visible here.
 */
1487 static void osd_it_fini(const struct lu_context *ctx, struct dt_it *di)
1489 struct osd_it *it = (struct osd_it *)di;
1491 iam_it_fini(&it->oi_it);
1492 lu_object_put(ctx, &it->oi_obj->oo_dt.do_lu);
/*
 * Forward @key, cast to struct iam_key, to iam_it_get() on the embedded iam
 * iterator and return its result unchanged.
 */
1496 static int osd_it_get(const struct lu_context *ctx,
1497 struct dt_it *di, const struct dt_key *key)
1499 struct osd_it *it = (struct osd_it *)di;
1501 return iam_it_get(&it->oi_it, (const struct iam_key *)key);
/* Forward to iam_it_put() on the embedded iam iterator. */
1504 static void osd_it_put(const struct lu_context *ctx, struct dt_it *di)
1506 struct osd_it *it = (struct osd_it *)di;
1507 iam_it_put(&it->oi_it);
/* Forward to iam_it_next() on the embedded iam iterator; returns its result. */
1510 static int osd_it_next(const struct lu_context *ctx, struct dt_it *di)
1512 struct osd_it *it = (struct osd_it *)di;
1513 return iam_it_next(&it->oi_it);
/*
 * Return the value of iam_it_key_get() for the embedded iam iterator, cast
 * to struct dt_key. The const qualifier on @di is cast away to reach it.
 */
1516 static struct dt_key *osd_it_key(const struct lu_context *ctx,
1517 const struct dt_it *di)
1519 struct osd_it *it = (struct osd_it *)di;
1520 return (struct dt_key *)iam_it_key_get(&it->oi_it);
/* Return iam_it_key_size() for the embedded iam iterator. */
1523 static int osd_it_key_size(const struct lu_context *ctx, const struct dt_it *di)
1525 struct osd_it *it = (struct osd_it *)di;
1526 return iam_it_key_size(&it->oi_it);
/*
 * Return the value of iam_it_rec_get() for the embedded iam iterator, cast
 * to struct dt_rec.
 */
1529 static struct dt_rec *osd_it_rec(const struct lu_context *ctx,
1530 const struct dt_it *di)
1532 struct osd_it *it = (struct osd_it *)di;
1533 return (struct dt_rec *)iam_it_rec_get(&it->oi_it);
/*
 * Return the __u32 produced by iam_it_store() for the embedded iam iterator.
 * osd_it_load() accepts a value of the same type.
 */
1536 static __u32 osd_it_store(const struct lu_context *ctxt, const struct dt_it *di)
1538 struct osd_it *it = (struct osd_it *)di;
1539 return iam_it_store(&it->oi_it);
/*
 * Pass @hash to iam_it_load() on the embedded iam iterator and return its
 * result. The const qualifier on @di is cast away to reach the iterator.
 */
1542 static int osd_it_load(const struct lu_context *ctxt,
1543 const struct dt_it *di, __u32 hash)
1545 struct osd_it *it = (struct osd_it *)di;
1546 return iam_it_load(&it->oi_it, hash);
/*
 * dt_index_operations table for iam-backed indices: lookup, insert and
 * delete, followed by iterator methods.
 * NOTE(review): only some of the iterator slots are visible here; the line
 * opening the nested iterator initializer and the remaining slots are not.
 */
1549 static struct dt_index_operations osd_index_ops = {
1550 .dio_lookup = osd_index_lookup,
1551 .dio_insert = osd_index_insert,
1552 .dio_delete = osd_index_delete,
1554 .init = osd_it_init,
1555 .fini = osd_it_fini,
1558 .next = osd_it_next,
1560 .key_size = osd_it_key_size,
1562 .store = osd_it_store,
/*
 * Delete method of the compatibility index. After asserting that the object
 * is a directory, the only visible action is RETURN(-EOPNOTSUPP); @key and
 * @handle are not referenced on the visible lines.
 */
1567 static int osd_index_compat_delete(const struct lu_context *ctxt,
1568 struct dt_object *dt,
1569 const struct dt_key *key,
1570 struct thandle *handle)
1572 struct osd_object *obj = osd_dt_obj(dt);
1574 LASSERT(S_ISDIR(obj->oo_inode->i_mode));
1576 RETURN(-EOPNOTSUPP);
1580 * Compatibility index operations.
/*
 * Fill @fid from the inode attached to @dentry by passing the inode number
 * and generation to lu_igif_build(). @osd is not referenced on the visible
 * lines.
 * NOTE(review): the return statement is not visible here.
 */
1584 static int osd_build_fid(struct osd_device *osd,
1585 struct dentry *dentry, struct lu_fid *fid)
1587 struct inode *inode = dentry->d_inode;
1589 lu_igif_build(fid, inode->i_ino, inode->i_generation);
/*
 * Lookup method of the compatibility index: resolve the name in @key inside
 * directory @dt and store the resulting fid in @rec.
 *
 * @key is treated as a NUL-terminated string (strlen() is applied to it) and
 * placed in the per-thread info->oti_str. A root dentry is allocated for the
 * directory inode, a child dentry for the name, and the directory's own
 * i_op->lookup() is called with NULL nameidata. When the child dentry ends
 * up with an inode, osd_build_fid() converts it into a fid; an error is
 * logged for an alias case.
 *
 * NOTE(review): the branch conditions around the lookup result, the dentry
 * cleanup and the returned value are on lines not visible here -- confirm
 * against the full source.
 */
1593 static int osd_index_compat_lookup(const struct lu_context *ctxt,
1594 struct dt_object *dt,
1595 struct dt_rec *rec, const struct dt_key *key)
1597 struct osd_object *obj = osd_dt_obj(dt);
1599 struct osd_device *osd = osd_obj2dev(obj);
1600 struct osd_thread_info *info = lu_context_key_get(ctxt, &osd_key);
1606 * XXX temporary solution.
1608 struct dentry *dentry;
1609 struct dentry *parent;
1611 LASSERT(osd_invariant(obj));
1612 LASSERT(S_ISDIR(obj->oo_inode->i_mode));
1613 LASSERT(osd_has_index(obj));
1615 info->oti_str.name = (const char *)key;
1616 info->oti_str.len = strlen((const char *)key);
1618 dir = obj->oo_inode;
1619 LASSERT(dir->i_op != NULL && dir->i_op->lookup != NULL);
1621 parent = d_alloc_root(dir);
1625 dentry = d_alloc(parent, &info->oti_str);
1626 if (dentry != NULL) {
1630 * XXX passing NULL for nameidata should work for
1633 d = dir->i_op->lookup(dir, dentry, NULL);
1636 * normal case, result is in @dentry.
1638 if (dentry->d_inode != NULL)
1639 result = osd_build_fid(osd, dentry,
1640 (struct lu_fid *)rec);
1644 /* What? Disconnected alias? Ppheeeww... */
1645 CERROR("Aliasing where not expected\n");
1653 LASSERT(osd_invariant(obj));
/*
 * Add an entry called @name, referring to @inode, to directory @dir.
 *
 * Visible steps: the name is stored in info->oti_str; a dentry "old" is
 * allocated under dev->od_obj_area and instantiated with @inode; a root
 * dentry is allocated for @dir and a child dentry "new" for the name; then
 * the directory's i_op->link(old, dir, new) is called.
 *
 * The repeated LASSERTs check the reference count of @dir at each stage:
 * above 0 outside the region where the root dentry exists, above 1 inside.
 *
 * NOTE(review): allocation-failure checks, dentry release and the returned
 * value are on lines not visible here -- confirm against the full source.
 */
1657 static int osd_add_rec(struct osd_thread_info *info, struct osd_device *dev,
1658 struct inode *dir, struct inode *inode, const char *name)
1662 struct dentry *parent;
1666 info->oti_str.name = name;
1667 info->oti_str.len = strlen(name);
1669 LASSERT(atomic_read(&dir->i_count) > 0);
1671 old = d_alloc(dev->od_obj_area, &info->oti_str);
1673 d_instantiate(old, inode);
1675 LASSERT(atomic_read(&dir->i_count) > 0);
1676 parent = d_alloc_root(dir);
1677 if (parent != NULL) {
1679 LASSERT(atomic_read(&dir->i_count) > 1);
1680 new = d_alloc(parent, &info->oti_str);
1681 LASSERT(atomic_read(&dir->i_count) > 1);
1683 LASSERT(atomic_read(&dir->i_count) > 1);
1684 result = dir->i_op->link(old, dir, new);
1685 LASSERT(atomic_read(&dir->i_count) > 1);
1687 LASSERT(atomic_read(&dir->i_count) > 1);
1689 LASSERT(atomic_read(&dir->i_count) > 1);
1691 LASSERT(atomic_read(&dir->i_count) > 0);
1695 LASSERT(atomic_read(&dir->i_count) > 0);
1701 * XXX Temporary stuff.
/*
 * Insert method of the compatibility index: @rec is read as a struct lu_fid
 * and @key as the entry name.
 *
 * The object for the fid is found with lu_object_find() on the device's
 * site. If it exists, its osd slice is located and osd_add_rec() adds the
 * name to this directory, pointing at the child's inode; "No osd slice." is
 * logged on a branch whose condition is not visible. The found object is
 * released with lu_object_put(). If lu_object_find() returned an error
 * pointer, result is set from PTR_ERR().
 *
 * @th is not referenced on the visible lines.
 * NOTE(review): several branch headers and the return statement are not
 * visible here -- confirm against the full source.
 */
1703 static int osd_index_compat_insert(const struct lu_context *ctx,
1704 struct dt_object *dt,
1705 const struct dt_rec *rec,
1706 const struct dt_key *key, struct thandle *th)
1708 struct osd_object *obj = osd_dt_obj(dt);
1710 const struct lu_fid *fid = (const struct lu_fid *)rec;
1711 const char *name = (const char *)key;
1713 struct lu_device *ludev = dt->do_lu.lo_dev;
1714 struct lu_object *luch;
1716 struct osd_thread_info *info = lu_context_key_get(ctx, &osd_key);
1720 LASSERT(S_ISDIR(obj->oo_inode->i_mode));
1721 LASSERT(osd_invariant(obj));
1723 luch = lu_object_find(ctx, ludev->ld_site, fid);
1724 if (!IS_ERR(luch)) {
1725 if (lu_object_exists(luch)) {
1726 struct osd_object *child;
1728 child = osd_obj(lu_object_locate(luch->lo_header,
1731 result = osd_add_rec(info, osd_obj2dev(obj),
1733 child->oo_inode, name);
1735 CERROR("No osd slice.\n");
1738 LASSERT(osd_invariant(obj));
1739 LASSERT(osd_invariant(child));
1744 lu_object_put(ctx, luch);
1746 result = PTR_ERR(luch);
1747 LASSERT(osd_invariant(obj));
/*
 * dt_index_operations table for the compatibility index; osd_index_try()
 * installs it on the file system root directory. No iterator methods are
 * set on the visible lines.
 */
1751 static struct dt_index_operations osd_index_compat_ops = {
1752 .dio_lookup = osd_index_compat_lookup,
1753 .dio_insert = osd_index_compat_insert,
1754 .dio_delete = osd_index_compat_delete
1758 * OSD device type methods
/*
 * Device type initialisation: register osd_key as a context key and return
 * the result of the registration. @t is unused.
 */
1760 static int osd_type_init(struct lu_device_type *t)
1762 return lu_context_key_register(&osd_key);
/*
 * Device type finalisation: undo osd_type_init() by passing osd_key to
 * lu_context_key_degister(). @t is unused.
 */
1765 static void osd_type_fini(struct lu_device_type *t)
1767 lu_context_key_degister(&osd_key);
/*
 * Context key for osd per-thread data (struct osd_thread_info). It is tagged
 * for both LCT_MD_THREAD and LCT_DT_THREAD contexts and wired to the
 * osd_key_init(), osd_key_fini() and osd_key_exit() callbacks.
 */
1770 static struct lu_context_key osd_key = {
1771 .lct_tags = LCT_MD_THREAD|LCT_DT_THREAD,
1772 .lct_init = osd_key_init,
1773 .lct_fini = osd_key_fini,
1774 .lct_exit = osd_key_exit
/*
 * Key constructor: allocate a struct osd_thread_info with OBD_ALLOC_PTR()
 * and record @ctx in oti_ctx; the other visible assignment replaces info
 * with ERR_PTR(-ENOMEM).
 * NOTE(review): the condition selecting between the two assignments and the
 * return statement are not visible; presumably the error pointer is used
 * when the allocation fails and info is returned -- confirm.
 */
1777 static void *osd_key_init(const struct lu_context *ctx,
1778 struct lu_context_key *key)
1780 struct osd_thread_info *info;
1782 OBD_ALLOC_PTR(info);
1784 info->oti_ctx = ctx;
1786 info = ERR_PTR(-ENOMEM);
/*
 * Key destructor: @data is the struct osd_thread_info for this context.
 * NOTE(review): the statement that acts on info is not visible here;
 * presumably it frees what osd_key_init() allocated -- confirm.
 */
1790 static void osd_key_fini(const struct lu_context *ctx,
1791 struct lu_context_key *key, void *data)
1793 struct osd_thread_info *info = data;
/*
 * Key exit hook: assert that the per-thread counters of read locks, write
 * locks and transactions in @data are all zero.
 */
1797 static void osd_key_exit(const struct lu_context *ctx,
1798 struct lu_context_key *key, void *data)
1800 struct osd_thread_info *info = data;
1802 LASSERT(info->oti_r_locks == 0);
1803 LASSERT(info->oti_w_locks == 0);
1804 LASSERT(info->oti_txns == 0);
/*
 * Device initialisation: set up the device's od_ctx_for_commit context with
 * the LCT_MD_THREAD tag and enter it. @ctx and @next are not referenced on
 * the visible lines.
 * NOTE(review): the check of rc before entering the context and the return
 * statement are not visible here -- confirm.
 */
1807 static int osd_device_init(const struct lu_context *ctx,
1808 struct lu_device *d, struct lu_device *next)
1811 rc = lu_context_init(&osd_dev(d)->od_ctx_for_commit, LCT_MD_THREAD);
1813 lu_context_enter(&osd_dev(d)->od_ctx_for_commit);
/*
 * Release what osd_mount() set up on device @o: drop the od_obj_area dentry
 * if present (and clear the pointer so a repeated call skips it), then
 * finalise the object index with osd_oi_fini().
 * NOTE(review): the return statement is not visible here.
 */
1817 static int osd_shutdown(const struct lu_context *ctx, struct osd_device *o)
1819 struct osd_thread_info *info = lu_context_key_get(ctx, &osd_key);
1821 if (o->od_obj_area != NULL) {
1822 dput(o->od_obj_area);
1823 o->od_obj_area = NULL;
1825 osd_oi_fini(info, &o->od_oi);
/*
 * Attach device @o to the server mount named by string 0 of @cfg.
 *
 * Visible steps: log an error if od_mount is already set; obtain the
 * lustre_mount_info with server_get_mount() and log an error when that
 * fails; initialise the object index with osd_oi_init(); create a directory
 * under the file system root with simple_mkdir(); take result from
 * PTR_ERR(d) on one branch; call osd_shutdown() on one branch.
 *
 * NOTE(review): the branch conditions, the assignments that store lmi and
 * the new dentry in the device, and the return statements are on lines not
 * visible here -- confirm against the full source.
 */
1830 static int osd_mount(const struct lu_context *ctx,
1831 struct osd_device *o, struct lustre_cfg *cfg)
1833 struct lustre_mount_info *lmi;
1834 const char *dev = lustre_cfg_string(cfg, 0);
1835 struct osd_thread_info *info = lu_context_key_get(ctx, &osd_key);
1840 if (o->od_mount != NULL) {
1841 CERROR("Already mounted (%s)\n", dev);
1846 lmi = server_get_mount(dev);
1848 CERROR("Cannot get mount info for %s!\n", dev);
1852 LASSERT(lmi != NULL);
1853 /* save lustre_mount_info in dt_device */
1856 result = osd_oi_init(info, &o->od_oi, &o->od_dt_dev);
1860 d = simple_mkdir(osd_sb(o)->s_root, "*OBJ-TEMP*", 0777, 1);
1864 result = PTR_ERR(d);
1867 osd_shutdown(ctx, o);
/*
 * Device finalisation: shrink the dcache of the device's super block,
 * release the server mount if one is held, clear od_mount, then exit and
 * finalise the od_ctx_for_commit context.
 * NOTE(review): osd_sb() dereferences od_mount, yet the NULL check on
 * od_mount only comes afterwards on the visible lines; a guard may exist on
 * a line not visible here -- confirm. The return statement is not visible.
 */
1871 static struct lu_device *osd_device_fini(const struct lu_context *ctx,
1872 struct lu_device *d)
1876 shrink_dcache_sb(osd_sb(osd_dev(d)));
1878 if (osd_dev(d)->od_mount)
1879 server_put_mount(osd_dev(d)->od_mount->lmi_name,
1880 osd_dev(d)->od_mount->lmi_mnt);
1881 osd_dev(d)->od_mount = NULL;
1883 lu_context_exit(&osd_dev(d)->od_ctx_for_commit);
1884 lu_context_fini(&osd_dev(d)->od_ctx_for_commit);
/*
 * Allocate and initialise an osd device of type @t.
 *
 * Visible steps: dt_device_init() on the embedded dt device, installation of
 * osd_lu_ops and osd_dt_ops, and two error assignments that turn result or
 * -ENOMEM into an ERR_PTR(). @ctx and @cfg are not referenced on the visible
 * lines.
 * NOTE(review): the allocation of "o", the branch conditions and the return
 * statement are not visible here -- confirm against the full source.
 */
1888 static struct lu_device *osd_device_alloc(const struct lu_context *ctx,
1889 struct lu_device_type *t,
1890 struct lustre_cfg *cfg)
1892 struct lu_device *l;
1893 struct osd_device *o;
1899 result = dt_device_init(&o->od_dt_dev, t);
1902 l->ld_ops = &osd_lu_ops;
1903 o->od_dt_dev.dd_ops = &osd_dt_ops;
1905 l = ERR_PTR(result);
1907 l = ERR_PTR(-ENOMEM);
/*
 * Counterpart of osd_device_alloc(): finalise the embedded dt device.
 * NOTE(review): freeing of the osd_device itself, if done, is on a line not
 * visible here.
 */
1911 static void osd_device_free(const struct lu_context *ctx, struct lu_device *d)
1913 struct osd_device *o = osd_dev(d);
1915 dt_device_fini(&o->od_dt_dev);
/*
 * Configuration entry point: dispatch on cfg->lcfg_command to osd_mount()
 * or osd_shutdown().
 * NOTE(review): the case labels, any default branch and the return statement
 * are not visible here -- confirm which commands map to which call.
 */
1919 static int osd_process_config(const struct lu_context *ctx,
1920 struct lu_device *d, struct lustre_cfg *cfg)
1922 struct osd_device *o = osd_dev(d);
1925 switch(cfg->lcfg_command) {
1927 err = osd_mount(ctx, o, cfg);
1930 err = osd_shutdown(ctx, o);
/*
 * Recovery-complete hook. The only visible content is the TODO below, so
 * orphan handling is not implemented on the lines shown.
 * NOTE(review): the return statement is not visible here.
 */
1939 static int osd_recovery_complete(const struct lu_context *ctxt,
1940 struct lu_device *d)
1943 /* TODO: orphans handling */
1948 * fid<->inode<->object functions.
/*
 * Find @name under @parent and return its inode if its file type equals
 * @mode.
 *
 * Visible outcomes:
 *   - osd_lookup() returned an error pointer: logged, result = NULL;
 *   - negative dentry: ERR_PTR(-ENOENT);
 *   - type mismatch: ERR_PTR(-ENOTDIR) when @mode is S_IFDIR, otherwise
 *     ERR_PTR(-EISDIR);
 *   - otherwise the dentry's inode.
 *
 * NOTE(review): on the visible lines the error branch sets result, not
 * dentry, to NULL. That does not match its own comment about dput(NULL),
 * and it makes this function return NULL, which osd_lookup_id() asserts
 * against. The lines after the last assignment are not visible here, so
 * confirm against the full source whether dentry is reset and whether a
 * reference on the inode is taken before the dentry is released.
 */
1951 static struct inode *osd_open(struct dentry *parent,
1952 const char *name, mode_t mode)
1954 struct dentry *dentry;
1955 struct inode *result;
1957 dentry = osd_lookup(parent, name);
1958 if (IS_ERR(dentry)) {
1959 CERROR("Error opening %s: %ld\n", name, PTR_ERR(dentry));
1960 result = NULL; /* dput(NULL) below is OK */
1961 } else if (dentry->d_inode == NULL) {
1962 CERROR("Not found: %s\n", name);
1963 result = ERR_PTR(-ENOENT);
1964 } else if ((dentry->d_inode->i_mode & S_IFMT) != mode) {
1965 CERROR("Wrong mode: %s: %o != %o\n", name,
1966 dentry->d_inode->i_mode, mode);
1967 result = ERR_PTR(mode == S_IFDIR ? -ENOTDIR : -EISDIR);
1969 result = dentry->d_inode;
/*
 * Look @name up in directory @parent with lookup_one_len(), holding the
 * parent inode's i_sem around the call.
 *
 * An error pointer from the lookup is logged. A dentry whose inode is marked
 * bad is logged and replaced by ERR_PTR(-ENOENT).
 * NOTE(review): the line between the bad-inode message and the replacement
 * (where the dentry reference would be dropped) and the return statement are
 * not visible here -- confirm.
 */
1976 struct dentry *osd_lookup(struct dentry *parent, const char *name)
1978 struct dentry *dentry;
1980 CDEBUG(D_INODE, "looking up object %s\n", name);
1981 down(&parent->d_inode->i_sem);
1982 dentry = lookup_one_len(name, parent, strlen(name));
1983 up(&parent->d_inode->i_sem);
1985 if (IS_ERR(dentry)) {
1986 CERROR("error getting %s: %ld\n", name, PTR_ERR(dentry));
1987 } else if (dentry->d_inode != NULL && is_bad_inode(dentry->d_inode)) {
1988 CERROR("got bad object %s inode %lu\n",
1989 name, dentry->d_inode->i_ino);
1991 dentry = ERR_PTR(-ENOENT);
/*
 * Resolve @name, which must have file type @mode, in the root directory of
 * @dev and store the inode number and generation in @id.
 *
 * If osd_open() returns an error pointer, result is taken from PTR_ERR().
 * A non-error return is asserted to be non-NULL.
 * NOTE(review): the release of the inode, the success value of result and
 * the return statement are on lines not visible here -- confirm.
 */
1996 int osd_lookup_id(struct dt_device *dev, const char *name, mode_t mode,
1997 struct osd_inode_id *id)
1999 struct inode *inode;
2000 struct osd_device *osd = osd_dt_dev(dev);
2003 inode = osd_open(osd_sb(osd)->s_root, name, mode);
2004 if (!IS_ERR(inode)) {
2005 LASSERT(inode != NULL);
2006 id->oii_ino = inode->i_ino;
2007 id->oii_gen = inode->i_generation;
2010 result = PTR_ERR(inode);
/*
 * Obtain the inode identified by @id from the device's super block.
 *
 * Visible failure mapping:
 *   - iget() returned NULL: ERR_PTR(-EACCES);
 *   - the inode is marked bad: ERR_PTR(-ENOENT);
 *   - its generation differs from id->oii_gen: ERR_PTR(-ESTALE).
 * Each case is logged with CERROR(). @info is not referenced on the visible
 * lines.
 * NOTE(review): one line is missing between each of the last two messages
 * and the assignment that follows; presumably the inode reference is dropped
 * there. The return statement is also not visible -- confirm.
 */
2014 static struct inode *osd_iget(struct osd_thread_info *info,
2015 struct osd_device *dev,
2016 const struct osd_inode_id *id)
2018 struct inode *inode;
2020 inode = iget(osd_sb(dev), id->oii_ino);
2021 if (inode == NULL) {
2022 CERROR("no inode\n");
2023 inode = ERR_PTR(-EACCES);
2024 } else if (is_bad_inode(inode)) {
2025 CERROR("bad inode\n");
2027 inode = ERR_PTR(-ENOENT);
2028 } else if (inode->i_generation != id->oii_gen) {
2029 CERROR("stale inode\n");
2031 inode = ERR_PTR(-ESTALE);
/*
 * Bind @obj to the inode that the object index maps @fid to.
 *
 * Preconditions asserted: @obj has no inode yet, @fid is sane and local to
 * the device's site. Under the object-index read lock, osd_oi_lookup()
 * translates @fid into an inode id; osd_iget() then fetches the inode, which
 * is stored in obj->oo_inode when valid. Otherwise result is taken from
 * PTR_ERR(inode). There is also a branch for a -ENOENT result from the index
 * lookup, and an OBD_FAIL_OST_ENOENT fail-check before the lookup.
 *
 * NOTE(review): the initialisation of oi and id, the condition guarding the
 * osd_iget() call, the bodies of the fail-check and -ENOENT branches, and
 * the return statement are on lines not visible here -- confirm against the
 * full source.
 */
2038 static int osd_fid_lookup(const struct lu_context *ctx,
2039 struct osd_object *obj, const struct lu_fid *fid)
2041 struct osd_thread_info *info;
2042 struct lu_device *ldev = obj->oo_dt.do_lu.lo_dev;
2043 struct osd_device *dev;
2044 struct osd_inode_id *id;
2046 struct inode *inode;
2049 LASSERT(osd_invariant(obj));
2050 LASSERT(obj->oo_inode == NULL);
2051 LASSERT(fid_is_sane(fid));
2052 LASSERT(fid_is_local(ldev->ld_site, fid));
2056 info = lu_context_key_get(ctx, &osd_key);
2057 dev = osd_dev(ldev);
2061 if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT))
2064 osd_oi_read_lock(oi);
2065 result = osd_oi_lookup(info, oi, fid, id);
2067 inode = osd_iget(info, dev, id);
2068 if (!IS_ERR(inode)) {
2069 obj->oo_inode = inode;
2070 LASSERT(obj->oo_inode->i_sb == osd_sb(dev));
2074 * If fid wasn't found in oi, inode-less object is
2075 * created, for which lu_object_exists() returns
2076 * false. This is used in a (frequent) case when
2077 * objects are created as locking anchors or
2078 * place holders for objects yet to be created.
2080 result = PTR_ERR(inode);
2081 } else if (result == -ENOENT)
2083 osd_oi_read_unlock(oi);
2084 LASSERT(osd_invariant(obj));
/*
 * Copy the attributes of @inode into @attr.
 *
 * The bits for every field filled below are OR-ed into attr->la_valid, so
 * bits already set by the caller are kept. Times are read through LTIME_S()
 * and the flags come from the ldiskfs-specific inode (LDISKFS_I()). @ctx is
 * not referenced on the visible lines.
 * NOTE(review): the return statement is not visible here.
 */
2088 static int osd_inode_getattr(const struct lu_context *ctx,
2089 struct inode *inode, struct lu_attr *attr)
2091 attr->la_valid |= LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
2092 LA_SIZE | LA_BLOCKS | LA_UID | LA_GID |
2093 LA_FLAGS | LA_NLINK | LA_RDEV | LA_BLKSIZE;
2095 attr->la_atime = LTIME_S(inode->i_atime);
2096 attr->la_mtime = LTIME_S(inode->i_mtime);
2097 attr->la_ctime = LTIME_S(inode->i_ctime);
2098 attr->la_mode = inode->i_mode;
2099 attr->la_size = inode->i_size;
2100 attr->la_blocks = inode->i_blocks;
2101 attr->la_uid = inode->i_uid;
2102 attr->la_gid = inode->i_gid;
2103 attr->la_flags = LDISKFS_I(inode)->i_flags;
2104 attr->la_nlink = inode->i_nlink;
2105 attr->la_rdev = inode->i_rdev;
2106 attr->la_blksize = inode->i_blksize;
/*
 * Type check used by the conversion helpers below: compares d->ld_ops with
 * &osd_lu_ops, guarded by @d and d->ld_ops being non-NULL.
 * NOTE(review): ergo() is defined elsewhere; presumably it is logical
 * implication, which would make a NULL @d or NULL ld_ops pass -- confirm.
 */
2114 static int lu_device_is_osd(const struct lu_device *d)
2116 return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &osd_lu_ops);
/*
 * Convert a generic lu_object into the osd_object that contains it (through
 * the oo_dt.do_lu member), after asserting that it belongs to an osd device.
 */
2119 static struct osd_object *osd_obj(const struct lu_object *o)
2121 LASSERT(lu_device_is_osd(o->lo_dev));
2122 return container_of0(o, struct osd_object, oo_dt.do_lu);
/*
 * Convert a dt_device into the osd_device that contains it (through the
 * od_dt_dev member), after asserting that it is an osd device.
 */
2125 static struct osd_device *osd_dt_dev(const struct dt_device *d)
2127 LASSERT(lu_device_is_osd(&d->dd_lu_dev));
2128 return container_of0(d, struct osd_device, od_dt_dev);
/*
 * Convert a generic lu_device into its osd_device: first to the enclosing
 * dt_device (through dd_lu_dev), then through osd_dt_dev().
 */
2131 static struct osd_device *osd_dev(const struct lu_device *d)
2133 LASSERT(lu_device_is_osd(d));
2134 return osd_dt_dev(container_of0(d, struct dt_device, dd_lu_dev));
/* Convert a dt_object into its osd_object via the embedded lu_object. */
2137 static struct osd_object *osd_dt_obj(const struct dt_object *d)
2139 return osd_obj(&d->do_lu);
/* Return the osd_device that object @o belongs to. */
2142 static struct osd_device *osd_obj2dev(const struct osd_object *o)
2144 return osd_dev(o->oo_dt.do_lu.lo_dev);
/* Return the generic lu_device embedded in @osd. */
2147 static struct lu_device *osd2lu_dev(struct osd_device *osd)
2149 return &osd->od_dt_dev.dd_lu_dev;
/*
 * Return the super block of the file system mounted for @dev. This
 * dereferences dev->od_mount without checking it, so od_mount must be set.
 */
2152 static struct super_block *osd_sb(const struct osd_device *dev)
2154 return dev->od_mount->lmi_mnt->mnt_sb;
/* Return the journal of the ldiskfs super block backing @dev. */
2157 static journal_t *osd_journal(const struct osd_device *dev)
2159 return LDISKFS_SB(osd_sb(dev))->s_journal;
/* Non-zero when index operations have been installed on @obj. */
2162 static int osd_has_index(const struct osd_object *obj)
2164 return obj->oo_dt.do_index_ops != NULL;
/* lu_object-level invariant check: delegates to osd_invariant(). */
2167 static int osd_object_invariant(const struct lu_object *l)
2169 return osd_invariant(osd_obj(l));
/*
 * lu_object_operations table for osd objects: init, delete, release, free,
 * print and invariant methods.
 */
2172 static struct lu_object_operations osd_lu_obj_ops = {
2173 .loo_object_init = osd_object_init,
2174 .loo_object_delete = osd_object_delete,
2175 .loo_object_release = osd_object_release,
2176 .loo_object_free = osd_object_free,
2177 .loo_object_print = osd_object_print,
2178 .loo_object_invariant = osd_object_invariant
/*
 * lu_device_operations table for osd devices. Its address is also what
 * lu_device_is_osd() compares against to recognise an osd device.
 */
2181 static struct lu_device_operations osd_lu_ops = {
2182 .ldo_object_alloc = osd_object_alloc,
2183 .ldo_process_config = osd_process_config,
2184 .ldo_recovery_complete = osd_recovery_complete
/*
 * lu_device_type_operations table: type init/fini, device alloc/free and
 * device init/fini.
 */
2187 static struct lu_device_type_operations osd_device_type_ops = {
2188 .ldto_init = osd_type_init,
2189 .ldto_fini = osd_type_fini,
2191 .ldto_device_alloc = osd_device_alloc,
2192 .ldto_device_free = osd_device_free,
2194 .ldto_device_init = osd_device_init,
2195 .ldto_device_fini = osd_device_fini
/*
 * Device type descriptor registered by osd_mod_init(): a LU_DEVICE_DT type
 * named LUSTRE_OSD0_NAME, using osd_device_type_ops, with the same context
 * tags as osd_key.
 */
2198 static struct lu_device_type osd_device_type = {
2199 .ldt_tags = LU_DEVICE_DT,
2200 .ldt_name = LUSTRE_OSD0_NAME,
2201 .ldt_ops = &osd_device_type_ops,
2202 .ldt_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
2206 * lprocfs legacy support.
/*
 * Per-device lprocfs variable table, passed to LPROCFS_INIT_VARS() below.
 * NOTE(review): its entries are not visible here.
 */
2208 static struct lprocfs_vars lprocfs_osd_obd_vars[] = {
/*
 * Module-level lprocfs variable table, passed to LPROCFS_INIT_VARS() below.
 * NOTE(review): its entries are not visible here.
 */
2212 static struct lprocfs_vars lprocfs_osd_module_vars[] = {
/* obd_ops for the osd type; only the owning module is set. */
2216 static struct obd_ops osd_obd_device_ops = {
2217 .o_owner = THIS_MODULE
/*
 * Tie the two lprocfs tables above to the name "osd"; osd_mod_init() passes
 * the same name to lprocfs_init_vars().
 */
2220 LPROCFS_INIT_VARS(osd, lprocfs_osd_module_vars, lprocfs_osd_obd_vars);
/*
 * Module entry point: fetch the lprocfs variables for "osd" and register the
 * osd obd type under LUSTRE_OSD0_NAME together with osd_device_type. Returns
 * the result of class_register_type().
 */
2222 static int __init osd_mod_init(void)
2224 struct lprocfs_static_vars lvars;
2226 lprocfs_init_vars(osd, &lvars);
2227 return class_register_type(&osd_obd_device_ops, NULL, lvars.module_vars,
2228 LUSTRE_OSD0_NAME, &osd_device_type);
/* Module exit point: unregister the type registered by osd_mod_init(). */
2231 static void __exit osd_mod_exit(void)
2233 class_unregister_type(LUSTRE_OSD0_NAME);
/*
 * Module metadata, followed by the cfs_module() declaration that names
 * osd_mod_init() and osd_mod_exit() as the module's init and exit functions.
 */
2236 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2237 MODULE_DESCRIPTION("Lustre Object Storage Device ("LUSTRE_OSD0_NAME")");
2238 MODULE_LICENSE("GPL");
2240 cfs_module(osd, "0.0.2", osd_mod_init, osd_mod_exit);