Whamcloud - gitweb
- udmu library is added to dmu-osd directory
authoralex <alex>
Sat, 21 Feb 2009 18:29:56 +0000 (18:29 +0000)
committeralex <alex>
Sat, 21 Feb 2009 18:29:56 +0000 (18:29 +0000)
 - bunch of fixes in dmu osd

lustre/Makefile.in
lustre/dmu-osd/Makefile.in
lustre/dmu-osd/osd_handler.c
lustre/dmu-osd/osd_internal.h
lustre/dmu-osd/udmu.c [new file with mode: 0644]
lustre/dmu-osd/udmu.h [new file with mode: 0644]
lustre/dmu-osd/udmu_util.c [new file with mode: 0644]
lustre/dmu-osd/udmu_util.h [new file with mode: 0644]

index f1c44fa..458abc3 100644 (file)
@@ -8,7 +8,8 @@ subdir-m += obdecho
 subdir-m += mgc
 subdir-m += quota
 
-@SERVER_TRUE@subdir-m += mds obdfilter ost mgs mdt cmm mdd osd
+@SERVER_TRUE@subdir-m += mds ofd ost mgs mdt cmm mdd osd #obdfilter
 @CLIENT_TRUE@subdir-m += mdc lmv llite fld
+@KDMU_TRUE@subdir-m += dmu-osd
 
 @INCLUDE_RULES@
index 54150fd..210b182 100644 (file)
@@ -1,5 +1,5 @@
 MODULES := osd
-osd-objs := osd_handler.o osd_oi.o osd_igif.o
+osd-objs := osd_handler.o udmu.o udmu_util.o
 
 EXTRA_PRE_CFLAGS := -I@LINUX@/fs -I@LDISKFS_DIR@ -I@LDISKFS_DIR@/ldiskfs
 
index 7f528de..2e4fc86 100644 (file)
 #include <libcfs/libcfs.h>
 #include <lustre_fsfilt.h>
 
-#include <fcntl.h>
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <unistd.h>
-#include <dirent.h>
-
 #ifndef FALSE
 #      define  FALSE   (0)
 #endif
 /* fid_is_local() */
 #include <lustre_fid.h>
 
-#include <udmu.h>
-#include <udmu_util.h>
+#include "udmu.h"
+#include "udmu_util.h"
 
 #include "osd_internal.h"
 
+#define LUSTRE_ROOT_FID_SEQ     0
+
 struct osd_object {
         struct dt_object       oo_dt;
         /*
@@ -108,7 +104,8 @@ struct osd_device {
         /* super-class */
         struct dt_device          od_dt_dev;
         /* information about underlying file system */
-        struct lustre_mount_info *od_mount;
+        udmu_objset_t            *od_objset;
+        //struct lustre_mount_info *od_mount;
 
         /* Environment for transaction commit callback.
          * Currently, OSD is based on ext3/JBD. Transaction commit in ext3/JBD
@@ -134,8 +131,8 @@ struct osd_device {
         struct kstatfs            od_kstatfs;
         spinlock_t                od_osfs_lock;
 
-        dmu_buf_t                  *od_root_db;
-        dmu_buf_t                  *od_objdir_db;
+        dmu_buf_t                *od_root_db;
+        dmu_buf_t                *od_objdir_db;
 };
 
 struct osd_thandle {
@@ -153,7 +150,8 @@ static int   lu_device_is_osd  (const struct lu_device *d);
 static int   osd_type_init     (struct lu_device_type *t);
 static void  osd_type_fini     (struct lu_device_type *t);
 static int   osd_object_init   (const struct lu_env *env,
-                                struct lu_object *l);
+                                struct lu_object *l,
+                                const struct lu_object_conf *conf);
 static void  osd_object_release(const struct lu_env *env,
                                 struct lu_object *l);
 static int   osd_object_print  (const struct lu_env *env, void *cookie,
@@ -184,9 +182,9 @@ static void  osd_trans_stop    (const struct lu_env *env,
 static int   osd_object_is_root(const struct osd_object *obj);
 
 static struct thandle *osd_trans_create(const struct lu_env *env,
-                                       struct dt_device *dt,
-                                       struct txn_param *p);
-static int osd_trans_start(const struct lu_env *env, struct thandle *th);
+                                       struct dt_device *dt);
+static int osd_trans_start(const struct lu_env *env, struct dt_device *d,
+                           struct thandle *th);
 static void osd_trans_stop(const struct lu_env *env, struct thandle *th);
 
 static struct osd_object  *osd_obj          (const struct lu_object *o);
@@ -203,7 +201,6 @@ static struct lu_device   *osd_device_alloc (const struct lu_env *env,
 static struct lu_object   *osd_object_alloc (const struct lu_env *env,
                                              const struct lu_object_header *hdr,
                                              struct lu_device *d);
-static struct super_block *osd_sb           (const struct osd_device *dev);
 extern struct lustre_mount_info *server_get_mount(const char *name);
 extern int server_put_mount(const char *name, struct vfsmount *mnt);
 
@@ -254,55 +251,57 @@ static void lu_attr2vnattr(struct lu_attr *la, vnattr_t *vap)
         if (la->la_valid & LA_MODE) {
                 /* get mode only */
                 vap->va_mode = la->la_mode & ~S_IFMT;
-                vap->va_mask |= AT_MODE;
+                vap->va_mask |= DMU_AT_MODE;
 
                 vap->va_type = lu_mode2vtype(la->la_mode);
-                vap->va_mask |= AT_TYPE;
+                vap->va_mask |= DMU_AT_TYPE;
 
         }
         if (la->la_valid & LA_UID) {
                 vap->va_uid = la->la_uid;
-                vap->va_mask |= AT_UID;
+                vap->va_mask |= DMU_AT_UID;
         }
         if (la->la_valid & LA_GID) {
                 vap->va_gid = la->la_gid;
-                vap->va_mask |= AT_GID;
+                vap->va_mask |= DMU_AT_GID;
         }
         if (la->la_valid & LA_ATIME) {
                 vap->va_atime.tv_sec = la->la_atime;
                 vap->va_atime.tv_nsec = 0;
-                vap->va_mask |= AT_ATIME;
+                vap->va_mask |= DMU_AT_ATIME;
         }
         if (la->la_valid & LA_MTIME) {
                 vap->va_mtime.tv_sec = la->la_mtime;
                 vap->va_mtime.tv_nsec = 0;
-                vap->va_mask |= AT_MTIME;
+                vap->va_mask |= DMU_AT_MTIME;
         }
         if (la->la_valid & LA_CTIME) {
                 vap->va_ctime.tv_sec = la->la_ctime;
                 vap->va_ctime.tv_nsec = 0;
-                vap->va_mask |= AT_CTIME;
+                vap->va_mask |= DMU_AT_CTIME;
         }
 
         if (la->la_valid & LA_SIZE) {
                 vap->va_size = la->la_size;
-                vap->va_mask |= AT_SIZE;
+                vap->va_mask |= DMU_AT_SIZE;
         }
 
         if (la->la_valid & LA_RDEV) {
                 vap->va_rdev   = la->la_rdev;
-                vap->va_mask |= AT_RDEV;
+                vap->va_mask |= DMU_AT_RDEV;
         }
 
         if (la->la_valid & LA_NLINK) {
                 vap->va_nlink = la->la_nlink ;
-                vap->va_mask |= AT_NLINK;
+                vap->va_mask |= DMU_AT_NLINK;
         }
 
+#if 0
         if (la->la_valid & LA_FLAGS) {
                 vap->va_flags = (la->la_flags & FS_FL_USER_MODIFIABLE);
-                vap->va_mask |= AT_FLAGS;
+                vap->va_mask |= DMU_AT_FLAGS;
         }
+#endif
 
         EXIT;
 }
@@ -331,60 +330,62 @@ static void vnattr2lu_attr(vnattr_t *vap, struct lu_attr *la)
 {
         la->la_valid = 0;
 
-        if (vap->va_mask & AT_SIZE) {
+        if (vap->va_mask & DMU_AT_SIZE) {
                 la->la_size = (unsigned long long)vap->va_size;
                 la->la_valid |= LA_SIZE;
         }
-        if (vap->va_mask & AT_MTIME) {
+        if (vap->va_mask & DMU_AT_MTIME) {
                 la->la_mtime = (unsigned long long)vap->va_mtime.tv_sec;
                 la->la_valid |= LA_MTIME;
         }
-        if (vap->va_mask & AT_CTIME) {
+        if (vap->va_mask & DMU_AT_CTIME) {
                 la->la_ctime = (unsigned long long)vap->va_ctime.tv_sec;
                 la->la_valid |= LA_CTIME;
         }
-        if (vap->va_mask & AT_ATIME) {
+        if (vap->va_mask & DMU_AT_ATIME) {
                 la->la_atime = (unsigned long long)vap->va_atime.tv_sec;
                 la->la_valid |= LA_ATIME;
         }
-        if (vap->va_mask & AT_MODE) {
+        if (vap->va_mask & DMU_AT_MODE) {
                 la->la_mode = (unsigned int)vap->va_mode;
                 la->la_valid |= LA_MODE;
         }
-        if (vap->va_mask & AT_TYPE) {
+        if (vap->va_mask & DMU_AT_TYPE) {
                 la->la_mode |= vtype2lu_mode(vap->va_type);
                 la->la_valid |= LA_TYPE;
         }
-        if (vap->va_mask & AT_UID) {
+        if (vap->va_mask & DMU_AT_UID) {
                 la->la_uid = vap->va_uid;
                 la->la_valid |= LA_UID;
         }
-        if (vap->va_mask & AT_GID) {
+        if (vap->va_mask & DMU_AT_GID) {
                 la->la_gid = vap->va_gid;
                 la->la_valid |= LA_GID;
         }
-        if (vap->va_mask & AT_NLINK) {
+        if (vap->va_mask & DMU_AT_NLINK) {
                 la->la_nlink = vap->va_nlink;
                 la->la_valid |= LA_NLINK;
         }
-        if (vap->va_mask & AT_BLKSIZE) {
+        if (vap->va_mask & DMU_AT_BLKSIZE) {
                 la->la_blksize = vap->va_blksize;
                 /* XXX: if 0 then blksize != power of 2 */
                 la->la_blkbits = vap->va_blkbits;
                 la->la_valid |= LA_BLKSIZE;
         }
-        if (vap->va_mask & AT_RDEV) {
+        if (vap->va_mask & DMU_AT_RDEV) {
                 la->la_rdev = vap->va_rdev;
                 la->la_valid |= LA_RDEV;
         }
-        if (vap->va_mask & AT_NBLOCKS) {
+        if (vap->va_mask & DMU_AT_NBLOCKS) {
                 la->la_blocks = vap->va_nblocks;
                 la->la_valid |= LA_BLOCKS;
         }
-        if (vap->va_mask & AT_FLAGS) {
+#if 0
+        if (vap->va_mask & DMU_AT_FLAGS) {
                 la->la_flags  = vap->va_flags;
                 la->la_valid |= LA_FLAGS;
         }
+#endif
 
 }
 
@@ -472,7 +473,7 @@ static void osd_object_init0(struct osd_object *obj)
                         vtype2lu_mode(va.va_type);
         } else {
                 CDEBUG(D_OTHER, "object %llu:%lu does not exist\n",
-                        fid->f_seq, fid->f_oid);
+                        fid->f_seq, (unsigned long) fid->f_oid);
         }
 }
 
@@ -480,7 +481,8 @@ static void osd_object_init0(struct osd_object *obj)
  * Concurrency: no concurrent access is possible that early in object
  * life-cycle.
  */
-static int osd_object_init(const struct lu_env *env, struct lu_object *l)
+static int osd_object_init(const struct lu_env *env, struct lu_object *l,
+                           const struct lu_object_conf *conf)
 {
         struct osd_object *obj = osd_obj(l);
         int result;
@@ -563,7 +565,6 @@ static int osd_object_destroy(const struct lu_env *env, struct osd_object *obj)
         vnattr_t va;
         int rc;
         struct thandle         *th;
-        struct txn_param       prm;
 
         ENTRY;
         LASSERT(obj->oo_db != NULL);
@@ -577,8 +578,7 @@ static int osd_object_destroy(const struct lu_env *env, struct osd_object *obj)
         osd_fid2str(buf, lu_object_fid(&obj->oo_dt.do_lu));
 
         /* create tx */
-        txn_param_init(&prm, 0);
-        th = osd_trans_create(env, &osd->od_dt_dev, &prm);
+        th = osd_trans_create(env, &osd->od_dt_dev);
 
         if (IS_ERR(th)) {
                 RETURN (PTR_ERR(th));
@@ -591,10 +591,10 @@ static int osd_object_destroy(const struct lu_env *env, struct osd_object *obj)
         osd_declare_object_delete(env, obj, th);
 
         /* start change */
-        osd_trans_start(env, th);
+        osd_trans_start(env, &osd->od_dt_dev, th);
 
         /* remove obj ref from main obj. dir */
-        rc = udmu_zap_delete((osd_sb(osd))->uos, zapdb, oh->ot_tx, buf);
+        rc = udmu_zap_delete(osd->od_objset, zapdb, oh->ot_tx, buf);
         if (rc) {
                 CERROR("udmu_zap_delete() failed with error %d", rc);
                 RETURN (rc);
@@ -602,7 +602,8 @@ static int osd_object_destroy(const struct lu_env *env, struct osd_object *obj)
 
         udmu_object_getattr(obj->oo_db, &va);
         /* kill object */
-        rc = udmu_object_delete((osd_sb(osd))->uos, &obj->oo_db, oh->ot_tx, osd_object_tag);
+        rc = udmu_object_delete(osd->od_objset, &obj->oo_db,
+                                oh->ot_tx, osd_object_tag);
         if (rc) {
                 CERROR("udmu_object_delete() failed with error %d", rc);
                 RETURN (rc);
@@ -670,8 +671,7 @@ static int osd_statfs(const struct lu_env *env,
         spin_lock(&osd->od_osfs_lock);
         /* cache 1 second */
         if (cfs_time_before_64(osd->od_osfs_age, cfs_time_shift_64(-1))) {
-                rc = udmu_objset_statvfs((osd_sb(osd))->uos,
-                                         (struct statvfs64 *)kfs);
+                rc = udmu_objset_statvfs(osd->od_objset, (struct statvfs64 *)kfs);
 
                /* Reserve 64MB for ZFS COW symantics so that grants won't
                 * consume all available space. COW needs space to duplicate
@@ -745,14 +745,12 @@ static void osd_trans_commit_cb(void *cb_data, int error)
         th->th_dev = NULL;
         lu_context_exit(&th->th_ctx);
         lu_context_fini(&th->th_ctx);
-
         udmu_tx_cb_destroy(oh);
         EXIT;
 }
 
 static struct thandle *osd_trans_create(const struct lu_env *env,
-                                       struct dt_device *dt,
-                                       struct txn_param *p)
+                                       struct dt_device *dt)
 {
         struct osd_device *osd = osd_dt_dev(dt);
         struct osd_thandle *oh;
@@ -760,14 +758,16 @@ static struct thandle *osd_trans_create(const struct lu_env *env,
         dmu_tx_t *tx;
         int hook_res, rc;
         ENTRY;
-        tx = udmu_tx_create((osd_sb(osd))->uos);
+        tx = udmu_tx_create(osd->od_objset);
         if (tx == NULL)
                 RETURN(ERR_PTR(-ENOMEM));
 
         /* alloc callback data */
         oh = udmu_tx_cb_create(sizeof(*oh));
-        oh->ot_tx = tx;
+#if 0
         oh->ot_sync = p->tp_sync;
+#endif
+        oh->ot_tx = tx;
         th = &oh->ot_super;
         th->th_dev = dt;
         th->th_result = 0;
@@ -777,9 +777,8 @@ static struct thandle *osd_trans_create(const struct lu_env *env,
         /* add commit callback */
         rc = udmu_tx_cb_add(tx, osd_trans_commit_cb, (void *)oh);
         LASSERT(rc == 0);
-        p->txn = th;
 
-        hook_res = dt_txn_hook_start(env, dt, p);
+        hook_res = dt_txn_hook_start(env, dt, th);
         if (hook_res != 0)
                 RETURN(ERR_PTR(hook_res));
 
@@ -789,7 +788,8 @@ static struct thandle *osd_trans_create(const struct lu_env *env,
 /*
  * Concurrency: shouldn't matter.
  */
-static int osd_trans_start(const struct lu_env *env, struct thandle *th)
+static int osd_trans_start(const struct lu_env *env, struct dt_device *d,
+                           struct thandle *th)
 {
         struct osd_thandle *oh;
         int rc;
@@ -824,7 +824,7 @@ static void osd_trans_stop(const struct lu_env *env, struct thandle *th)
 
         udmu_tx_commit(oh->ot_tx);
         if (oh->ot_sync)
-                udmu_wait_synced((osd_sb(osd))->uos, oh->ot_tx);
+                udmu_wait_synced(osd->od_objset, oh->ot_tx);
         EXIT;
 }
 
@@ -835,7 +835,7 @@ static int osd_sync(const struct lu_env *env, struct dt_device *d)
 {
         struct osd_device  *osd = osd_dt_dev(d);
         CDEBUG(D_HA, "syncing OSD %s\n", LUSTRE_OSD_NAME);
-        udmu_wait_synced((osd_sb(osd))->uos, NULL);
+        udmu_wait_synced(osd->od_objset, NULL);
         return 0;
 }
 
@@ -855,13 +855,6 @@ static void osd_ro(const struct lu_env *env, struct dt_device *d)
 /*
  * Concurrency: serialization provided by callers.
  */
-static int osd_credit_get(const struct lu_env *env, struct dt_device *d,
-                          enum dt_txn_op op)
-{
-        /* we don't really care - no transactions in POSIX */
-        return 1;
-}
-
 static int osd_init_capa_ctxt(const struct lu_env *env, struct dt_device *d,
                               int mode, unsigned long timeout, __u32 alg,
                               struct lustre_capa_key *keys)
@@ -885,12 +878,11 @@ static struct dt_device_operations osd_dt_ops = {
         .dt_conf_get       = osd_conf_get,
         .dt_sync           = osd_sync,
         .dt_ro             = osd_ro,
-        .dt_credit_get     = osd_credit_get,
         .dt_init_capa_ctxt = osd_init_capa_ctxt
 };
 
 static void osd_object_read_lock(const struct lu_env *env,
-                                 struct dt_object *dt)
+                                 struct dt_object *dt, unsigned role)
 {
         struct osd_object *obj = osd_dt_obj(dt);
 
@@ -900,7 +892,7 @@ static void osd_object_read_lock(const struct lu_env *env,
 }
 
 static void osd_object_write_lock(const struct lu_env *env,
-                                  struct dt_object *dt)
+                                  struct dt_object *dt, unsigned role)
 {
         struct osd_object *obj = osd_dt_obj(dt);
 
@@ -950,7 +942,9 @@ static int osd_attr_get(const struct lu_env *env,
 
 static int osd_declare_attr_set(const struct lu_env *env,
                                 struct dt_object *dt,
-                                struct thandle *handle)
+                                const struct lu_attr *attr,
+                                struct thandle *handle,
+                                struct lustre_capa *capa)
 {
         struct osd_object *obj = osd_dt_obj(dt);
         struct osd_thandle *oh;
@@ -969,8 +963,7 @@ static int osd_declare_attr_set(const struct lu_env *env,
 }
 
 static int osd_attr_set(const struct lu_env *env, struct dt_object *dt,
-                        const struct lu_attr *attr, struct thandle *handle,
-                        struct lustre_capa *capa)
+                        const struct lu_attr *attr, struct thandle *handle)
 {
         struct osd_object *obj = osd_dt_obj(dt);
         struct osd_thandle *oh;
@@ -993,7 +986,8 @@ static int osd_attr_set(const struct lu_env *env, struct dt_object *dt,
 }
 
 static int osd_declare_punch(const struct lu_env *env, struct dt_object *dt,
-                     __u64 start, __u64 end, struct thandle *handle)
+                             __u64 start, __u64 end, struct thandle *handle,
+                             struct lustre_capa *capa)
 {
         struct osd_object *obj = osd_dt_obj(dt);
         struct osd_thandle *oh;
@@ -1039,13 +1033,13 @@ static int osd_punch(const struct lu_env *env, struct dt_object *dt,
                                   start, len ? len : DMU_OBJECT_END);
          */
 
-        udmu_object_punch((osd_sb(osd))->uos, obj->oo_db, oh->ot_tx, start, len);
+        udmu_object_punch(osd->od_objset, obj->oo_db, oh->ot_tx, start, len);
 
         /* set new size */
 #if 0
         /* XXX: umdu_object_punch set the size already, why to set again? */
         if ((end == OBD_OBJECT_EOF) || (start + end > vap.va_size)) {
-                vap.va_mask = AT_SIZE;
+                vap.va_mask = DMU_AT_SIZE;
                 vap.va_size = start;
                 udmu_object_setattr(obj->oo_db, oh->ot_tx, &vap);
         }
@@ -1078,7 +1072,10 @@ static void osd_ah_init(const struct lu_env *env, struct dt_allocation_hint *ah,
 }
 
 static int osd_declare_object_create(const struct lu_env *env,
-                                     struct dt_object *dt, __u32 mode,
+                                     struct dt_object *dt,
+                                     struct lu_attr *attr,
+                                     struct dt_allocation_hint *hint,
+                                     struct dt_object_format *dof,
                                      struct thandle *handle)
 {
         const struct lu_fid *fid  = lu_object_fid(&dt->do_lu);
@@ -1095,23 +1092,18 @@ static int osd_declare_object_create(const struct lu_env *env,
         oh = container_of0(handle, struct osd_thandle, ot_super);
         LASSERT(oh->ot_tx != NULL);
 
-        switch (mode & S_IFMT) {
-                case S_IFDIR:
+        switch (dof->dof_type) {
+                case DFT_DIR:
                         /* for zap create */
                         udmu_tx_hold_zap(oh->ot_tx, DMU_NEW_OBJECT, 1, NULL);
                         break;
-                case S_IFREG:
-                case S_IFCHR:
-                case S_IFBLK:
-                case S_IFIFO:
-                case S_IFSOCK:
+                case DFT_REGULAR:
+                case DFT_SYM:
+                case DFT_NODE:
+                case DFT_INDEX:
                         /* first, we'll create new object */
                         udmu_tx_hold_bonus(oh->ot_tx, DMU_NEW_OBJECT);
                         break;
-                case S_IFLNK:
-                        udmu_tx_hold_write(oh->ot_tx, DMU_NEW_OBJECT, 0, PATH_MAX);
-                        udmu_tx_hold_bonus(oh->ot_tx, DMU_NEW_OBJECT);
-                        break;
 
                 default:
                         LBUG();
@@ -1135,8 +1127,7 @@ static dmu_buf_t * osd_mkdir(struct osd_thread_info *info, struct osd_device  *o
         dmu_buf_t * db;
 
         LASSERT(S_ISDIR(attr->la_mode));
-        udmu_zap_create((osd_sb(osd))->uos, &db, oh->ot_tx,
-                         osd_object_tag);
+        udmu_zap_create(osd->od_objset, &db, oh->ot_tx, osd_object_tag);
 
         return db;
 }
@@ -1147,8 +1138,7 @@ static dmu_buf_t* osd_mkreg(struct osd_thread_info *info, struct osd_device  *os
 {
         dmu_buf_t * db;
         LASSERT(S_ISREG(attr->la_mode));
-        udmu_object_create((osd_sb(osd))->uos, &db, oh->ot_tx,
-                            osd_object_tag);
+        udmu_object_create(osd->od_objset, &db, oh->ot_tx, osd_object_tag);
         return db;
 }
 
@@ -1159,8 +1149,7 @@ static dmu_buf_t* osd_mksym(struct osd_thread_info *info, struct osd_device  *os
         dmu_buf_t * db;
 
         LASSERT(S_ISLNK(attr->la_mode));
-        udmu_object_create((osd_sb(osd))->uos, &db, oh->ot_tx,
-                            osd_object_tag);
+        udmu_object_create(osd->od_objset, &db, oh->ot_tx, osd_object_tag);
         return db;
 }
 
@@ -1175,11 +1164,10 @@ static dmu_buf_t* osd_mknod(struct osd_thread_info *info, struct osd_device  *os
         LASSERT(S_ISCHR(mode) || S_ISBLK(mode) ||
                 S_ISFIFO(mode) || S_ISSOCK(mode));
 
-        udmu_object_create((osd_sb(osd))->uos, &db, oh->ot_tx,
-                           osd_object_tag);
+        udmu_object_create(osd->od_objset, &db, oh->ot_tx, osd_object_tag);
 
         if (db && (S_ISCHR(mode)||S_ISBLK(mode))) {
-                vap.va_mask = AT_RDEV;
+                vap.va_mask = DMU_AT_RDEV;
                 vap.va_rdev = attr->la_rdev;
                 udmu_object_setattr(db, NULL, &vap);
         }
@@ -1190,24 +1178,22 @@ typedef dmu_buf_t* (*osd_obj_type_f)(struct osd_thread_info *info, struct osd_de
                      struct lu_attr *attr,
                      struct osd_thandle *oh);
 
-static osd_obj_type_f osd_create_type_f(__u32 mode)
+static osd_obj_type_f osd_create_type_f(enum dt_format_type type)
 {
         osd_obj_type_f result;
 
-        switch (mode & S_IFMT) {
-        case S_IFDIR:
+        switch (type) {
+        case DFT_DIR:
+        case DFT_INDEX:
                 result = osd_mkdir;
                 break;
-        case S_IFREG:
+        case DFT_REGULAR:
                 result = osd_mkreg;
                 break;
-        case S_IFLNK:
+        case DFT_SYM:
                 result = osd_mksym;
                 break;
-        case S_IFCHR:
-        case S_IFBLK:
-        case S_IFIFO:
-        case S_IFSOCK:
+        case DFT_NODE:
                 result = osd_mknod;
                 break;
         default:
@@ -1224,6 +1210,7 @@ static osd_obj_type_f osd_create_type_f(__u32 mode)
 static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
                              struct lu_attr *attr, 
                              struct dt_allocation_hint *hint,
+                             struct dt_object_format *dof,
                              struct thandle *th)
 {
         const struct lu_fid    *fid  = lu_object_fid(&dt->do_lu);
@@ -1264,8 +1251,8 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
         oid = udmu_object_get_id(db);
 
         /* XXX: zapdb should be replaced with zap-mapping-fids-to-dnode */
-        rc = udmu_zap_insert((osd_sb(osd))->uos, zapdb, oh->ot_tx, buf, &oid,
-                             sizeof (oid));
+        rc = udmu_zap_insert(osd->od_objset, zapdb, oh->ot_tx, buf,
+                             &oid, sizeof (oid));
         if(rc)
                 goto out;
 
@@ -1276,7 +1263,8 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
         udmu_object_getattr(db, &vap);
         vnattr2lu_attr(&vap, attr);
 
-        CDEBUG(D_OTHER, "create object %s oid[%d] (objid %llu)\n", buf, oid, vap.va_nodeid);
+        CDEBUG(D_OTHER, "create object %s oid["LPD64"] (objid %llu)\n",
+               buf, oid, vap.va_nodeid);
 
         rc = osd_create_post(info, obj, attr, th);
 
@@ -1298,7 +1286,7 @@ struct osd_zap_it {
 };
 
 static struct dt_it *osd_zap_it_init(const struct lu_env *env,
-                struct dt_object *dt, int writable,
+                struct dt_object *dt,
                 struct lustre_capa *capa)
 {
         struct osd_zap_it       *it;
@@ -1313,8 +1301,8 @@ static struct dt_it *osd_zap_it_init(const struct lu_env *env,
 
         OBD_ALLOC_PTR(it);
         if (it != NULL) {
-                if (udmu_zap_cursor_init(&it->ozi_zc, osd_sb(osd)->uos,
-                                udmu_object_get_id(obj->oo_db)))
+                if (udmu_zap_cursor_init(&it->ozi_zc, osd->od_objset,
+                                         udmu_object_get_id(obj->oo_db)))
                         RETURN(ERR_PTR(-ENOMEM));
 
                 it->ozi_obj = obj;
@@ -1361,7 +1349,7 @@ static void osd_zap_it_put(const struct lu_env *env, struct dt_it *di)
 static int osd_zap_it_next(const struct lu_env *env, struct dt_it *di)
 {
         struct osd_zap_it *it = (struct osd_zap_it *)di;
-        int rc;
+        //int rc;
 
         ENTRY;
         udmu_zap_cursor_advance(it->ozi_zc);
@@ -1372,13 +1360,19 @@ static int osd_zap_it_next(const struct lu_env *env, struct dt_it *di)
          * We shld make changes to Iterator API to not return status for this API
          * */
 
-        rc = udmu_zap_cursor_retrieve_key(it->ozi_zc, NULL, NAME_MAX);
+        /* XXX: not implemented yet */
+        RETURN(0);
+        LBUG();
+#if 0
+        rc = udmu_zap_cursor_retrieve_key(it->ozi_zc, NAME_MAX);
         if (rc == ENOENT) /* end of dir*/
                 RETURN(+1);
 
         RETURN((-rc));
+#endif
 }
 
+#if 0
 static int osd_zap_it_del(const struct lu_env *env, struct dt_it *di,
                 struct thandle *th)
 {
@@ -1388,32 +1382,43 @@ static int osd_zap_it_del(const struct lu_env *env, struct dt_it *di,
 
         RETURN(0);
 }
+#endif
 
 static struct dt_key *osd_zap_it_key(const struct lu_env *env,
                 const struct dt_it *di)
 {
-        struct osd_zap_it *it = (struct osd_zap_it *)di;
-        int rc;
+        //struct osd_zap_it *it = (struct osd_zap_it *)di;
+        //int rc = 0;
 
         ENTRY;
+        /* XXX: not impelemented yet */
+        LBUG();
+        RETURN(NULL);
+#if 0
         rc = udmu_zap_cursor_retrieve_key(it->ozi_zc, it->ozi_name, NAME_MAX+1);
         if (!rc)
                 RETURN((struct dt_key *)it->ozi_name);
         else
                 RETURN(ERR_PTR(-rc));
+#endif
 }
 
 static int osd_zap_it_key_size(const struct lu_env *env, const struct dt_it *di)
 {
-        struct osd_zap_it *it = (struct osd_zap_it *)di;
-        int rc;
+        //struct osd_zap_it *it = (struct osd_zap_it *)di;
+        //int rc = 0;
 
         ENTRY;
+        /* XXX: not implemented yet */
+        LBUG();
+        RETURN(0);
+#if 0
         rc = udmu_zap_cursor_retrieve_key(it->ozi_zc, it->ozi_name, NAME_MAX+1);
         if (!rc)
                 RETURN(strlen(it->ozi_name));
         else
                 RETURN(-rc);
+#endif
 }
 
 
@@ -1449,13 +1454,17 @@ static __u64 osd_zap_it_store(const struct lu_env *env, const struct dt_it *di)
 static int osd_zap_it_load(const struct lu_env *env,
                 const struct dt_it *di, __u64 hash)
 {
-        struct osd_zap_it *it = (struct osd_zap_it *)di;
-        struct osd_object *obj = it->ozi_obj;
-        int rc;
+        //struct osd_zap_it *it = (struct osd_zap_it *)di;
+        //struct osd_object *obj = it->ozi_obj;
+        //int rc;
 
         ENTRY;
-        udmu_zap_cursor_init_serialized(it->ozi_zc,  osd_sb(osd_obj2dev(obj))->uos,
-                        udmu_object_get_id(obj->oo_db), hash);
+        /* XXX: not implemented yet */
+        LBUG();
+        RETURN(0);
+#if 0
+        udmu_zap_cursor_init_serialized(it->ozi_zc,  osd_obj2dev(obj)->od_objset,
+                                        udmu_object_get_id(obj->oo_db), hash);
 
         /* same as osd_zap_it_next()*/
         rc = udmu_zap_cursor_retrieve_key(it->ozi_zc, NULL, NAME_MAX);
@@ -1465,6 +1474,7 @@ static int osd_zap_it_load(const struct lu_env *env,
                 RETURN(0);
 
         RETURN(-rc);
+#endif
 }
 
 static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt,
@@ -1476,7 +1486,7 @@ static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt,
         struct lu_fid_pack *pack;
         struct lu_fid *fid;
         dmu_buf_t *zapdb = obj->oo_db;
-        dmu_buf_t *db;
+        //dmu_buf_t *db;
         uint64_t oid;
         int rc;
         ENTRY;
@@ -1484,8 +1494,8 @@ static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt,
         LASSERT(udmu_object_is_zap(obj->oo_db));
 
         if (osd_object_is_root(obj)) {
-                rc = udmu_zap_lookup((osd_sb(osd))->uos, zapdb, (char *) key, &oid,
-                                sizeof(uint64_t), sizeof(uint64_t));
+                rc = udmu_zap_lookup(osd->od_objset, zapdb, (char *) key, &oid,
+                                     sizeof(uint64_t), sizeof(uint64_t));
                 if (rc) {
                         RETURN(-rc);
                 }
@@ -1497,17 +1507,18 @@ static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt,
                 fid->f_seq = LUSTRE_FID_INIT_OID;
                 fid->f_oid = oid; /* XXX: f_oid is 32bit, oid - 64bit */
         } else {
-                rc = udmu_zap_lookup((osd_sb(osd))->uos, zapdb, (char *) key, rec,
-                                17, 1);
+                rc = udmu_zap_lookup(osd->od_objset, zapdb, (char *) key,
+                                     rec, 17, 1);
         }
         RETURN(-rc);
 }
 
 static int osd_declare_index_insert(const struct lu_env *env,
                                     struct dt_object *dt,
-                                    const int valsize, 
+                                    const struct dt_rec *rec,
                                     const struct dt_key *key,
-                                    struct thandle *th)
+                                    struct thandle *th,
+                                    struct lustre_capa *capa)
 {
         struct osd_object *obj = osd_dt_obj(dt);
         uint64_t zapid;
@@ -1531,7 +1542,7 @@ static int osd_declare_index_insert(const struct lu_env *env,
 
 static int osd_index_insert(const struct lu_env *env, struct dt_object *dt,
                             const struct dt_rec *rec, const struct dt_key *key,
-                            struct thandle *th, struct lustre_capa *capa)
+                            struct thandle *th, int ignore_quota)
 {
         struct osd_object *obj = osd_dt_obj(dt);
         struct osd_device *osd = osd_obj2dev(obj);
@@ -1563,7 +1574,7 @@ static int osd_index_insert(const struct lu_env *env, struct dt_object *dt,
         pack = (struct lu_fid_pack *) rec;
 
         /* Insert (key,oid) into ZAP */
-        rc = udmu_zap_insert((osd_sb(osd))->uos, zap_db, oh->ot_tx,
+        rc = udmu_zap_insert(osd->od_objset, zap_db, oh->ot_tx,
                              (char *) key, pack, pack->fp_len);
 
         RETURN(-rc);
@@ -1572,7 +1583,8 @@ static int osd_index_insert(const struct lu_env *env, struct dt_object *dt,
 static int osd_declare_index_delete(const struct lu_env *env,
                                     struct dt_object *dt,
                                     const struct dt_key *key,
-                                    struct thandle *th)
+                                    struct thandle *th,
+                                    struct lustre_capa *capa)
 {
         struct osd_object *obj = osd_dt_obj(dt);
         uint64_t zapid;
@@ -1598,8 +1610,7 @@ static int osd_declare_index_delete(const struct lu_env *env,
 }
 
 static int osd_index_delete(const struct lu_env *env, struct dt_object *dt,
-                            const struct dt_key *key, struct thandle *th,
-                            struct lustre_capa *capa)
+                            const struct dt_key *key, struct thandle *th)
 {
         struct osd_object *obj = osd_dt_obj(dt);
         struct osd_device *osd = osd_obj2dev(obj);
@@ -1616,8 +1627,7 @@ static int osd_index_delete(const struct lu_env *env, struct dt_object *dt,
         LASSERT(oh->ot_tx != NULL);
 
         /* Remove key from the ZAP */
-        rc = udmu_zap_delete((osd_sb(osd))->uos, zap_db, oh->ot_tx,
-                             (char *) key);
+        rc = udmu_zap_delete(osd->od_objset, zap_db, oh->ot_tx, (char *) key);
 
         if (rc) {
                 CERROR("udmu_zap_delete() failed with error %d", rc);
@@ -1637,7 +1647,6 @@ static struct dt_index_operations osd_index_ops = {
                 .fini     = osd_zap_it_fini,
                 .get      = osd_zap_it_get,
                 .put      = osd_zap_it_put,
-                .del      = osd_zap_it_del,
                 .next     = osd_zap_it_next,
                 .key      = osd_zap_it_key,
                 .key_size = osd_zap_it_key_size,
@@ -1657,11 +1666,11 @@ static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
         return 0;
 }
 
-static void osd_declare_object_ref_add(const struct lu_env *env,
+static int osd_declare_object_ref_add(const struct lu_env *env,
                                struct dt_object *dt,
                                struct thandle *th)
 {
-        osd_declare_attr_set(env, dt, th);
+        return osd_declare_attr_set(env, dt, NULL, th, BYPASS_CAPA);
 }
 
 /*
@@ -1690,13 +1699,11 @@ static void osd_object_ref_add(const struct lu_env *env,
         spin_unlock(&obj->oo_guard);
 }
 
-static void osd_declare_object_ref_del(const struct lu_env *env,
+static int osd_declare_object_ref_del(const struct lu_env *env,
                                        struct dt_object *dt,
                                        struct thandle *handle)
 {
-        ENTRY;
-        osd_declare_attr_set(env, dt, handle);
-        EXIT;
+        return osd_declare_attr_set(env, dt, NULL, handle, BYPASS_CAPA);
 }
 
 /*
@@ -1743,9 +1750,14 @@ int osd_xattr_get(const struct lu_env *env, struct dt_object *dt,
         RETURN(rc);
 }
 
-int osd_declare_xattr_set(const struct lu_env *env,
-                struct dt_object *dt,
-                struct thandle *handle)
+        int   (*do_declare_xattr_set)(const struct lu_env *env,
+                                      struct dt_object *dt,
+                                      const int buflen, const char *name, int fl,
+                                      struct thandle *handle,
+                                      struct lustre_capa *capa);
+int osd_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
+                          const int buflen, const char *name, int fl,
+                          struct thandle *handle, struct lustre_capa *capa)
 {
         struct osd_object *obj = osd_dt_obj(dt);
         struct osd_thandle *oh;
@@ -1766,11 +1778,9 @@ int osd_declare_xattr_set(const struct lu_env *env,
 
 int osd_xattr_set(const struct lu_env *env,
                 struct dt_object *dt, const struct lu_buf *buf,
-                const char *name, int fl, struct thandle *handle,
-                struct lustre_capa *capa)
+                const char *name, int fl, struct thandle *handle)
 {
         struct osd_object  *obj  = osd_dt_obj(dt);
-        struct osd_device  *osd = osd_obj2dev(obj);
         struct osd_thandle *oh;
         int rc;
 
@@ -1782,14 +1792,14 @@ int osd_xattr_set(const struct lu_env *env,
         oh = container_of0(handle, struct osd_thandle, ot_super);
         LASSERT(oh->ot_tx != NULL);
 
-        rc = udmu_set_xattr((osd_sb(osd))->uos, obj->oo_db,
-                        buf->lb_buf, buf->lb_len, name, oh->ot_tx);
+        rc = udmu_set_xattr(obj->oo_db, buf->lb_buf, buf->lb_len, name, oh->ot_tx);
         RETURN(rc);
 }
 
-int osd_declare_xattr_del(const struct lu_env *env,
-                struct dt_object *dt,
-                struct thandle *handle)
+
+int osd_declare_xattr_del(const struct lu_env *env, struct dt_object *dt,
+                          const char *name, struct thandle *handle,
+                          struct lustre_capa *capa)
 {
         struct osd_object *obj = osd_dt_obj(dt);
         struct osd_thandle *oh;
@@ -1808,13 +1818,10 @@ int osd_declare_xattr_del(const struct lu_env *env,
         RETURN(0);
 }
 
-int osd_xattr_del(const struct lu_env *env,
-                struct dt_object *dt,
-                const char *name, struct thandle *handle,
-                struct lustre_capa *capa)
+int osd_xattr_del(const struct lu_env *env, struct dt_object *dt,
+                  const char *name, struct thandle *handle)
 {
         struct osd_object  *obj  = osd_dt_obj(dt);
-        struct osd_device  *osd = osd_obj2dev(obj);
         struct osd_thandle *oh;
         int rc;
 
@@ -1826,8 +1833,7 @@ int osd_xattr_del(const struct lu_env *env,
         oh = container_of0(handle, struct osd_thandle, ot_super);
         LASSERT(oh->ot_tx != NULL);
 
-        rc = udmu_del_xattr((osd_sb(osd))->uos, obj->oo_db,
-                                        name, oh->ot_tx);
+        rc = udmu_del_xattr(obj->oo_db, name, oh->ot_tx);
         RETURN(rc);
 }
 
@@ -1992,30 +1998,30 @@ static struct obd_capa *osd_capa_get(const struct lu_env *env,
 }
 
 static struct dt_object_operations osd_obj_ops = {
-        .do_read_lock        = osd_object_read_lock,
-        .do_write_lock       = osd_object_write_lock,
-        .do_read_unlock      = osd_object_read_unlock,
-        .do_write_unlock     = osd_object_write_unlock,
-        .do_attr_get         = osd_attr_get,
-        .do_declare_attr_set = osd_declare_attr_set,
-        .do_attr_set         = osd_attr_set,
-        .do_declare_punch    = osd_declare_punch,
-        .do_punch            = osd_punch,
-        .do_ah_init          = osd_ah_init,
-        .do_index_try        = osd_index_try,
-        .do_declare_create   = osd_declare_object_create,
-        .do_create           = osd_object_create,
-        .do_declare_ref_add  = osd_declare_object_ref_add,
-        .do_ref_add          = osd_object_ref_add,
-        .do_declare_ref_del  = osd_declare_object_ref_del,
-        .do_ref_del          = osd_object_ref_del,
-        .do_xattr_get        = osd_xattr_get,
+        .do_read_lock         = osd_object_read_lock,
+        .do_write_lock        = osd_object_write_lock,
+        .do_read_unlock       = osd_object_read_unlock,
+        .do_write_unlock      = osd_object_write_unlock,
+        .do_attr_get          = osd_attr_get,
+        .do_declare_attr_set  = osd_declare_attr_set,
+        .do_attr_set          = osd_attr_set,
+        .do_declare_punch     = osd_declare_punch,
+        .do_punch             = osd_punch,
+        .do_ah_init           = osd_ah_init,
+        .do_index_try         = osd_index_try,
+        .do_declare_create    = osd_declare_object_create,
+        .do_create            = osd_object_create,
+        .do_declare_ref_add   = osd_declare_object_ref_add,
+        .do_ref_add           = osd_object_ref_add,
+        .do_declare_ref_del   = osd_declare_object_ref_del,
+        .do_ref_del           = osd_object_ref_del,
+        .do_xattr_get         = osd_xattr_get,
         .do_declare_xattr_set = osd_declare_xattr_set,
-        .do_xattr_set        = osd_xattr_set,
+        .do_xattr_set         = osd_xattr_set,
         .do_declare_xattr_del = osd_declare_xattr_del,
-        .do_xattr_del        = osd_xattr_del,
-        .do_xattr_list       = osd_xattr_list,
-        .do_capa_get         = osd_capa_get,
+        .do_xattr_del         = osd_xattr_del,
+        .do_xattr_list        = osd_xattr_list,
+        .do_capa_get          = osd_capa_get,
 };
 
 /*
@@ -2042,7 +2048,7 @@ static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt,
         //loff_t offset = *pos;
         int rc;
 
-        rc = udmu_object_read((osd_sb(osd))->uos, obj->oo_db, (uint64_t)(*pos),
+        rc = udmu_object_read(osd->od_objset, obj->oo_db, (uint64_t)(*pos),
                               (uint64_t)buf->lb_len, buf->lb_buf);
         if (rc > 0)
                 *pos += rc;//buf->lb_len;
@@ -2051,7 +2057,8 @@ static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt,
 }
 
 static int osd_declare_write(const struct lu_env *env, struct dt_object *dt,
-                             loff_t pos, int size, struct thandle *th)
+                             const loff_t size, loff_t pos, struct thandle *th,
+                             struct lustre_capa *capa)
 {
         struct osd_object *obj  = osd_dt_obj(dt);
         struct osd_thandle *oh;
@@ -2072,7 +2079,7 @@ static int osd_declare_write(const struct lu_env *env, struct dt_object *dt,
 
 static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
                          const struct lu_buf *buf, loff_t *pos,
-                         struct thandle *th, struct lustre_capa *capa)
+                         struct thandle *th, int ignore_quota)
 {
         struct osd_object *obj  = osd_dt_obj(dt);
         struct osd_device *osd = osd_obj2dev(obj);
@@ -2087,11 +2094,11 @@ static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
 
         udmu_object_getattr(obj->oo_db, &va);
 
-        udmu_object_write((osd_sb(osd))->uos, obj->oo_db, oh->ot_tx, offset,
+        udmu_object_write(osd->od_objset, obj->oo_db, oh->ot_tx, offset,
                           (uint64_t)buf->lb_len, buf->lb_buf);
         if (va.va_size < offset + buf->lb_len) {
                 va.va_size = offset + buf->lb_len;
-                va.va_mask = AT_SIZE;
+                va.va_mask = DMU_AT_SIZE;
                 udmu_object_setattr(obj->oo_db, oh->ot_tx, &va);
         }
         *pos += buf->lb_len;
@@ -2101,23 +2108,40 @@ static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
 }
 
 static int osd_get_bufs(const struct lu_env *env, struct dt_object *dt,
-                        loff_t offset, ssize_t len, struct niobuf_local *lb)
+                        loff_t offset, ssize_t len, struct niobuf_local *_lb)
 {
-        long blocksize;
-        unsigned long tmp;
-        cfs_page_t *page;
+        struct niobuf_local *lb = _lb;
+        //long blocksize;
+        //unsigned long tmp;
+        int i, plen, npages = 0;
+
+        while (len > 0) {
+                plen = len;
+                if (plen > CFS_PAGE_SIZE)
+                        plen = CFS_PAGE_SIZE;
 
-        OBD_ALLOC_PTR(page);
-        LASSERT(page != NULL);
+                lb->file_offset = offset;
+                lb->page_offset = 0;
+                lb->len = plen;
+                lb->page = NULL;
+                lb->rc = 0;
+                lb->lnb_grant_used = 0;
+                lb->obj = dt;
+
+                offset += plen;
+                len -= plen;
+                lb++;
+                npages++;
+        }
 
-        OBD_ALLOC(page->addr, len);
-        LASSERT(page->addr != NULL);
+        for (i = 0, lb = _lb; i< npages; i++, lb++) {
+                lb->page = alloc_page(GFP_NOFS | __GFP_HIGHMEM);
+                if (lb->page == NULL)
+                        goto out_err;
+        }
 
-        lb->file_offset = offset;
-        lb->page_offset = 0;
-        lb->len = len;
-        lb->page = page;
 
+#if 0
         /* calcs for grants */
         udmu_get_blocksize(osd_dt_obj(dt)->oo_db, &blocksize);
         LASSERT(blocksize > 0);
@@ -2129,22 +2153,33 @@ static int osd_get_bufs(const struct lu_env *env, struct dt_object *dt,
         /* add overhead */
         udmu_indblk_overhead(osd_dt_obj(dt)->oo_db, &lb->bytes, &tmp);
         lb->bytes += tmp;
+#endif
 
         lu_object_get(&dt->do_lu);
         lb->obj = dt;
 
         return 1;
+out_err:
+        lb = _lb;
+        while (--i >= 0) {
+                LASSERT(lb->page);
+                __free_page(lb->page);
+                lb->page = NULL;
+        }
+        return -ENOMEM;
 }
 
 static int osd_put_bufs(const struct lu_env *env, struct dt_object *dt,
-                        struct niobuf_local *lb, int nr)
+                        struct niobuf_local *lb, int npages)
 {
         int i;
 
-        for (i = 0; i < nr; i++, lb++) {
+        for (i = 0; i < npages; i++, lb++) {
                 LASSERT(lb->obj == dt);
-                OBD_FREE(lb->page->addr, lb->len);
-                OBD_FREE_PTR(lb->page);
+                if (lb->page == NULL)
+                        continue;
+                __free_page(lb->page);
+                lb->page = NULL;
         }
         lu_object_put(env, &dt->do_lu);
 
@@ -2203,8 +2238,9 @@ static int osd_write_commit(const struct lu_env *env, struct dt_object *dt,
                 CDEBUG(D_OTHER, "write %u bytes at %u\n", (unsigned) lb->len,
                        (unsigned) lb->file_offset);
 
-                udmu_object_write((osd_sb(osd))->uos, obj->oo_db, oh->ot_tx,
-                                  lb->file_offset, lb->len, lb->page->addr);
+                udmu_object_write(osd->od_objset, obj->oo_db, oh->ot_tx,
+                                  lb->file_offset, lb->len,kmap(lb->page));
+                kunmap(lb->page);
                 if (new_size < lb->file_offset + lb->len)
                         new_size = lb->file_offset + lb->len;
 
@@ -2214,7 +2250,7 @@ static int osd_write_commit(const struct lu_env *env, struct dt_object *dt,
         udmu_object_getattr(obj->oo_db, &va);
         if (va.va_size < new_size) {
                 va.va_size = new_size;
-                va.va_mask = AT_SIZE;
+                va.va_mask = DMU_AT_SIZE;
                 udmu_object_setattr(obj->oo_db, oh->ot_tx, &va);
         }
 
@@ -2229,13 +2265,14 @@ static int osd_read_prep(const struct lu_env *env, struct dt_object *dt,
         int i;
 
         for (i = 0; i < nr; i++, lb++) {
-                buf.lb_buf = lb->page->addr;
+                buf.lb_buf = kmap(lb->page);
                 buf.lb_len = lb->len;
                 offset = lb->file_offset;
 
                 CDEBUG(D_OTHER, "read %u bytes at %u\n", (unsigned) lb->len,
                        (unsigned) lb->file_offset);
                 lb->rc = osd_read(env, dt, &buf, &offset, NULL);
+                kunmap(lb->page);
 
                 if (lb->rc < buf.lb_len) {
                         /* all subsequent rc should be 0 */
@@ -2250,6 +2287,7 @@ static int osd_read_prep(const struct lu_env *env, struct dt_object *dt,
         return 0;
 }
 
+#if 0
 static int osd_get_blocksize(const struct lu_env *env, struct dt_object *dt,
                              long *blksz)
 {
@@ -2258,6 +2296,7 @@ static int osd_get_blocksize(const struct lu_env *env, struct dt_object *dt,
         rc = udmu_get_blocksize(osd_obj->oo_db, blksz);
         return rc;
 }
+#endif
 
 static struct dt_body_operations osd_body_ops = {
         .dbo_read          = osd_read,
@@ -2269,7 +2308,7 @@ static struct dt_body_operations osd_body_ops = {
         .dbo_declare_write_commit = osd_declare_write_commit,
         .dbo_write_commit  = osd_write_commit,
         .dbo_read_prep     = osd_read_prep,
-        .dbo_get_blocksize = osd_get_blocksize
+        //.dbo_get_blocksize = osd_get_blocksize
 };
 
 /*
@@ -2353,7 +2392,7 @@ static int osd_shutdown(const struct lu_env *env, struct osd_device *o)
 static int osd_mount(const struct lu_env *env,
                      struct osd_device *o, struct lustre_cfg *cfg)
 {
-        struct lustre_mount_info *lmi;
+        //struct lustre_mount_info *lmi;
         const char               *dev  = lustre_cfg_string(cfg, 0);
         dmu_buf_t                *rootdb;
         dmu_buf_t                *objdb;
@@ -2363,12 +2402,13 @@ static int osd_mount(const struct lu_env *env,
 
         ENTRY;
 
-        if (o->od_mount != NULL) {
+        if (o->od_objset != NULL) {
                 CERROR("Already mounted (%s) (dev %p, lu %p)\n", dev, o,
                         osd2lu_dev(o));
                 RETURN(-EEXIST);
         }
 
+#if 0
         /* get mount */
         lmi = server_get_mount(dev);
         if (lmi == NULL) {
@@ -2379,19 +2419,20 @@ static int osd_mount(const struct lu_env *env,
         LASSERT(lmi != NULL);
         /* save lustre_mount_info in dt_device */
         o->od_mount = lmi;
+#endif
 
-
-        rc = udmu_objset_root((osd_sb(o))->uos, &rootdb, root_tag);
+        rc = udmu_objset_root(o->od_objset, &rootdb, root_tag);
         if (rc) {
                 CERROR("udmu_objset_root() failed with error %d\n", rc);
                 return (-rc);
         }
         rootid = udmu_object_get_id(rootdb);
 
-        rc = udmu_zap_lookup(osd_sb(o)->uos, rootdb, "OBJ", &objid,
+        rc = udmu_zap_lookup(o->od_objset, rootdb, "OBJ", &objid,
                              sizeof(uint64_t), sizeof(uint64_t));
         if (rc == 0) {
-                rc = udmu_object_get_dmu_buf(osd_sb(o)->uos, objid, &objdb, objdir_tag);
+                rc = udmu_object_get_dmu_buf(o->od_objset, objid,
+                                             &objdb, objdir_tag);
         } else {
                 CERROR("Cannot find OBJ directory (%d)\n", rc);
                 return (-rc);
@@ -2410,10 +2451,12 @@ static struct lu_device *osd_device_fini(const struct lu_env *env,
 
         osd_sync(env, lu2dt_dev(d));
 
+#if 0
         if (osd_dev(d)->od_mount)
                 server_put_mount(osd_dev(d)->od_mount->lmi_name,
                                  osd_dev(d)->od_mount->lmi_mnt);
         osd_dev(d)->od_mount = NULL;
+#endif
 
         lu_context_fini(&osd_dev(d)->od_env_for_commit.le_ctx);
         RETURN(NULL);
@@ -2510,20 +2553,20 @@ static int osd_fid_lookup(const struct lu_env *env,
 
                 /* special fid found via ->index_lookup */
                 CDEBUG(D_OTHER, "lookup special %llu:%lu\n",
-                       fid->f_seq, fid->f_oid);
+                       fid->f_seq, (unsigned long) fid->f_oid);
 
                 oid = fid->f_oid;
         } else {
                 osd_fid2str(buf, fid);
 
-                rc = udmu_zap_lookup((osd_sb(dev))->uos, dev->od_objdir_db,
+                rc = udmu_zap_lookup(dev->od_objset, dev->od_objdir_db,
                                      buf, &oid, sizeof(uint64_t),
                                      sizeof(uint64_t));
                 if (rc)
                         RETURN(-rc);
         }
 
-        rc = udmu_object_get_dmu_buf((osd_sb(dev))->uos, oid, &obj->oo_db,
+        rc = udmu_object_get_dmu_buf(dev->od_objset, oid, &obj->oo_db,
                                      osd_object_tag);
         if (rc == 0) {
                 LASSERT(obj->oo_db != NULL);
@@ -2579,11 +2622,6 @@ static struct lu_device *osd2lu_dev(struct osd_device *osd)
         return &osd->od_dt_dev.dd_lu_dev;
 }
 
-static struct super_block *osd_sb(const struct osd_device *dev)
-{
-        return dev->od_mount->lmi_mnt->mnt_sb;
-}
-
 static int osd_object_invariant(const struct lu_object *l)
 {
         return osd_invariant(osd_obj(l));
index 2916064..0af614c 100644 (file)
@@ -44,7 +44,6 @@ struct osd_thread_info {
         /*
          * XXX temporary: for ->i_op calls.
          */
-        struct txn_param       oti_txn;
         struct timespec        oti_time;
         /*
          * XXX temporary: for capa operations.
diff --git a/lustre/dmu-osd/udmu.c b/lustre/dmu-osd/udmu.c
new file mode 100644 (file)
index 0000000..c54a734
--- /dev/null
@@ -0,0 +1,959 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  lustre/dmu/udmu.c
+ *  Module that interacts with the ZFS DMU and provides an abstraction
+ *  to the rest of Lustre.
+ *
+ *  Copyright (c) 2007 Cluster File Systems, Inc.
+ *   Author: Alex Tomas <alex@clusterfs.com>
+ *   Author: Atul Vidwansa <atul.vidwansa@sun.com>
+ *   Author: Manoj Joseph <manoj.joseph@sun.com>
+ *   Author: Mike Pershin <tappro@sun.com>
+ *
+ *   This file is part of the Lustre file system, http://www.lustre.org
+ *   Lustre is a trademark of Cluster File Systems, Inc.
+ *
+ *   You may have signed or agreed to another license before downloading
+ *   this software.  If so, you are bound by the terms and conditions
+ *   of that agreement, and the following does not apply to you.  See the
+ *   LICENSE file included with this distribution for more information.
+ *
+ *   If you did not agree to a different license, then this copy of Lustre
+ *   is open source software; you can redistribute it and/or modify it
+ *   under the terms of version 2 of the GNU General Public License as
+ *   published by the Free Software Foundation.
+ *
+ *   In either case, Lustre is distributed in the hope that it will be
+ *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   license text for more details.
+ */
+
+#include <sys/dnode.h>
+#include <sys/dbuf.h>
+#include <sys/spa.h>
+#include <sys/stat.h>
+#include <sys/statvfs.h>
+#include <sys/zap.h>
+#include <sys/spa_impl.h>
+#include <sys/zfs_znode.h>
+#include <sys/dmu_tx.h>
+#include <sys/dmu_objset.h>
+#include <udmu.h>
+#include <sys/dbuf.h>
+#include <sys/dnode.h>
+#include <sys/dmu_ctl.h>
+
+enum vtype iftovt_tab[] = {
+        VNON, VFIFO, VCHR, VNON, VDIR, VNON, VBLK, VNON,
+        VREG, VNON, VLNK, VNON, VSOCK, VNON, VNON, VNON
+};
+
+ushort_t vttoif_tab[] = {
+        0, S_IFREG, S_IFDIR, S_IFBLK, S_IFCHR, S_IFLNK, S_IFIFO,
+        S_IFDOOR, 0, S_IFSOCK, S_IFPORT, 0
+};
+
+#define MODEMASK        07777
+
+#define IFTOVT(M)       (iftovt_tab[((M) & S_IFMT) >> 12])
+#define VTTOIF(T)       (vttoif_tab[(int)(T)])
+#define MAKEIMODE(T, M) (VTTOIF(T) | ((M) & ~S_IFMT))
+
+/*
+ * Debug levels. Default is LEVEL_CRITICAL.
+ */
+#define LEVEL_CRITICAL  1
+#define LEVEL_INFO      2
+#define LEVEL_DEBUG     3
+
+static int debug_level = LEVEL_CRITICAL;
+
+#define CONFIG_DIR "/var/run/zfs/udmu"
+static char config_path[MAXPATHLEN];
+
+static void udmu_gethrestime(struct timespec *tp)
+{
+        tp->tv_nsec = 0;
+        time(&tp->tv_sec);
+}
+
+static void udmu_printf(int level, FILE *stream, char *message, ...)
+{
+        va_list args;
+
+        if (level <= debug_level) {
+                va_start(args, message);
+                (void) vfprintf(stream, message, args);
+                va_end(args);
+        }
+}
+
+void udmu_debug(int level)
+{
+        debug_level = level;
+}
+
+void udmu_init()
+{
+        char tmp[MAXPATHLEN];
+        struct rlimit rl = { 1024, 1024 };
+        int rc;
+
+        /*
+         * Set spa_config_path to /var/run/zfs/udmu/$pid/zpool.cache.
+         */
+        snprintf(config_path, MAXPATHLEN, "%s/%d", CONFIG_DIR, (int)getpid());
+
+        snprintf(tmp, MAXPATHLEN, "mkdir -p %s", config_path);
+        system(tmp);
+
+        /* Never hurts to be careful */
+        strncpy(tmp, config_path, MAXPATHLEN - 1);
+        tmp[MAXPATHLEN - 1] = '\0';
+
+        snprintf(config_path, MAXPATHLEN, "%s/zpool.cache", tmp);
+        spa_config_path = config_path;
+
+        (void) setvbuf(stdout, NULL, _IOLBF, 0);
+        (void) setrlimit(RLIMIT_NOFILE, &rl);
+
+        /* Initialize the emulation of kernel services in userland. */
+        kernel_init(FREAD | FWRITE);
+
+        rc = dctl_server_init(tmp, 2, 2);
+        if (rc != 0)
+                fprintf(stderr, "Error calling dctl_server_init(): %i\n"
+                    "lzpool and lzfs will not be functional!\n", rc);
+}
+
+void udmu_fini()
+{
+        int rc;
+
+        rc = dctl_server_fini();
+        if (rc != 0)
+                fprintf(stderr, "Error calling dctl_server_fini(): %i!\n", rc);
+
+        kernel_fini();
+}
+
+int udmu_objset_open(char *osname, char *import_dir, int import, int force,
+                     udmu_objset_t *uos)
+{
+        int error;
+        char cmd[MAXPATHLEN];
+        char *c;
+        uint64_t version = ZPL_VERSION;
+        int tried_import = FALSE;
+
+        memset(uos, 0, sizeof(udmu_objset_t));
+
+        c = strchr(osname, '/');
+
+top:
+        /* Let's try to open the objset */
+        error = dmu_objset_open(osname, DMU_OST_ZFS, DS_MODE_OWNER, &uos->os);
+
+        if (error == ENOENT && import && !tried_import) {
+                /* objset not found, let's try to import the pool */
+                udmu_printf(LEVEL_INFO, stdout, "Importing pool %s\n", osname);
+
+                if (c != NULL)
+                        *c = '\0';
+
+                snprintf(cmd, sizeof(cmd), "lzpool import%s%s%s %s",
+                    force ? " -F" : "", import_dir ? " -d " : "",
+                    import_dir ? import_dir : "", osname);
+
+                if (c != NULL)
+                        *c = '/';
+
+                error = system(cmd);
+
+                if (error) {
+                        udmu_printf(LEVEL_CRITICAL, stderr, "\"%s\" failed:"
+                            " %d\n", cmd, error);
+                        return(error);
+                }
+
+                tried_import = TRUE;
+                goto top;
+        }
+
+        if (error) {
+                uos->os = NULL;
+                goto out;
+        }
+
+        /* Check ZFS version */
+        error = zap_lookup(uos->os, MASTER_NODE_OBJ, ZPL_VERSION_STR, 8, 1,
+                           &version);
+        if (error) {
+                udmu_printf(LEVEL_CRITICAL, stderr,
+                            "Error looking up ZPL VERSION");
+                /*
+                 * We can't return ENOENT because that would mean the objset
+                 * didn't exist.
+                 */
+                error = EIO;
+                goto out;
+        } else if (version != LUSTRE_ZPL_VERSION) {
+                udmu_printf(LEVEL_CRITICAL, stderr,
+                            "Mismatched versions:  File system "
+                            "is version %lld on-disk format, which is "
+                            "incompatible with this software version %lld!",
+                            (u_longlong_t)version, LUSTRE_ZPL_VERSION);
+                error = ENOTSUP;
+                goto out;
+        }
+
+        error = zap_lookup(uos->os, MASTER_NODE_OBJ, ZFS_ROOT_OBJ,
+                           8, 1, &uos->root);
+        if (error) {
+                udmu_printf(LEVEL_CRITICAL, stderr,
+                            "Error looking up ZFS root object.");
+                error = EIO;
+                goto out;
+        }
+        ASSERT(uos->root != 0);
+
+out:
+        if (error) {
+                if (uos->os == NULL && tried_import) {
+                        if (c != NULL)
+                                *c = '\0';
+                        spa_export(osname, NULL, B_TRUE);
+                        if (c != NULL)
+                                *c = '/';
+                } else if(uos->os != NULL)
+                        udmu_objset_close(uos, tried_import);
+        }
+
+        return (error);
+}
+
+void udmu_wait_synced(udmu_objset_t *uos, dmu_tx_t *tx)
+{
+        /* Wait for the pool to be synced */
+        txg_wait_synced(dmu_objset_pool(uos->os),
+                        tx ? tx->tx_txg : 0ULL);
+}
+
+void udmu_objset_close(udmu_objset_t *uos, int export_pool)
+{
+        spa_t *spa;
+        char pool_name[MAXPATHLEN];
+
+        ASSERT(uos->os != NULL);
+        spa = uos->os->os->os_spa;
+
+        spa_config_enter(spa, RW_READER, FTAG);
+        strncpy(pool_name, spa_name(spa), sizeof(pool_name));
+        spa_config_exit(spa, FTAG);
+
+        udmu_wait_synced(uos, NULL);
+        /* close the object set */
+        dmu_objset_close(uos->os);
+
+        uos->os = NULL;
+
+        if (export_pool)
+                spa_export(pool_name, NULL, B_TRUE);
+}
+
+int udmu_objset_statvfs(udmu_objset_t *uos, struct statvfs64 *statp)
+{
+        uint64_t refdbytes, availbytes, usedobjs, availobjs;
+
+        dmu_objset_space(uos->os, &refdbytes, &availbytes, &usedobjs,
+                         &availobjs);
+
+        /*
+         * The underlying storage pool actually uses multiple block sizes.
+         * We report the fragsize as the smallest block size we support,
+         * and we report our blocksize as the filesystem's maximum blocksize.
+         */
+        statp->f_frsize = 1ULL << SPA_MINBLOCKSHIFT;
+        statp->f_bsize = 1ULL << SPA_MAXBLOCKSHIFT;
+
+        /*
+         * The following report "total" blocks of various kinds in the
+         * file system, but reported in terms of f_frsize - the
+         * "fragment" size.
+         */
+
+        statp->f_blocks = (refdbytes + availbytes) >> SPA_MINBLOCKSHIFT;
+        statp->f_bfree = availbytes >> SPA_MINBLOCKSHIFT;
+        statp->f_bavail = statp->f_bfree; /* no root reservation */
+
+        /*
+         * statvfs() should really be called statufs(), because it assumes
+         * static metadata.  ZFS doesn't preallocate files, so the best
+         * we can do is report the max that could possibly fit in f_files,
+         * and that minus the number actually used in f_ffree.
+         * For f_ffree, report the smaller of the number of object available
+         * and the number of blocks (each object will take at least a block).
+         */
+        statp->f_ffree = MIN(availobjs, statp->f_bfree);
+        statp->f_favail = statp->f_ffree; /* no "root reservation" */
+        statp->f_files = statp->f_ffree + usedobjs;
+
+        /* ZFSFUSE: not necessary? see 'man statfs' */
+        /*(void) cmpldev(&d32, vfsp->vfs_dev);
+        statp->f_fsid = d32;*/
+
+        /*
+         * We're a zfs filesystem.
+         */
+        /* ZFSFUSE: not necessary */
+        /*(void) strcpy(statp->f_basetype, vfssw[vfsp->vfs_fstype].vsw_name);
+
+        statp->f_flag = vf_to_stf(vfsp->vfs_flag);*/
+
+        statp->f_namemax = 256;
+
+        return (0);
+}
+
+static int udmu_obj2dbuf(udmu_objset_t *uos, uint64_t oid, dmu_buf_t **dbp,
+                         void *tag)
+{
+        dmu_object_info_t doi;
+        int err;
+
+        ASSERT(tag);
+
+        err = dmu_bonus_hold(uos->os, oid, tag, dbp);
+        if (err) {
+                return (err);
+        }
+
+        dmu_object_info_from_db(*dbp, &doi);
+        if (doi.doi_bonus_type != DMU_OT_ZNODE ||
+            doi.doi_bonus_size < sizeof (znode_phys_t)) {
+                dmu_buf_rele(*dbp, tag);
+                return (EINVAL);
+        }
+
+        ASSERT(*dbp);
+        ASSERT((*dbp)->db_object == oid);
+        ASSERT((*dbp)->db_offset == -1);
+        ASSERT((*dbp)->db_data != NULL);
+
+        return (0);
+}
+
+int udmu_objset_root(udmu_objset_t *uos, dmu_buf_t **dbp, void *tag)
+{
+        return (udmu_obj2dbuf(uos, uos->root, dbp, tag));
+}
+
+int udmu_zap_lookup(udmu_objset_t *uos, dmu_buf_t *zap_db, const char *name,
+                    void *value, int value_size, int intsize)
+{
+        uint64_t oid;
+        oid = zap_db->db_object;
+
+        /*
+         * value_size should be a multiple of intsize.
+         * intsize is 8 for micro ZAP and 1, 2, 4 or 8 for a fat ZAP.
+         */
+        ASSERT(value_size % intsize == 0);
+        return (zap_lookup(uos->os, oid, name, intsize,
+                           value_size / intsize, value));
+}
+
+/*
+ * The transaction passed to this routine must have
+ * udmu_tx_hold_bonus(tx, DMU_NEW_OBJECT) called and then assigned
+ * to a transaction group.
+ */
+void udmu_object_create(udmu_objset_t *uos, dmu_buf_t **dbp, dmu_tx_t *tx,
+                        void *tag)
+{
+        znode_phys_t    *zp;
+        uint64_t        oid;
+        uint64_t        gen;
+        timestruc_t     now;
+
+        ASSERT(tag);
+
+        /* Assert that the transaction has been assigned to a
+           transaction group. */
+        ASSERT(tx->tx_txg != 0);
+
+        udmu_gethrestime(&now);
+        gen = dmu_tx_get_txg(tx);
+
+        /* Create a new DMU object. */
+        oid = dmu_object_alloc(uos->os, DMU_OT_PLAIN_FILE_CONTENTS, 0,
+                               DMU_OT_ZNODE, sizeof (znode_phys_t), tx);
+
+        dmu_object_set_blocksize(uos->os, oid, 128ULL << 10, 0, tx);
+
+        VERIFY(0 == dmu_bonus_hold(uos->os, oid, tag, dbp));
+
+        dmu_buf_will_dirty(*dbp, tx);
+
+        /* Initialize the znode physical data to zero. */
+        ASSERT((*dbp)->db_size >= sizeof (znode_phys_t));
+        bzero((*dbp)->db_data, (*dbp)->db_size);
+        zp = (*dbp)->db_data;
+        zp->zp_gen = gen;
+        zp->zp_links = 1;
+        ZFS_TIME_ENCODE(&now, zp->zp_crtime);
+        ZFS_TIME_ENCODE(&now, zp->zp_ctime);
+        ZFS_TIME_ENCODE(&now, zp->zp_atime);
+        ZFS_TIME_ENCODE(&now, zp->zp_mtime);
+        zp->zp_mode = MAKEIMODE(VREG, 0007);
+}
+
+
+/*
+ * The transaction passed to this routine must have
+ * udmu_tx_hold_zap(tx, DMU_NEW_OBJECT, ...) called and then assigned
+ * to a transaction group.
+ */
+void udmu_zap_create(udmu_objset_t *uos, dmu_buf_t **zap_dbp, dmu_tx_t *tx,
+                     void *tag)
+{
+        znode_phys_t    *zp;
+        uint64_t        oid;
+        timestruc_t     now;
+        uint64_t        gen;
+
+        ASSERT(tag);
+
+        /* Assert that the transaction has been assigned to a
+           transaction group. */
+        ASSERT(tx->tx_txg != 0);
+
+        oid = 0;
+        udmu_gethrestime(&now);
+        gen = dmu_tx_get_txg(tx);
+
+        oid = zap_create(uos->os, DMU_OT_DIRECTORY_CONTENTS, DMU_OT_ZNODE,
+                         sizeof (znode_phys_t), tx);
+
+        VERIFY(0 == dmu_bonus_hold(uos->os, oid, tag, zap_dbp));
+
+        dmu_buf_will_dirty(*zap_dbp, tx);
+
+        bzero((*zap_dbp)->db_data, (*zap_dbp)->db_size);
+        zp = (*zap_dbp)->db_data;
+        zp->zp_size = 2;
+        zp->zp_links = 1;
+        zp->zp_gen = gen;
+        zp->zp_mode = MAKEIMODE(VDIR, 0007);
+
+        ZFS_TIME_ENCODE(&now, zp->zp_crtime);
+        ZFS_TIME_ENCODE(&now, zp->zp_ctime);
+        ZFS_TIME_ENCODE(&now, zp->zp_atime);
+        ZFS_TIME_ENCODE(&now, zp->zp_mtime);
+}
+
+int udmu_object_get_dmu_buf(udmu_objset_t *uos, uint64_t object,
+                            dmu_buf_t **dbp, void *tag)
+{
+        return (udmu_obj2dbuf(uos, object, dbp, tag));
+}
+
+
+/*
+ * The transaction passed to this routine must have
+ * udmu_tx_hold_bonus(tx, oid) and
+ * udmu_tx_hold_zap(tx, oid, ...)
+ * called and then assigned to a transaction group.
+ */
+int udmu_zap_insert(udmu_objset_t *uos, dmu_buf_t *zap_db, dmu_tx_t *tx,
+                    const char *name, void *value, int len)
+{
+        uint64_t oid = zap_db->db_object;
+
+        /* Assert that the transaction has been assigned to a
+           transaction group. */
+        ASSERT(tx->tx_txg != 0);
+
+        dmu_buf_will_dirty(zap_db, tx);
+        return (zap_add(uos->os, oid, name, 8, 1, value, tx));
+}
+
+/*
+ * The transaction passed to this routine must have
+ * udmu_tx_hold_zap(tx, oid, ...) called and then
+ * assigned to a transaction group.
+ */
+int udmu_zap_delete(udmu_objset_t *uos, dmu_buf_t *zap_db, dmu_tx_t *tx,
+                    const char *name)
+{
+        uint64_t oid = zap_db->db_object;
+
+        /* Assert that the transaction has been assigned to a
+           transaction group. */
+        ASSERT(tx->tx_txg != 0);
+
+        return (zap_remove(uos->os, oid, name, tx));
+}
+
+/*
+ * Zap cursor APIs
+ * */
+
+int udmu_zap_cursor_init(zap_cursor_t **zc, udmu_objset_t *uos, uint64_t zapobj)
+{
+        zap_cursor_t * t;
+
+        t = kmem_alloc(sizeof(*t), KM_NOSLEEP);
+        if (t) {
+                zap_cursor_init(t, uos->os, zapobj);
+                *zc = t;
+                return 0;
+        }
+        return (ENOMEM);
+}
+
+void udmu_zap_cursor_fini(zap_cursor_t *zc)
+{
+        zap_cursor_fini(zc);
+        kmem_free(zc, sizeof(*zc));
+}
+
+int udmu_zap_cursor_retrieve_key(zap_cursor_t *zc, char *key)
+{
+        int err;
+        zap_attribute_t za;
+
+        if (err = zap_cursor_retrieve(zc, &za))
+                return err;
+
+        if (key)
+                strncpy(key, za.za_name, MAXNAMELEN);
+
+        return 0;
+}
+
+/*
+ * zap_cursor_retrieve read from current record.
+ * to read bytes we need to call zap_lookup explicitly.
+ */
+
+int udmu_zap_cursor_retrieve_value(zap_cursor_t *zc,  char *buf,
+                int buf_size, int *bytes_read)
+{
+        int err, actual_size;
+        zap_attribute_t za;
+
+
+        if (err = zap_cursor_retrieve(zc, &za))
+                return err;
+
+        if (za.za_integer_length <= 0)
+                return (ERANGE);
+
+        actual_size = za.za_integer_length * za.za_num_integers;
+
+        if (actual_size > buf_size) {
+                actual_size = buf_size;
+                buf_size = actual_size / za.za_integer_length;
+        } else {
+                buf_size = za.za_num_integers;
+        }
+
+        err = zap_lookup(zc->zc_objset, zc->zc_zapobj,
+                        za.za_name, za.za_integer_length, buf_size, buf);
+
+        if (!err)
+                *bytes_read = actual_size;
+
+        return err;
+}
+
+void udmu_zap_cursor_advance(zap_cursor_t *zc)
+{
+        zap_cursor_advance(zc);
+}
+
+uint64_t udmu_zap_cursor_serialize(zap_cursor_t *zc)
+{
+        return zap_cursor_serialize(zc);
+}
+
+int udmu_zap_cursor_move_to_key(zap_cursor_t *zc, const char *name)
+{
+        return zap_cursor_move_to_key(zc, name, MT_EXACT);
+}
+
+void udmu_zap_cursor_init_serialized(zap_cursor_t *zc, objset_t *ds,
+                            uint64_t zapobj, uint64_t serialized)
+{
+        zap_cursor_init_serialized(zc, ds, zapobj, serialized);
+}
+
+
+/*
+ * Read data from a DMU object
+ */
+int udmu_object_read(udmu_objset_t *uos, dmu_buf_t *db, uint64_t offset,
+                     uint64_t size, void *buf)
+{
+        uint64_t oid = db->db_object;
+        vnattr_t va;
+        int rc;
+
+        udmu_printf(LEVEL_INFO, stdout, "udmu_read(%lld, %lld, %lld)\n",
+                    oid, offset, size);
+
+        udmu_object_getattr(db, &va);
+        if (offset + size > va.va_size) {
+                if (va.va_size < offset)
+                        size = 0;
+                else
+                        size = va.va_size - offset;
+        }
+
+        rc = dmu_read(uos->os, oid, offset, size, buf);
+        if (rc == 0)
+                return size;
+        else 
+                return (-rc);
+}
+
+/*
+ * Write data to a DMU object
+ *
+ * The transaction passed to this routine must have had
+ * udmu_tx_hold_write(tx, oid, offset, size) called and then
+ * assigned to a transaction group.
+ */
+void udmu_object_write(udmu_objset_t *uos, dmu_buf_t *db, struct dmu_tx *tx,
+                       uint64_t offset, uint64_t size, void *buf)
+{
+        uint64_t oid = db->db_object;
+
+        udmu_printf(LEVEL_INFO, stdout, "udmu_write(%lld, %lld, %lld\n",
+                    oid, offset, size);
+
+        dmu_write(uos->os, oid, offset, size, buf, tx);
+}
+
+/*
+ * Retrieve the attributes of a DMU object
+ */
+void udmu_object_getattr(dmu_buf_t *db, vnattr_t *vap)
+{
+        dnode_t *dn = ((dmu_buf_impl_t *)db)->db_dnode;
+        znode_phys_t *zp = db->db_data;
+
+        vap->va_mask = DMU_AT_ATIME | DMU_AT_MTIME | DMU_AT_CTIME | DMU_AT_MODE | DMU_AT_SIZE |
+                       DMU_AT_UID | DMU_AT_GID | DMU_AT_TYPE | DMU_AT_NLINK | DMU_AT_RDEV;
+        vap->va_atime.tv_sec    = zp->zp_atime[0];
+        vap->va_atime.tv_nsec   = 0;
+        vap->va_mtime.tv_sec    = zp->zp_mtime[0];
+        vap->va_mtime.tv_nsec   = 0;
+        vap->va_ctime.tv_sec    = zp->zp_ctime[0];
+        vap->va_ctime.tv_nsec   = 0;
+        vap->va_mode     = zp->zp_mode & MODEMASK;;
+        vap->va_size     = zp->zp_size;
+        vap->va_uid      = zp->zp_uid;
+        vap->va_gid      = zp->zp_gid;
+        vap->va_type     = IFTOVT((mode_t)zp->zp_mode);
+        vap->va_nlink    = zp->zp_links;
+        vap->va_rdev     = zp->zp_rdev;
+
+        vap->va_blksize = dn->dn_datablksz;
+        vap->va_blkbits = dn->dn_datablkshift;
+        /* in 512-bytes units*/
+        vap->va_nblocks = DN_USED_BYTES(dn->dn_phys) >> SPA_MINBLOCKSHIFT;
+        vap->va_mask |= DMU_AT_NBLOCKS | DMU_AT_BLKSIZE;
+}
+
+/*
+ * Set the attributes of an object
+ *
+ * The transaction passed to this routine must have
+ * udmu_tx_hold_bonus(tx, oid) called and then assigned
+ * to a transaction group.
+ */
+void udmu_object_setattr(dmu_buf_t *db, dmu_tx_t *tx, vnattr_t *vap)
+{
+        znode_phys_t *zp = db->db_data;
+        uint_t mask = vap->va_mask;
+
+        /* Assert that the transaction has been assigned to a
+           transaction group. */
+        ASSERT(tx->tx_txg != 0);
+
+        if (mask == 0) {
+                return;
+        }
+
+        dmu_buf_will_dirty(db, tx);
+
+        /*
+         * Set each attribute requested.
+         * We group settings according to the locks they need to acquire.
+         *
+         * Note: you cannot set ctime directly, although it will be
+         * updated as a side-effect of calling this function.
+         */
+
+        if (mask & DMU_AT_MODE)
+                zp->zp_mode = MAKEIMODE(vap->va_type, vap->va_mode);
+
+        if (mask & DMU_AT_UID)
+                zp->zp_uid = (uint64_t)vap->va_uid;
+
+        if (mask & DMU_AT_GID)
+                zp->zp_gid = (uint64_t)vap->va_gid;
+
+        if (mask & DMU_AT_SIZE)
+                zp->zp_size = vap->va_size;
+
+        if (mask & DMU_AT_ATIME)
+                ZFS_TIME_ENCODE(&vap->va_atime, zp->zp_atime);
+
+        if (mask & DMU_AT_MTIME)
+                ZFS_TIME_ENCODE(&vap->va_mtime, zp->zp_mtime);
+
+        if (mask & DMU_AT_CTIME)
+                ZFS_TIME_ENCODE(&vap->va_ctime, zp->zp_ctime);
+
+        if (mask & DMU_AT_NLINK)
+                zp->zp_links = vap->va_nlink;
+}
+
+/*
+ * Punch/truncate an object
+ *
+ *      IN:     db      - dmu_buf of the object to free data in.
+ *              off     - start of section to free.
+ *              len     - length of section to free (0 => to EOF).
+ *
+ *      RETURN: 0 if success
+ *              error code if failure
+ *
+ * The transaction passed to this routine must have
+ * udmu_tx_hold_bonus(tx, oid) and
+ * if off < size, udmu_tx_hold_free(tx, oid, off, len ? len : DMU_OBJECT_END)
+ * called and then assigned to a transaction group.
+ */
+void udmu_object_punch(udmu_objset_t *uos, dmu_buf_t *db, dmu_tx_t *tx,
+                      uint64_t off, uint64_t len)
+{
+        znode_phys_t *zp = db->db_data;
+        uint64_t oid = db->db_object;
+        uint64_t end = off + len;
+        uint64_t size = zp->zp_size;
+
+        /* Assert that the transaction has been assigned to a
+           transaction group. */
+        ASSERT(tx->tx_txg != 0);
+
+        /*
+         * Nothing to do if file already at desired length.
+         */
+        if (len == 0 && size == off) {
+                return;
+        }
+
+        if (end > size || len == 0) {
+                zp->zp_size = end;
+        }
+
+        if (off < size) {
+                uint64_t rlen = len;
+
+                if (len == 0)
+                        rlen = -1;
+                else if (end > size)
+                        rlen = size - off;
+
+                VERIFY(0 == dmu_free_range(uos->os, oid, off, rlen, tx));
+        }
+}
+
+/*
+ * Delete a DMU object
+ *
+ * The transaction passed to this routine must have
+ * udmu_tx_hold_free(tx, oid, 0, DMU_OBJECT_END) called
+ * and then assigned to a transaction group.
+ *
+ * This will release db and set it to NULL to prevent further dbuf releases.
+ */
+int udmu_object_delete(udmu_objset_t *uos, dmu_buf_t **db, dmu_tx_t *tx,
+                       void *tag)
+{
+        int error;
+        uint64_t oid = (*db)->db_object;
+
+        /* Assert that the transaction has been assigned to a
+           transaction group. */
+        ASSERT(tx->tx_txg != 0);
+
+        udmu_object_put_dmu_buf(*db, tag);
+        *db = NULL;
+
+        error = dmu_object_free(uos->os, oid, tx);
+
+        return (error);
+}
+
+/*
+ * Get the object id from dmu_buf_t
+ */
+uint64_t udmu_object_get_id(dmu_buf_t *db)
+{
+        ASSERT(db != NULL);
+        return (db->db_object);
+}
+
+int udmu_object_is_zap(dmu_buf_t *_db)
+{
+        dmu_buf_impl_t *db = (dmu_buf_impl_t *) _db;
+        if (db->db_dnode->dn_type == DMU_OT_DIRECTORY_CONTENTS)
+                return 1;
+        return 0;
+}
+
+/*
+ * Release the reference to a dmu_buf object.
+ */
+void udmu_object_put_dmu_buf(dmu_buf_t *db, void *tag)
+{
+        ASSERT(tag);
+        dmu_buf_rele(db, tag);
+}
+
+dmu_tx_t *udmu_tx_create(udmu_objset_t *uos)
+{
+        return (dmu_tx_create(uos->os));
+}
+
+void udmu_tx_hold_write(dmu_tx_t *tx, uint64_t object, uint64_t off, int len)
+{
+        dmu_tx_hold_write(tx, object, off, len);
+}
+
+void udmu_tx_hold_free(dmu_tx_t *tx, uint64_t object, uint64_t off,
+                       uint64_t len)
+{
+        dmu_tx_hold_free(tx, object, off, len);
+}
+
+void udmu_tx_hold_zap(dmu_tx_t *tx, uint64_t object, int add, char *name)
+{
+        dmu_tx_hold_zap(tx, object, add, name);
+}
+
+void udmu_tx_hold_bonus(dmu_tx_t *tx, uint64_t object)
+{
+        dmu_tx_hold_bonus(tx, object);
+}
+
+void udmu_tx_abort(dmu_tx_t *tx)
+{
+        dmu_tx_abort(tx);
+}
+
+int udmu_tx_assign(dmu_tx_t *tx, uint64_t txg_how)
+{
+        return (dmu_tx_assign(tx, txg_how));
+}
+
+void udmu_tx_wait(dmu_tx_t *tx)
+{
+        dmu_tx_wait(tx);
+}
+
+void udmu_tx_commit(dmu_tx_t *tx)
+{
+        dmu_tx_commit(tx);
+}
+
+/* commit callback API */
+void * udmu_tx_cb_create(size_t bytes)
+{
+        return dmu_tx_callback_data_create(bytes);
+}
+
+int udmu_tx_cb_add(dmu_tx_t *tx, void *func, void *data)
+{
+        return dmu_tx_callback_commit_add(tx, func, data);
+}
+
+int udmu_tx_cb_destroy(void *data)
+{
+        return dmu_tx_callback_data_destroy(data);
+}
+
+int udmu_indblk_overhead(dmu_buf_t *db, unsigned long *used,
+                         unsigned long *overhead)
+{
+        dnode_t *dn = ((dmu_buf_impl_t *)db)->db_dnode;
+
+        *overhead = (2 * (*used))/(1 << dn->dn_phys->dn_indblkshift);
+
+        return 0;
+}
+
+int udmu_get_blocksize(dmu_buf_t *db, long *blksz)
+{
+        dnode_t *dn = ((dmu_buf_impl_t *)db)->db_dnode;
+
+        *blksz = (dn->dn_datablksz);
+
+        return 0;
+}
+
+int udmu_object_get_links(dmu_buf_t *db)
+{
+        /* XXX: not implemented yet */
+        BUG_ON(1);
+        return 0;
+}
+
+void udmu_object_links_inc(dmu_buf_t *db, dmu_tx_t *tx)
+{
+        /* XXX: not implemented yet */
+        BUG_ON(1);
+}
+
+void udmu_object_links_dec(dmu_buf_t *db, dmu_tx_t *tx)
+{
+        /* XXX: not implemented yet */
+        BUG_ON(1);
+}
+
+int udmu_get_xattr(dmu_buf_t *db, void *val, int vallen, const char *name)
+{
+        /* XXX: not implemented yet */
+        BUG_ON(1);
+        return 0;
+}
+
+int udmu_set_xattr(dmu_buf_t *db, void *val, int vallen, const char *name,
+                   dmu_tx_t *tx)
+{
+        /* XXX: not implemented yet */
+        BUG_ON(1);
+        return 0;
+}
+
+int udmu_del_xattr(dmu_buf_t *db, const char *name, dmu_tx_t *tx)
+{
+        /* XXX: not implemented yet */
+        BUG_ON(1);
+        return 0;
+}
+
+int udmu_list_xattr(dmu_buf_t *db, void *val, int vallen)
+{
+        /* XXX: not implemented yet */
+        BUG_ON(1);
+        return 0;
+}
+
+
diff --git a/lustre/dmu-osd/udmu.h b/lustre/dmu-osd/udmu.h
new file mode 100644 (file)
index 0000000..9ef88d5
--- /dev/null
@@ -0,0 +1,269 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  Copyright (c) 2006 Cluster File Systems, Inc.
+ *   Author: Alex Tomas <alex@clusterfs.com>
+ *   Author: Atul Vidwansa <atul.vidwansa@sun.com>
+ *   Author: Manoj Joseph <manoj.joseph@sun.com>
+ *
+ *   This file is part of the Lustre file system, http://www.lustre.org
+ *   Lustre is a trademark of Cluster File Systems, Inc.
+ *
+ *   You may have signed or agreed to another license before downloading
+ *   this software.  If so, you are bound by the terms and conditions
+ *   of that agreement, and the following does not apply to you.  See the
+ *   LICENSE file included with this distribution for more information.
+ *
+ *   If you did not agree to a different license, then this copy of Lustre
+ *   is open source software; you can redistribute it and/or modify it
+ *   under the terms of version 2 of the GNU General Public License as
+ *   published by the Free Software Foundation.
+ *
+ *   In either case, Lustre is distributed in the hope that it will be
+ *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   license text for more details.
+ */
+
+#ifndef _DMU_H
+#define _DMU_H
+
+#ifdef  __cplusplus
+extern "C" {
+#endif
+
+#define LUSTRE_ZPL_VERSION 1ULL
+
+#ifndef DMU_AT_TYPE
+#define DMU_AT_TYPE    0x0001
+#define DMU_AT_MODE    0x0002
+#define DMU_AT_UID     0x0004
+#define DMU_AT_GID     0x0008
+#define DMU_AT_FSID    0x0010
+#define DMU_AT_NODEID  0x0020
+#define DMU_AT_NLINK   0x0040
+#define DMU_AT_SIZE    0x0080
+#define DMU_AT_ATIME   0x0100
+#define DMU_AT_MTIME   0x0200
+#define DMU_AT_CTIME   0x0400
+#define DMU_AT_RDEV    0x0800
+#define DMU_AT_BLKSIZE 0x1000
+#define DMU_AT_NBLOCKS 0x2000
+#define DMU_AT_SEQ     0x8000
+#endif
+
+#define ACCESSED                (DMU_AT_ATIME)
+#define STATE_CHANGED           (DMU_AT_CTIME)
+#define CONTENT_MODIFIED        (DMU_AT_MTIME | DMU_AT_CTIME)
+
+#define LOOKUP_DIR              0x01    /* want parent dir vp */
+#define LOOKUP_XATTR            0x02    /* lookup up extended attr dir */
+#define CREATE_XATTR_DIR        0x04    /* Create extended attr dir */
+
+#define S_IFDOOR        0xD000  /* door */
+#define S_IFPORT        0xE000  /* event port */
+
+struct statvfs64;
+
+/* Data structures required for Solaris ZFS compatability */
+#if !defined(__sun__)
+
+#ifndef _SOL_SYS_TIME_H
+typedef struct timespec timestruc_t;
+#endif
+
+#endif
+
+typedef enum vtype {
+        VNON    = 0,
+        VREG    = 1,
+        VDIR    = 2,
+        VBLK    = 3,
+        VCHR    = 4,
+        VLNK    = 5,
+        VFIFO   = 6,
+        VDOOR   = 7,
+        VPROC   = 8,
+        VSOCK   = 9,
+        VPORT   = 10,
+        VBAD    = 11
+} vtype_t;
+
+typedef struct vnattr {
+        unsigned int    va_mask;        /* bit-mask of attributes */
+        vtype_t         va_type;        /* vnode type (for create) */
+        mode_t          va_mode;        /* file access mode */
+        uid_t           va_uid;         /* owner user id */
+        gid_t           va_gid;         /* owner group id */
+        dev_t           va_fsid;        /* file system id (dev for now) */
+        unsigned long long va_nodeid;   /* node id */
+        nlink_t         va_nlink;       /* number of references to file */
+        off_t           va_size;        /* file size in bytes */
+        timestruc_t     va_atime;       /* time of last access */
+        timestruc_t     va_mtime;       /* time of last modification */
+        timestruc_t     va_ctime;       /* time of last status change */
+        dev_t           va_rdev;        /* device the file represents */
+        unsigned int    va_blksize;     /* fundamental block size */
+        unsigned int    va_blkbits;
+        unsigned long long va_nblocks;  /* # of blocks allocated */
+        unsigned int    va_seq;         /* sequence number */
+} vnattr_t;
+
+typedef struct udmu_objset {
+        struct objset *os;
+        struct zilog *zilog;
+        uint64_t root;  /* id of root znode */
+        uint64_t unlinkedobj;
+} udmu_objset_t;
+
+
+/* definitions from dmu.h */
+#ifndef _SYS_DMU_H
+
+typedef struct objset objset_t;
+typedef struct dmu_tx dmu_tx_t;
+typedef struct dmu_buf dmu_buf_t;
+typedef struct zap_cursor zap_cursor_t;
+
+#define DMU_NEW_OBJECT  (-1ULL)
+#define DMU_OBJECT_END  (-1ULL)
+
+#endif
+
+#ifndef _SYS_TXG_H
+#define TXG_WAIT        1ULL
+#define TXG_NOWAIT      2ULL
+#endif
+
+#define ZFS_DIRENT_MAKE(type, obj) (((uint64_t)type << 60) | obj)
+
+#define FTAG ((char *)__func__)
+
+void udmu_init(void);
+
+void udmu_fini(void);
+
+void udmu_debug(int level);
+
+/* udmu object-set API */
+
+int udmu_objset_open(char *osname, char *import_dir, int import, int force, udmu_objset_t *uos);
+
+void udmu_objset_close(udmu_objset_t *uos, int export_pool);
+
+int udmu_objset_statvfs(udmu_objset_t *uos, struct statvfs64 *statp);
+
+int udmu_objset_root(udmu_objset_t *uos, dmu_buf_t **dbp, void *tag);
+
+void udmu_wait_synced(udmu_objset_t *uos, dmu_tx_t *tx);
+
+/* udmu ZAP API */
+
+int udmu_zap_lookup(udmu_objset_t *uos, dmu_buf_t *zap_db, const char *name,
+                    void *value, int value_size, int intsize);
+
+void udmu_zap_create(udmu_objset_t *uos, dmu_buf_t **zap_dbp, dmu_tx_t *tx, void *tag);
+
+int udmu_zap_insert(udmu_objset_t *uos, dmu_buf_t *zap_db, dmu_tx_t *tx,
+                    const char *name, void *value, int len);
+
+int udmu_zap_delete(udmu_objset_t *uos, dmu_buf_t *zap_db, dmu_tx_t *tx,
+                    const char *name);
+
+/* zap cursor apis */
+int udmu_zap_cursor_init(zap_cursor_t **zc, udmu_objset_t *uos, uint64_t zapobj);
+
+void udmu_zap_cursor_fini(zap_cursor_t *zc);
+
+int udmu_zap_cursor_retrieve_key(zap_cursor_t *zc, char *key);
+
+int udmu_zap_cursor_retrieve_value(zap_cursor_t *zc,  char *buf,
+                int buf_size, int *bytes_read);
+
+void udmu_zap_cursor_advance(zap_cursor_t *zc);
+
+uint64_t udmu_zap_cursor_serialize(zap_cursor_t *zc);
+
+int udmu_zap_cursor_move_to_key(zap_cursor_t *zc, const char *name);
+
+void udmu_zap_cursor_init_serialized(zap_cursor_t *zc, udmu_objset_t *uos,
+                            uint64_t zapobj, uint64_t serialized);
+
+/* udmu object API */
+
+void udmu_object_create(udmu_objset_t *uos, dmu_buf_t **dbp, dmu_tx_t *tx, void *tag);
+
+int udmu_object_get_dmu_buf(udmu_objset_t *uos, uint64_t object,
+                            dmu_buf_t **dbp, void *tag);
+
+void udmu_object_put_dmu_buf(dmu_buf_t *db, void *tag);
+
+uint64_t udmu_object_get_id(dmu_buf_t *db);
+
+int udmu_object_read(udmu_objset_t *uos, dmu_buf_t *db, uint64_t offset,
+                     uint64_t size, void *buf);
+
+void udmu_object_write(udmu_objset_t *uos, dmu_buf_t *db, struct dmu_tx *tx,
+                      uint64_t offset, uint64_t size, void *buf);
+
+void udmu_object_getattr(dmu_buf_t *db, vnattr_t *vap);
+
+void udmu_object_setattr(dmu_buf_t *db, dmu_tx_t *tx, vnattr_t *vap);
+
+void udmu_object_punch(udmu_objset_t *uos, dmu_buf_t *db, dmu_tx_t *tx,
+                      uint64_t offset, uint64_t len);
+
+int udmu_object_delete(udmu_objset_t *uos, dmu_buf_t **db, dmu_tx_t *tx, void *tag);
+
+/*udmu transaction API */
+
+dmu_tx_t *udmu_tx_create(udmu_objset_t *uos);
+
+void udmu_tx_hold_write(dmu_tx_t *tx, uint64_t object, uint64_t off, int len);
+
+void udmu_tx_hold_free(dmu_tx_t *tx, uint64_t object, uint64_t off,
+    uint64_t len);
+
+void udmu_tx_hold_zap(dmu_tx_t *tx, uint64_t object, int add, char *name);
+
+void udmu_tx_hold_bonus(dmu_tx_t *tx, uint64_t object);
+
+void udmu_tx_abort(dmu_tx_t *tx);
+
+int udmu_tx_assign(dmu_tx_t *tx, uint64_t txg_how);
+
+void udmu_tx_wait(dmu_tx_t *tx);
+
+int udmu_indblk_overhead(dmu_buf_t *db, unsigned long *used,
+                         unsigned long *overhead);
+
+void udmu_tx_commit(dmu_tx_t *tx);
+
+void * udmu_tx_cb_create(size_t bytes);
+
+int udmu_tx_cb_add(dmu_tx_t *tx, void *func, void *data);
+
+int udmu_tx_cb_destroy(void *data);
+
+int udmu_object_is_zap(dmu_buf_t *);
+
+int udmu_indblk_overhead(dmu_buf_t *db, unsigned long *used, unsigned 
+                                long *overhead);
+
+int udmu_get_blocksize(dmu_buf_t *db, long *blksz);
+
+int udmu_object_get_links(dmu_buf_t *db);
+void udmu_object_links_inc(dmu_buf_t *db, dmu_tx_t *tx);
+void udmu_object_links_dec(dmu_buf_t *db, dmu_tx_t *tx);
+
+int udmu_get_xattr(dmu_buf_t *db, void *val, int vallen, const char *name);
+int udmu_set_xattr(dmu_buf_t *db, void *val, int vallen,
+                   const char *name, dmu_tx_t *tx);
+int udmu_del_xattr(dmu_buf_t *db, const char *name, dmu_tx_t *tx);
+int udmu_list_xattr(dmu_buf_t *db, void *val, int vallen);
+
+#ifdef  __cplusplus
+}
+#endif
+
+#endif /* _DMU_H */
diff --git a/lustre/dmu-osd/udmu_util.c b/lustre/dmu-osd/udmu_util.c
new file mode 100644 (file)
index 0000000..93b7e86
--- /dev/null
@@ -0,0 +1,241 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  lustre/dmu/udmu.c
+ *  Module that interacts with the ZFS DMU and provides an abstraction
+ *  to the rest of Lustre.
+ *
+ *  Copyright (c) 2007 Cluster File Systems, Inc.
+ *   Author: Manoj Joseph <manoj.joseph@sun.com>
+ *
+ *   This file is part of the Lustre file system, http://www.lustre.org
+ *   Lustre is a trademark of Cluster File Systems, Inc.
+ *
+ *   You may have signed or agreed to another license before downloading
+ *   this software.  If so, you are bound by the terms and conditions
+ *   of that agreement, and the following does not apply to you.  See the
+ *   LICENSE file included with this distribution for more information.
+ *
+ *   If you did not agree to a different license, then this copy of Lustre
+ *   is open source software; you can redistribute it and/or modify it
+ *   under the terms of version 2 of the GNU General Public License as
+ *   published by the Free Software Foundation.
+ *
+ *   In either case, Lustre is distributed in the hope that it will be
+ *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   license text for more details.
+ */
+
+#include <stdio.h>
+#include <string.h>
+#include <errno.h>
+#include <sys/debug.h>
+#include <sys/stat.h>
+#include <sys/statvfs.h>
+#include <sys/errno.h>
+
+#include <udmu.h>
+#include <udmu_util.h>
+
+static int udmu_util_object_delete(udmu_objset_t *uos, dmu_buf_t **dbp,
+                                   void *tag)
+{
+        dmu_tx_t *tx;
+        uint64_t id;
+        int rc;
+
+        id = udmu_object_get_id(*dbp);
+        tx = udmu_tx_create(uos);
+
+        udmu_tx_hold_free(tx, id, 0, DMU_OBJECT_END);
+
+        rc = udmu_tx_assign(tx, TXG_WAIT);
+        if (rc) {
+                fprintf(stderr,
+                        "udmu_util_object_delete: udmu_tx_assign failed (%d)", rc);
+                udmu_tx_abort(tx);
+                return (rc);
+        }
+
+        rc = udmu_object_delete(uos, dbp, tx, tag);
+        if (rc)
+                fprintf(stderr, "udmu_object_delete() failed (%d)", rc);
+
+        udmu_tx_commit(tx);
+        return rc;
+}
+
+int udmu_util_mkdir(udmu_objset_t *uos, dmu_buf_t *parent_db,
+                    const char *name, dmu_buf_t **new_dbp, void *tag)
+{
+        dmu_buf_t *db;
+        dmu_tx_t *tx;
+        uint64_t id, pid, value;
+        int rc;
+
+        /* return EEXIST early to avoid object creation/deletion */
+        rc = udmu_zap_lookup(uos, parent_db, name, &id,
+                             sizeof(id), sizeof(uint64_t));
+        if (rc == 0)
+                return EEXIST;
+
+        pid = udmu_object_get_id(parent_db);
+
+        tx = udmu_tx_create(uos);
+        udmu_tx_hold_zap(tx, DMU_NEW_OBJECT, 1, NULL); /* for zap create */
+        udmu_tx_hold_bonus(tx, pid); /* for zap_add */
+        udmu_tx_hold_zap(tx, pid, 1, (char *)name); /* for zap_add */
+
+        rc = udmu_tx_assign(tx, TXG_WAIT);
+        if (rc) {
+                fprintf(stderr,
+                        "udmu_util_mkdir: udmu_tx_assign failed (%d)", rc);
+                udmu_tx_abort(tx);
+                return (rc);
+        }
+
+        udmu_zap_create(uos, &db, tx, tag);
+        id = udmu_object_get_id(db);
+        value = ZFS_DIRENT_MAKE(0, id);
+        rc = udmu_zap_insert(uos, parent_db, tx, name, &value, sizeof(value));
+        udmu_tx_commit(tx);
+
+        if (rc) {
+                fprintf(stderr, "can't insert (%s) in zap (%d)", name, rc);
+                /* error handling, delete just created object */
+                udmu_util_object_delete(uos, &db, tag);
+        } else if (new_dbp) {
+                *new_dbp = db;
+        } else {
+                udmu_object_put_dmu_buf(db, tag);
+        }
+
+        return (rc);
+}
+
+int udmu_util_setattr(udmu_objset_t *uos, dmu_buf_t *db, vnattr_t *va)
+{
+        dmu_tx_t *tx;
+        int rc;
+
+        tx = udmu_tx_create(uos);
+        udmu_tx_hold_bonus(tx, udmu_object_get_id(db));
+
+        rc = udmu_tx_assign(tx, TXG_WAIT);
+        if (rc) {
+                udmu_tx_abort(tx);
+        } else {
+                udmu_object_setattr(db, tx, va);
+                udmu_tx_commit(tx);
+        }
+
+        return (rc);
+}
+
+int udmu_util_create(udmu_objset_t *uos, dmu_buf_t *parent_db,
+                     const char *name, dmu_buf_t **new_dbp, void *tag)
+{
+        dmu_buf_t *db;
+        dmu_tx_t *tx;
+        uint64_t id, pid, value;
+        int rc;
+
+        /* return EEXIST early to avoid object creation/deletion */
+        rc = udmu_zap_lookup(uos, parent_db, name, &id,
+                             sizeof(id), sizeof(uint64_t));
+        if (rc == 0)
+                return EEXIST;
+
+        pid = udmu_object_get_id(parent_db);
+
+        tx = udmu_tx_create(uos);
+
+        udmu_tx_hold_bonus(tx, DMU_NEW_OBJECT);
+        udmu_tx_hold_bonus(tx, pid);
+        udmu_tx_hold_zap(tx, pid, 1, (char *) name);
+
+        rc = udmu_tx_assign(tx, TXG_WAIT);
+        if (rc) {
+                fprintf(stderr,
+                        "udmu_util_create: udmu_tx_assign failed (%d)", rc);
+                udmu_tx_abort(tx);
+                return (rc);
+        }
+
+        udmu_object_create(uos, &db, tx, tag);
+        id = udmu_object_get_id(db);
+        value = ZFS_DIRENT_MAKE(0, id);
+        rc = udmu_zap_insert(uos, parent_db, tx, name,
+                             &value, sizeof(value));
+        udmu_tx_commit(tx);
+
+        if (rc) {
+                fprintf(stderr, "can't insert new object in zap (%d)", rc);
+                /* error handling, delete just created object */
+                udmu_util_object_delete(uos, &db, tag);
+        } else if (new_dbp) {
+                *new_dbp = db;
+        } else {
+                udmu_object_put_dmu_buf(db, tag);
+        }
+
+        return (rc);
+}
+
+int udmu_util_lookup(udmu_objset_t *uos, dmu_buf_t *parent_db,
+                     const char *name, dmu_buf_t **new_dbp, void *tag)
+{
+        uint64_t id;
+        int rc;
+
+        rc = udmu_zap_lookup(uos, parent_db, name, &id,
+                             sizeof(id), sizeof(uint64_t));
+        if (rc == 0) {
+                udmu_object_get_dmu_buf(uos, id, new_dbp, tag);
+        }
+
+        return (rc);
+}
+
+int udmu_util_write(udmu_objset_t *uos, dmu_buf_t *db,
+                    uint64_t offset, uint64_t len, void *buf)
+{
+        dmu_tx_t *tx;
+        int set_size = 0;
+        uint64_t end = offset + len;
+        vnattr_t va;
+        int rc;
+
+        udmu_object_getattr(db, &va);
+
+        if (va.va_size < end) {
+                /* extending write; set file size */
+                set_size = 1;
+                va.va_mask = AT_SIZE;
+                va.va_size = end;
+        }
+
+        tx = udmu_tx_create(uos);
+        if (set_size) {
+                udmu_tx_hold_bonus(tx, udmu_object_get_id(db));
+        }
+        udmu_tx_hold_write(tx, udmu_object_get_id(db), offset, len);
+
+        rc = udmu_tx_assign(tx, TXG_WAIT);
+        if (rc) {
+                fprintf(stderr, "dmu_tx_assign() failed %d", rc);
+                udmu_tx_abort(tx);
+                return (-rc);
+        }
+
+        udmu_object_write(uos, db, tx, offset,
+                          len, buf);
+        if (set_size) {
+                udmu_object_setattr(db, tx, &va);
+        }
+
+        udmu_tx_commit(tx);
+
+        return (len);
+}
diff --git a/lustre/dmu-osd/udmu_util.h b/lustre/dmu-osd/udmu_util.h
new file mode 100644 (file)
index 0000000..9d26da6
--- /dev/null
@@ -0,0 +1,59 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  lustre/dmu/udmu.c
+ *  Module that interacts with the ZFS DMU and provides an abstraction
+ *  to the rest of Lustre.
+ *
+ *  Copyright (c) 2007 Cluster File Systems, Inc.
+ *   Author: Manoj Joseph <manoj.joseph@sun.com>
+ *
+ *   This file is part of the Lustre file system, http://www.lustre.org
+ *   Lustre is a trademark of Cluster File Systems, Inc.
+ *
+ *   You may have signed or agreed to another license before downloading
+ *   this software.  If so, you are bound by the terms and conditions
+ *   of that agreement, and the following does not apply to you.  See the
+ *   LICENSE file included with this distribution for more information.
+ *
+ *   If you did not agree to a different license, then this copy of Lustre
+ *   is open source software; you can redistribute it and/or modify it
+ *   under the terms of version 2 of the GNU General Public License as
+ *   published by the Free Software Foundation.
+ *
+ *   In either case, Lustre is distributed in the hope that it will be
+ *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   license text for more details.
+ */
+
+#ifndef _DMU_UTIL_H
+#define _DMU_UTIL_H
+
+#ifdef DMU_OSD
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+int udmu_util_lookup(udmu_objset_t *uos, dmu_buf_t *parent_db,
+                     const char *name, dmu_buf_t **new_dbp, void *tag);
+
+int udmu_util_create(udmu_objset_t *uos, dmu_buf_t *parent_db,
+                     const char *name, dmu_buf_t **new_db, void *tag);
+
+int udmu_util_mkdir(udmu_objset_t *uos, dmu_buf_t *parent_db,
+                    const char *name, dmu_buf_t **new_db, void *tag);
+
+int udmu_util_setattr(udmu_objset_t *uos, dmu_buf_t *db, vnattr_t *va);
+
+int udmu_util_write(udmu_objset_t *uos, dmu_buf_t *db,
+                    uint64_t offset, uint64_t len, void *buf);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* DMU_OSD */
+
+#endif /* _DMU_UTIL_H */