Whamcloud - gitweb
osd: 0. remove duplicate helper functions; 1. implement txn callbacks; 2. implement...
authornikita <nikita>
Mon, 8 May 2006 23:38:24 +0000 (23:38 +0000)
committernikita <nikita>
Mon, 8 May 2006 23:38:24 +0000 (23:38 +0000)
lustre/osd/osd_handler.c
lustre/osd/osd_internal.h

index ac69677..d8da399 100644 (file)
@@ -87,11 +87,16 @@ static int   osd_inode_getattr (struct lu_context *ctx,
                                 struct inode *inode, struct lu_attr *attr);
 static int   osd_inode_get_fid (struct osd_device *d, const struct inode *inode,
                                 struct lu_fid *fid);
+static int   osd_param_is_sane (const struct osd_device *dev,
+                                const struct txn_param *param);
 
 static struct osd_object  *osd_obj          (const struct lu_object *o);
 static struct osd_device  *osd_dev          (const struct lu_device *d);
 static struct osd_device  *osd_dt_dev       (const struct dt_device *d);
+static struct osd_object  *osd_dt_obj       (const struct dt_object *d);
+static struct osd_device  *osd_obj2dev      (struct osd_object *o);
 static struct lu_device   *osd_device_fini  (struct lu_device *d);
+static struct lu_device   *osd2lu_dev       (struct osd_device * osd);
 static struct lu_device   *osd_device_alloc (struct lu_device_type *t,
                                              struct lustre_cfg *cfg);
 static struct lu_object   *osd_object_alloc (struct lu_context *ctx,
@@ -148,6 +153,7 @@ static struct lu_object *osd_object_alloc(struct lu_context *ctx,
                 lu_object_init(l, NULL, d);
                 mo->oo_dt.do_ops = &osd_obj_ops;
                 l->lo_ops = &osd_lu_obj_ops;
+                init_rwsem(&mo->oo_sem);
                 return l;
         } else
                 return NULL;
@@ -229,7 +235,7 @@ static int osd_config(struct lu_context *ctx,
 static int osd_statfs(struct lu_context *ctx,
                       struct dt_device *d, struct kstatfs *sfs)
 {
-       struct osd_device *osd = dt2osd_dev(d);
+       struct osd_device *osd = osd_dt_dev(d);
         struct super_block *sb = osd_sb(osd);
         int result;
 
@@ -241,57 +247,74 @@ static int osd_statfs(struct lu_context *ctx,
         RETURN (result);
 }
 
+/*
+ * Journal
+ */
+
+static int osd_param_is_sane(const struct osd_device *dev,
+                             const struct txn_param *param)
+{
+        return param->tp_credits <= osd_journal(dev)->j_max_transaction_buffers;
+}
+
 static struct thandle *osd_trans_start(struct lu_context *ctx,
                                        struct dt_device *d,
                                        struct txn_param *p)
 {
         struct osd_device  *dev = osd_dt_dev(d);
-        handle_t           *handle;
+        handle_t           *jh;
         struct osd_thandle *oh;
-        struct thandle     *result;
+        struct thandle     *th;
+        int hook_res;
 
         ENTRY;
 
-        OBD_ALLOC_PTR(oh);
-        if (oh != NULL) {
-                /*
-                 * XXX temporary stuff. Some abstraction layer should be used.
-                 */
-                /*
-                 * XXX Here, run transaction start hook, so that modules can
-                 * change @p.
-                 */
-                handle = journal_start(osd_journal(dev), p->tp_credits);
-                if (!IS_ERR(handle)) {
-                        result = &oh->ot_super;
-                        result->th_dev = d;
-                        lu_device_get(&d->dd_lu_dev);
-                } else {
-                        OBD_FREE_PTR(oh);
-                        result = (void *)handle;
-                }
-        } else
-                result = ERR_PTR(-ENOMEM);
+        hook_res = dt_txn_hook_start(ctx, d, p);
+        if (hook_res != 0)
+                RETURN(ERR_PTR(hook_res));
+
+        if (osd_param_is_sane(dev, p)) {
+                OBD_ALLOC_PTR(oh);
+                if (oh != NULL) {
+                        /*
+                         * XXX temporary stuff. Some abstraction layer should
+                         * be used.
+                         */
+                        jh = journal_start(osd_journal(dev), p->tp_credits);
+                        if (!IS_ERR(jh)) {
+                                oh->ot_handle = jh;
+                                th = &oh->ot_super;
+                                th->th_dev = d;
+                                lu_device_get(&d->dd_lu_dev);
+                        } else {
+                                OBD_FREE_PTR(oh);
+                                th = (void *)jh;
+                        }
+                } else
+                        th = ERR_PTR(-ENOMEM);
+        } else {
+                CERROR("Invalid transaction parameters\n");
+                th = ERR_PTR(-EINVAL);
+        }
 
-        RETURN(result);
+        RETURN(th);
 }
 
 static void osd_trans_stop(struct lu_context *ctx, struct thandle *th)
 {
         int result;
         struct osd_thandle *oh;
-        struct thandle     *th;
 
         ENTRY;
+
         oh = container_of0(th, struct osd_thandle, ot_super);
-        th = &oh->ot_super;
         if (oh->ot_handle != NULL) {
                 /*
                  * XXX temporary stuff. Some abstraction layer should be used.
                  */
-                /*
-                 * XXX Here, run transaction stop hook.
-                 */
+                result = dt_txn_hook_stop(ctx, th->th_dev, th);
+                if (result != 0)
+                        CERROR("Failure in transaction hook: %d\n", result);
                 result = journal_stop(oh->ot_handle);
                 if (result != 0)
                         CERROR("Failure to stop transaction: %d\n", result);
@@ -313,42 +336,184 @@ static struct dt_device_operations osd_dt_ops = {
 };
 
 static void osd_object_lock(struct lu_context *ctx, struct dt_object *dt,
-                           enum dt_lock_mode mode)
+                            enum dt_lock_mode mode)
 {
-        return;
+        struct osd_object *obj = osd_dt_obj(dt);
+
+        LASSERT(mode == DT_WRITE_LOCK || mode == DT_READ_LOCK);
+        if (mode == DT_WRITE_LOCK)
+                down_write(&obj->oo_sem);
+        else
+                down_read(&obj->oo_sem);
 }
 
 static void osd_object_unlock(struct lu_context *ctx, struct dt_object *dt,
                              enum dt_lock_mode mode)
 {
-        return;
+        struct osd_object *obj = osd_dt_obj(dt);
+
+        LASSERT(mode == DT_WRITE_LOCK || mode == DT_READ_LOCK);
+        if (mode == DT_WRITE_LOCK)
+                up_write(&obj->oo_sem);
+        else
+                up_read(&obj->oo_sem);
 }
 
 static int osd_attr_get(struct lu_context *ctxt, struct dt_object *dt,
                         struct lu_attr *attr)
 {
         LASSERT(lu_object_exists(ctxt, &dt->do_lu));
-        return osd_inode_getattr(ctxt, dt2osd_obj(dt)->oo_inode, attr);
+        return osd_inode_getattr(ctxt, osd_dt_obj(dt)->oo_inode, attr);
 }
 
-static int osd_object_create(struct lu_context *ctxt, struct dt_object *dt,
-                             struct thandle *th)
+/*
+ * Object creation.
+ *
+ * XXX temporary solution.
+ */
+
+static int osd_create_pre(struct osd_thread_info *info, struct osd_object *obj,
+                          struct lu_attr *attr, struct thandle *th)
 {
-        struct lu_fid *lf = &dt->do_lu.lo_header->loh_fid;
-        struct osd_device *osd = osd_obj2dev(dt2osd_obj(dt));
-        if (lu_object_exists(ctxt, &dt->do_lu))
-                RETURN(-EEXIST);
-        else {
+        return 0;
+}
 
-                const struct osd_inode_id i_id = {
-                        .oii_ino = fid_seq(lf),
-                        .oii_gen = fid_oid(lf)
-                };
-                osd_oi_insert(NULL, &osd->od_oi, lf, &i_id);
-        }
+static int osd_create_post(struct osd_thread_info *info, struct osd_object *obj,
+                           struct lu_attr *attr, struct thandle *th)
+{
         return 0;
 }
 
+static void osd_fid_build_name(const struct lu_fid *fid, char *name)
+{
+        static const char *qfmt = LPX64":%lx:%lx";
+
+        sprintf(name, qfmt, fid_seq(fid), fid_oid(fid), fid_ver(fid));
+}
+
+static int osd_mkdir(struct osd_thread_info *info, struct osd_object *obj,
+                     struct lu_attr *attr, struct thandle *th)
+{
+        int result;
+        struct osd_device *osd = osd_obj2dev(obj);
+        struct inode      *dir;
+
+        /*
+         * XXX temporary solution.
+         */
+        struct dentry     *dentry;
+        char               name[32];
+        struct qstr        str = {
+                .name = name
+        };
+
+        LASSERT(obj->oo_inode == NULL);
+        LASSERT(S_ISDIR(attr->la_mode));
+        LASSERT(osd->od_obj_area != NULL);
+
+        dir = osd->od_obj_area->d_inode;
+        LASSERT(dir->i_op != NULL && dir->i_op->mkdir != NULL);
+
+        osd_fid_build_name(lu_object_fid(&obj->oo_dt.do_lu), name);
+        str.len = strlen(name);
+
+        dentry = d_alloc(osd->od_obj_area, &str);
+        if (dentry != NULL) {
+                result = dir->i_op->mkdir(dir, dentry,
+                                          attr->la_mode & (S_IRWXUGO|S_ISVTX));
+                if (result == 0) {
+                        LASSERT(dentry->d_inode != NULL);
+                        obj->oo_inode = dentry->d_inode;
+                        igrab(obj->oo_inode);
+                }
+                dput(dentry);
+        } else
+                result = -ENOMEM;
+        return result;
+}
+
+typedef int (*osd_obj_type_f)(struct osd_thread_info *, struct osd_object *,
+                              struct lu_attr *, struct thandle *);
+
+osd_obj_type_f osd_mkreg = NULL;
+osd_obj_type_f osd_mksym = NULL;
+osd_obj_type_f osd_mknod = NULL;
+
+static osd_obj_type_f osd_create_type_f(__u32 mode)
+{
+        osd_obj_type_f result;
+
+        switch (mode) {
+        case S_IFDIR:
+                result = osd_mkdir;
+                break;
+        case S_IFREG:
+                result = osd_mkreg;
+                break;
+        case S_IFLNK:
+                result = osd_mksym;
+                break;
+        case S_IFCHR:
+        case S_IFBLK:
+        case S_IFIFO:
+        case S_IFSOCK:
+                result = osd_mknod;
+                break;
+        default:
+                LBUG();
+                break;
+        }
+        return result;
+ }
+
+static int osd_object_create(struct lu_context *ctx, struct dt_object *dt,
+                             struct lu_attr *attr, struct thandle *th)
+{
+        const struct lu_fid    *fid  = lu_object_fid(&dt->do_lu);
+        struct osd_object      *obj  = osd_dt_obj(dt);
+        struct osd_device      *osd  = osd_obj2dev(obj);
+        struct osd_thread_info *info = lu_context_key_get(ctx, &osd_key);
+        int result;
+
+        ENTRY;
+
+        LASSERT(!lu_object_exists(ctx, &dt->do_lu));
+
+        /*
+         * XXX missing: permission checks.
+         */
+
+        /*
+         * XXX missing: sanity checks (valid ->la_mode, etc.)
+         */
+
+        /*
+         * XXX missing: Quote handling.
+         */
+
+        result = osd_create_pre(info, obj, attr, th);
+        if (result == 0) {
+                osd_create_type_f(attr->la_mode & S_IFMT)(info, obj, attr, th);
+                if (result == 0)
+                        result = osd_create_post(info, obj, attr, th);
+        }
+        if (result == 0) {
+                struct osd_inode_id *id = &info->oti_id;
+
+                LASSERT(obj->oo_inode != NULL);
+
+                id->oii_ino = obj->oo_inode->i_ino;
+                id->oii_gen = obj->oo_inode->i_generation;
+
+                osd_oi_write_lock(&osd->od_oi);
+                result = osd_oi_insert(info, &osd->od_oi, fid, id, th);
+                osd_oi_write_unlock(&osd->od_oi);
+        }
+
+        LASSERT(ergo(result == 0, lu_object_exists(ctx, &dt->do_lu)));
+        return result;
+}
+
 static struct dt_object_operations osd_obj_ops = {
         .do_object_lock   = osd_object_lock,
         .do_object_unlock = osd_object_unlock,
@@ -423,6 +588,11 @@ static int osd_mount(struct osd_device *o, struct lustre_cfg *cfg)
 
         ENTRY;
 
+        if (o->od_mount != NULL) {
+                CERROR("Already mounted (%s)\n", dev);
+                RETURN(-EEXIST);
+        }
+
         /* get mount */
         lmi = server_get_mount(dev);
         if (lmi == NULL) {
@@ -434,27 +604,38 @@ static int osd_mount(struct osd_device *o, struct lustre_cfg *cfg)
         /* save lustre_mount_info in dt_device */
         o->od_mount = lmi;
         result = osd_oi_init(&o->od_oi, osd_sb(o)->s_root,
-                             o->od_dt_dev.dd_lu_dev.ld_site);
-
+                             osd2lu_dev(o)->ld_site);
         if (result == 0) {
                 struct dentry *d;
 
-                d = osd_open(osd_sb(o)->s_root, "ROOT", S_IFDIR);
+                d = osd_open(osd_sb(o)->s_root, "*OBJ-TEMP*", S_IFDIR);
                 if (!IS_ERR(d)) {
-                        osd_oi_init0(&o->od_oi, d->d_inode->i_ino,
-                                     d->d_inode->i_generation);
-                        o->od_root_dir = d;
+                        o->od_obj_area = d;
+
+                        d = osd_open(osd_sb(o)->s_root, "ROOT", S_IFDIR);
+                        if (!IS_ERR(d)) {
+                                osd_oi_init0(&o->od_oi, d->d_inode->i_ino,
+                                             d->d_inode->i_generation);
+                                o->od_root_dir = d;
+                        } else
+                                result = PTR_ERR(d);
                 } else
                         result = PTR_ERR(d);
         }
-
+        if (result != 0)
+                osd_device_fini(osd2lu_dev(o));
         RETURN(result);
 }
 
 static struct lu_device *osd_device_fini(struct lu_device *d)
 {
         struct osd_device *o = osd_dev(d);
+
         ENTRY;
+        if (o->od_obj_area != NULL) {
+                dput(o->od_obj_area);
+                o->od_obj_area = NULL;
+        }
         if (o->od_root_dir != NULL) {
                 dput(o->od_root_dir);
                 o->od_root_dir = NULL;
@@ -465,8 +646,7 @@ static struct lu_device *osd_device_fini(struct lu_device *d)
                 server_put_mount(o->od_mount->lmi_name, o->od_mount->lmi_mnt);
 
         o->od_mount = NULL;
-
-       return NULL;
+       RETURN(NULL);
 }
 
 static struct lu_device *osd_device_alloc(struct lu_device_type *t,
@@ -477,10 +657,15 @@ static struct lu_device *osd_device_alloc(struct lu_device_type *t,
 
         OBD_ALLOC_PTR(o);
         if (o != NULL) {
-                l = &o->od_dt_dev.dd_lu_dev;
-                lu_device_init(&o->od_dt_dev.dd_lu_dev, t);
-                o->od_dt_dev.dd_lu_dev.ld_ops = &osd_lu_ops;
-                o->od_dt_dev.dd_ops = &osd_dt_ops;
+                int result;
+
+                result = dt_device_init(&o->od_dt_dev, t);
+                if (result == 0) {
+                        l = osd2lu_dev(o);
+                        l->ld_ops = &osd_lu_ops;
+                        o->od_dt_dev.dd_ops = &osd_dt_ops;
+                } else
+                        l = ERR_PTR(result);
         } else
                 l = ERR_PTR(-ENOMEM);
         return l;
@@ -490,13 +675,13 @@ static void osd_device_free(struct lu_device *d)
 {
         struct osd_device *o = osd_dev(d);
 
-        lu_device_fini(d);
+        dt_device_fini(&o->od_dt_dev);
         OBD_FREE_PTR(o);
 }
 
 static int osd_process_config(struct lu_device *d, struct lustre_cfg *cfg)
 {
-        struct osd_device *o = lu2osd_dev(d);
+        struct osd_device *o = osd_dev(d);
         int err;
 
         switch(cfg->lcfg_command) {
@@ -683,6 +868,21 @@ static struct osd_device *osd_dev(const struct lu_device *d)
         return osd_dt_dev(container_of0(d, struct dt_device, dd_lu_dev));
 }
 
+static struct osd_object *osd_dt_obj(const struct dt_object *d)
+{
+        return osd_obj(&d->do_lu);
+}
+
+static struct osd_device *osd_obj2dev(struct osd_object *o)
+{
+        return osd_dev(o->oo_dt.do_lu.lo_dev);
+}
+
+static struct lu_device *osd2lu_dev(struct osd_device * osd)
+{
+        return &osd->od_dt_dev.dd_lu_dev;
+}
+
 static struct super_block *osd_sb(const struct osd_device *dev)
 {
         return dev->od_mount->lmi_mnt->mnt_sb;
index 02dcc47..674eac3 100644 (file)
@@ -56,40 +56,16 @@ struct osd_device {
         struct lustre_mount_info *od_mount;
         struct osd_oi             od_oi;
         struct dentry            *od_root_dir;
+        struct dentry            *od_obj_area;
 };
 
-static inline struct osd_object * dt2osd_obj(struct dt_object *o)
-{
-        return container_of0(o, struct osd_object, oo_dt);
-}
-
-static inline struct osd_device * osd_obj2dev(struct osd_object *o) {
-        struct lu_device *lu = o->oo_dt.do_lu.lo_dev;
-        struct dt_device *dt = container_of0(lu, struct dt_device, dd_lu_dev);
-
-        return container_of0(dt, struct osd_device, od_dt_dev);
-}
-
-static inline struct osd_device * dt2osd_dev(struct dt_device *dt) {
-        return container_of0(dt, struct osd_device, od_dt_dev);
-}
-
-static inline struct osd_device * lu2osd_dev(struct lu_device *d) {
-        return dt2osd_dev(container_of0(d, struct dt_device, dd_lu_dev));
-}
-
-static inline struct lu_device * osd2lu_dev(struct osd_device * osd)
-{
-        return &osd->od_dt_dev.dd_lu_dev;
-}
-
 struct dentry *osd_lookup(struct dentry *parent, const char *name);
 struct dentry *osd_open(struct dentry *parent, const char *name, mode_t mode);
 
 
 struct osd_thread_info {
-        char          oti_name[64];
-        struct lu_fid oti_fid;
+        struct lu_fid       oti_fid;
+        struct osd_inode_id oti_id;
 };
 
 #endif /* __KERNEL__ */