Whamcloud - gitweb
- unland b_fid to HEAD
[fs/lustre-release.git] / lustre / liblustre / super.c
index a03dc4a..028c2c8 100644 (file)
@@ -3,7 +3,7 @@
  *
  * Lustre Light Super operations
  *
- *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
+ *  Copyright (c) 2002-2004 Cluster File Systems, Inc.
  *
  *   This file is part of Lustre, http://www.lustre.org.
  *
 
 #include "llite_lib.h"
 
+#ifndef MAY_EXEC
+#define MAY_EXEC        1
+#define MAY_WRITE       2
+#define MAY_READ        4
+#endif
+
+#define S_IXUGO (S_IXUSR|S_IXGRP|S_IXOTH)
+
+static int ll_permission(struct inode *inode, int mask)
+{
+        struct llu_inode_info *lli = llu_i2info(inode);
+        mode_t mode = lli->lli_st_mode;
+
+        if (current->fsuid == lli->lli_st_uid)
+                mode >>= 6;
+        else if (in_group_p(lli->lli_st_gid))
+                mode >>= 3;
+
+        if ((mode & mask & (MAY_READ|MAY_WRITE|MAY_EXEC)) == mask)
+                return 0;
+
+        if ((mask & (MAY_READ|MAY_WRITE)) ||
+            (lli->lli_st_mode & S_IXUGO))
+                if (capable(CAP_DAC_OVERRIDE))
+                        return 0;
+
+        if (mask == MAY_READ ||
+            (S_ISDIR(lli->lli_st_mode) && !(mask & MAY_WRITE))) {
+                if (capable(CAP_DAC_READ_SEARCH))
+                        return 0;
+        }
+
+        return -EACCES;
+}
+
 static void llu_fsop_gone(struct filesys *fs)
 {
         struct llu_sb_info *sbi = (struct llu_sb_info *) fs->fs_private;
         struct obd_device *obd = class_exp2obd(sbi->ll_mdc_exp);
-        struct ll_fid rootfid;
+        struct lustre_cfg lcfg;
+        int next = 0;
         ENTRY;
 
         list_del(&sbi->ll_conn_chain);
         obd_disconnect(sbi->ll_osc_exp, 0);
+        obd_disconnect(sbi->ll_mdc_exp, 0);
 
-        /* NULL request to force sync on the MDS, and get the last_committed
-         * value to flush remaining RPCs from the sending queue on client.
-         *
-         * XXX This should be an mdc_sync() call to sync the whole MDS fs,
-         *     which we can call for other reasons as well.
-         */
-        if (!obd->obd_no_recov)
-                mdc_getstatus(sbi->ll_mdc_exp, &rootfid);
+        while ((obd = class_devices_in_group(&sbi->ll_sb_uuid, &next)) != NULL)
+        {
+                int err;
 
-        obd_disconnect(sbi->ll_mdc_exp, 0);
+                LCFG_INIT(lcfg, LCFG_CLEANUP, obd->obd_name);
+                err = class_process_config(&lcfg);
+                if (err) {
+                        CERROR("cleanup failed: %s\n", obd->obd_name);
+                }
+
+                LCFG_INIT(lcfg, LCFG_DETACH, obd->obd_name);
+                err = class_process_config(&lcfg);
+                if (err) {
+                        CERROR("detach failed: %s\n", obd->obd_name);
+                }
+        }
 
         OBD_FREE(sbi, sizeof(*sbi));
 
@@ -337,13 +380,14 @@ static struct inode* llu_new_inode(struct filesys *fs,
         return inode;
 }
 
-static int llu_have_md_lock(struct inode *inode)
+static int llu_have_md_lock(struct inode *inode, __u64 lockpart)
 {
         struct llu_sb_info *sbi = llu_i2sbi(inode);
         struct llu_inode_info *lli = llu_i2info(inode);
         struct lustre_handle lockh;
         struct ldlm_res_id res_id = { .name = {0} };
         struct obd_device *obddev;
+        ldlm_policy_data_t policy = { .l_inodebits = { lockpart } };
         int flags;
         ENTRY;
 
@@ -357,14 +401,14 @@ static int llu_have_md_lock(struct inode *inode)
 
         /* FIXME use LDLM_FL_TEST_LOCK instead */
         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
-        if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_PLAIN,
-                            NULL, LCK_PR, &lockh)) {
+        if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_IBITS,
+                            &policy, LCK_PR, &lockh)) {
                 ldlm_lock_decref(&lockh, LCK_PR);
                 RETURN(1);
         }
 
-        if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_PLAIN,
-                            NULL, LCK_PW, &lockh)) {
+        if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_IBITS,
+                            &policy, LCK_PW, &lockh)) {
                 ldlm_lock_decref(&lockh, LCK_PW);
                 RETURN(1);
         }
@@ -382,7 +426,7 @@ static int llu_inode_revalidate(struct inode *inode)
                 RETURN(0);
         }
 
-        if (!llu_have_md_lock(inode)) {
+        if (!llu_have_md_lock(inode, MDS_INODELOCK_UPDATE)) {
                 struct lustre_md md;
                 struct ptlrpc_request *req = NULL;
                 struct llu_sb_info *sbi = llu_i2sbi(inode);
@@ -402,7 +446,8 @@ static int llu_inode_revalidate(struct inode *inode)
                         CERROR("failure %d inode %lu\n", rc, lli->lli_st_ino);
                         RETURN(-abs(rc));
                 }
-                rc = mdc_req2lustre_md(req, 0, sbi->ll_osc_exp, &md);
+                rc = mdc_req2lustre_md(sbi->ll_mdc_exp, req, 0, sbi->ll_osc_exp, 
+                                       &md);
 
                 /* XXX Too paranoid? */
                 if (((md.body->valid ^ valid) & OBD_MD_FLEASIZE) &&
@@ -435,14 +480,7 @@ static int llu_inode_revalidate(struct inode *inode)
 
         /* ll_glimpse_size will prefer locally cached writes if they extend
          * the file */
-        {
-                struct ost_lvb lvb;
-                ldlm_error_t err;
-
-                err = llu_glimpse_size(inode, &lvb);
-                lli->lli_st_size = lvb.lvb_size;
-        }
-        RETURN(0);
+        RETURN(llu_glimpse_size(inode));
 }
 
 static void copy_stat_buf(struct inode *ino, struct intnl_stat *b)
@@ -471,6 +509,8 @@ static int llu_iop_getattr(struct pnode *pno,
         int rc;
         ENTRY;
 
+        liblustre_wait_event(0);
+
         if (!ino) {
                 LASSERT(pno);
                 LASSERT(pno->p_base->pb_ino);
@@ -493,11 +533,12 @@ static int llu_iop_getattr(struct pnode *pno,
 
 static int null_if_equal(struct ldlm_lock *lock, void *data)
 {
-        if (data == lock->l_ast_data)
+        if (data == lock->l_ast_data) {
                 lock->l_ast_data = NULL;
 
-        if (lock->l_req_mode != lock->l_granted_mode)
-                return LDLM_ITER_STOP;
+                if (lock->l_req_mode != lock->l_granted_mode)
+                        LDLM_ERROR(lock,"clearing inode with ungranted lock\n");
+        }
 
         return LDLM_ITER_CONTINUE;
 }
@@ -539,6 +580,7 @@ void llu_iop_gone(struct inode *inode)
         struct llu_inode_info *lli = llu_i2info(inode);
         ENTRY;
 
+        liblustre_wait_event(0);
         llu_clear_inode(inode);
 
         OBD_FREE(lli, sizeof(*lli));
@@ -651,11 +693,16 @@ int llu_setattr_raw(struct inode *inode, struct iattr *attr)
                         RETURN(rc);
                 }
 
-                rc = mdc_req2lustre_md(request, 0, sbi->ll_osc_exp, &md);
+                rc = mdc_req2lustre_md(sbi->ll_mdc_exp, request, 0, 
+                                       sbi->ll_osc_exp, &md);
                 if (rc) {
                         ptlrpc_req_finished(request);
                         RETURN(rc);
                 }
+
+                /* Won't invoke vmtruncate as we already cleared ATTR_SIZE,
+                 * but needed to set timestamps backwards on utime. */
+                inode_setattr(inode, attr);
                 llu_update_inode(inode, md.body, md.lsm);
                 ptlrpc_req_finished(request);
 
@@ -672,7 +719,7 @@ int llu_setattr_raw(struct inode *inode, struct iattr *attr)
                         /* from sys_utime() */
                         if (!(ia_valid & (ATTR_MTIME_SET | ATTR_ATIME_SET))) {
                                 if (current->fsuid != lli->lli_st_uid &&
-                                    (rc = ll_permission(inode, 0/*MAY_WRITE*/, NULL)) != 0)
+                                    (rc = ll_permission(inode, MAY_WRITE)) != 0)
                                         RETURN(rc);
                         } else {
                                /* from inode_change_ok() */
@@ -706,9 +753,6 @@ int llu_setattr_raw(struct inode *inode, struct iattr *attr)
                 }
 
                 rc = llu_vmtruncate(inode, attr->ia_size);
-                if (rc == 0)
-                        set_bit(LLI_F_HAVE_OST_SIZE_LOCK,
-                                &llu_i2info(inode)->lli_flags);
 
                 /* unlock now as we don't mind others file lockers racing with
                  * the mds updates below? */
@@ -745,6 +789,11 @@ static int llu_iop_setattr(struct pnode *pno,
         struct iattr iattr;
         ENTRY;
 
+        liblustre_wait_event(0);
+
+        LASSERT(!(mask & ~(SETATTR_MTIME | SETATTR_ATIME | 
+                           SETATTR_UID | SETATTR_GID |
+                           SETATTR_LEN | SETATTR_MODE)));
         memset(&iattr, 0, sizeof(iattr));
 
         if (mask & SETATTR_MODE) {
@@ -944,13 +993,18 @@ static int llu_iop_link_raw(struct pnode *old, struct pnode *new)
         LASSERT(src);
         LASSERT(dir);
 
+        liblustre_wait_event(0);
         llu_prepare_mdc_op_data(&op_data, src, dir, name, namelen, 0);
         rc = mdc_link(llu_i2sbi(src)->ll_mdc_exp, &op_data, &request);
         ptlrpc_req_finished(request);
+        liblustre_wait_event(0);
 
         RETURN(rc);
 }
 
+/*
+ * libsysio will clear the inode immediately after return
+ */
 static int llu_iop_unlink_raw(struct pnode *pno)
 {
         struct inode *dir = pno->p_base->pb_parent->pb_ino;
@@ -965,30 +1019,21 @@ static int llu_iop_unlink_raw(struct pnode *pno)
 
         LASSERT(target);
 
+        liblustre_wait_event(0);
         llu_prepare_mdc_op_data(&op_data, dir, NULL, name, len, 0);
         rc = mdc_unlink(llu_i2sbi(dir)->ll_mdc_exp, &op_data, &request);
-        if (!rc) {
+        if (!rc)
                 rc = llu_objects_destroy(request, dir);
-
-                llu_i2info(target)->lli_stale_flag = 1;
-                unhook_stale_inode(pno);
-        }
-
         ptlrpc_req_finished(request);
+        liblustre_wait_event(0);
+
         RETURN(rc);
 }
 
-/* FIXME
- * following cases need to be considered later:
- * - rename an opened file/dir
- * - an opened file be removed in rename
- * - rename to remove and hardlink (?opened)
- */
 static int llu_iop_rename_raw(struct pnode *old, struct pnode *new)
 {
         struct inode *src = old->p_parent->p_base->pb_ino;
         struct inode *tgt = new->p_parent->p_base->pb_ino;
-        struct inode *tgtinode = new->p_base->pb_ino;
         const char *oldname = old->p_base->pb_name.name;
         int oldnamelen = old->p_base->pb_name.len;
         const char *newname = new->p_base->pb_name.name;
@@ -1007,11 +1052,6 @@ static int llu_iop_rename_raw(struct pnode *old, struct pnode *new)
                         &request);
         if (!rc) {
                 rc = llu_objects_destroy(request, src);
-
-                if (tgtinode) {
-                        llu_i2info(tgtinode)->lli_stale_flag = 1;
-                        unhook_stale_inode(new);
-                }
         }
 
         ptlrpc_req_finished(request);
@@ -1105,6 +1145,8 @@ static int llu_iop_statvfs(struct pnode *pno,
         int rc;
         ENTRY;
 
+        liblustre_wait_event(0);
+
 #ifndef __CYGWIN__
         LASSERT(pno->p_base->pb_ino);
         rc = llu_statfs(llu_i2sbi(pno->p_base->pb_ino), &fs);
@@ -1172,25 +1214,119 @@ static int llu_iop_rmdir_raw(struct pnode *pno)
         rc = mdc_unlink(llu_i2sbi(dir)->ll_mdc_exp, &op_data, &request);
         ptlrpc_req_finished(request);
 
-        /* libsysio: remove the pnode right away */
-        if (!rc) {
-                llu_i2info(pno->p_base->pb_ino)->lli_stale_flag = 1;
-                unhook_stale_inode(pno);
+        RETURN(rc);
+}
+
+#ifdef O_DIRECT
+#define FCNTL_FLMASK (O_APPEND|O_NONBLOCK|O_ASYNC|O_DIRECT)
+#else
+#define FCNTL_FLMASK (O_APPEND|O_NONBLOCK|O_ASYNC)
+#endif
+#define FCNTL_FLMASK_INVALID (O_NONBLOCK|O_ASYNC)
+
+static int llu_iop_fcntl(struct inode *ino, int cmd, va_list ap, int *rtn)
+{
+        struct llu_inode_info *lli = llu_i2info(ino);
+        long flags;
+
+        switch (cmd) {
+        case F_GETFL:
+                *rtn = lli->lli_open_flags;
+                return 0;
+        case F_SETFL:
+                flags = va_arg(ap, long);
+                flags &= FCNTL_FLMASK;
+                if (flags & FCNTL_FLMASK_INVALID) {
+                        CERROR("liblustre don't support O_NONBLOCK, O_ASYNC, "
+                               "and O_DIRECT on file descriptor\n");
+                        *rtn = -1;
+                        return EINVAL;
+                }
+                lli->lli_open_flags = (int) flags;
+                *rtn = 0;
+                return 0;
         }
 
-        RETURN(rc);
+        CERROR("unsupported fcntl cmd %x\n", cmd);
+        *rtn = -1;
+        return ENOSYS;
 }
 
-static int llu_iop_fcntl(struct inode *ino, int cmd, va_list ap)
+static int llu_get_grouplock(struct inode *inode, unsigned long arg)
 {
-        CERROR("liblustre did not support fcntl\n");
-        return -ENOSYS;
+        struct llu_inode_info *lli = llu_i2info(inode);
+        struct ll_file_data *fd = lli->lli_file_data;
+        ldlm_policy_data_t policy = { .l_extent = { .start = 0,
+                                                    .end = OBD_OBJECT_EOF}};
+        struct lustre_handle lockh = { 0 };
+        struct lov_stripe_md *lsm = lli->lli_smd;
+        ldlm_error_t err;
+        int flags = 0;
+        ENTRY;
+
+        if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
+                RETURN(-EINVAL);
+        }
+
+        policy.l_extent.gid = arg;
+        if (lli->lli_open_flags & O_NONBLOCK)
+                flags = LDLM_FL_BLOCK_NOWAIT;
+
+        err = llu_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh,
+                              flags);
+        if (err)
+                RETURN(err);
+
+        fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK;
+        fd->fd_gid = arg;
+        memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh));
+
+        RETURN(0);
 }
 
+static int llu_put_grouplock(struct inode *inode, unsigned long arg)
+{
+        struct llu_inode_info *lli = llu_i2info(inode);
+        struct ll_file_data *fd = lli->lli_file_data;
+        struct lov_stripe_md *lsm = lli->lli_smd;
+        ldlm_error_t err;
+        ENTRY;
+
+        if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED))
+                RETURN(-EINVAL);
+
+        if (fd->fd_gid != arg)
+                RETURN(-EINVAL);
+
+        fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
+
+        err = llu_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh);
+        if (err)
+                RETURN(err);
+
+        fd->fd_gid = 0;
+        memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh));
+
+        RETURN(0);
+}       
+
 static int llu_iop_ioctl(struct inode *ino, unsigned long int request,
                          va_list ap)
 {
-        CERROR("liblustre did not support ioctl\n");
+        unsigned long arg;
+
+        liblustre_wait_event(0);
+
+        switch (request) {
+        case LL_IOC_GROUP_LOCK:
+                arg = va_arg(ap, unsigned long);
+                return llu_get_grouplock(ino, arg);
+        case LL_IOC_GROUP_UNLOCK:
+                arg = va_arg(ap, unsigned long);
+                return llu_put_grouplock(ino, arg);
+        }
+
+        CERROR("did not support ioctl cmd %lx\n", request);
         return -ENOSYS;
 }
 
@@ -1199,11 +1335,13 @@ static int llu_iop_ioctl(struct inode *ino, unsigned long int request,
  */
 static int llu_iop_sync(struct inode *inode)
 {
+        liblustre_wait_event(0);
         return 0;
 }
 
 static int llu_iop_datasync(struct inode *inode)
 {
+        liblustre_wait_event(0);
         return 0;
 }
 
@@ -1220,8 +1358,11 @@ struct inode *llu_iget(struct filesys *fs, struct lustre_md *md)
 
         if ((md->body->valid &
              (OBD_MD_FLGENER | OBD_MD_FLID | OBD_MD_FLTYPE)) !=
-            (OBD_MD_FLGENER | OBD_MD_FLID | OBD_MD_FLTYPE))
-                CERROR("invalide fields!\n");
+            (OBD_MD_FLGENER | OBD_MD_FLID | OBD_MD_FLTYPE)) {
+                CERROR("bad md body valid mask 0x%x\n", md->body->valid);
+                LBUG();
+                return ERR_PTR(-EPERM);
+        }
 
         /* try to find existing inode */
         fid.id = md->body->ino;
@@ -1232,9 +1373,10 @@ struct inode *llu_iget(struct filesys *fs, struct lustre_md *md)
         if (inode) {
                 struct llu_inode_info *lli = llu_i2info(inode);
 
-                if (lli->lli_stale_flag ||
-                    lli->lli_st_generation != md->body->generation)
+                if (inode->i_zombie ||
+                    lli->lli_st_generation != md->body->generation) {
                         I_RELE(inode);
+                }
                 else {
                         llu_update_inode(inode, md->body, md->lsm);
                         return inode;
@@ -1270,9 +1412,10 @@ llu_fsswop_mount(const char *source,
         struct lustre_handle osc_conn = {0, };
         struct lustre_md md;
         class_uuid_t uuid;
+        struct config_llog_instance cfg;
         struct lustre_profile *lprof;
         char *osc = NULL, *mdc = NULL;
-        int err = -EINVAL;
+        int async = 1, err = -EINVAL;
 
         ENTRY;
 
@@ -1285,61 +1428,38 @@ llu_fsswop_mount(const char *source,
         generate_random_uuid(uuid);
         class_uuid_unparse(uuid, &sbi->ll_sb_uuid);
 
-        /* zeroconf */
-        if (g_zconf) {
-                struct config_llog_instance cfg;
-                int len;
-
-                if (!g_zconf_mdsname) {
-                        CERROR("no mds name\n");
-                        GOTO(out_free, err = -EINVAL);
-                }
-
-                /* generate a string unique to this super, let's try
-                 the address of the super itself.*/
-                len = (sizeof(sbi) * 2) + 1; 
-                OBD_ALLOC(sbi->ll_instance, len);
-                if (sbi->ll_instance == NULL) 
-                        GOTO(out_free, err = -ENOMEM);
-                sprintf(sbi->ll_instance, "%p", sbi);
-
-                cfg.cfg_instance = sbi->ll_instance;
-                cfg.cfg_uuid = sbi->ll_sb_uuid;
-                err = liblustre_process_log(&cfg, 1);
-                if (err < 0) {
-                        CERROR("Unable to process log: %s\n", g_zconf_profile);
-
-                        GOTO(out_free, err);
-                }
-
-                lprof = class_get_profile(g_zconf_profile);
-                if (lprof == NULL) {
-                        CERROR("No profile found: %s\n", g_zconf_profile);
-                        GOTO(out_free, err = -EINVAL);
-                }
-                if (osc)
-                        OBD_FREE(osc, strlen(osc) + 1);
-                OBD_ALLOC(osc, strlen(lprof->lp_osc) + 
-                          strlen(sbi->ll_instance) + 2);
-                sprintf(osc, "%s-%s", lprof->lp_osc, sbi->ll_instance);
-
-                if (mdc)
-                        OBD_FREE(mdc, strlen(mdc) + 1);
-                OBD_ALLOC(mdc, strlen(lprof->lp_mdc) + 
-                          strlen(sbi->ll_instance) + 2);
-                sprintf(mdc, "%s-%s", lprof->lp_mdc, sbi->ll_instance);
-        } else {
-                /* setup from dump_file */
-                if (list_empty(&lustre_profile_list)) {
-                        CERROR("no profile\n");
-                        GOTO(out_free, err = -EINVAL);
-                }
+        /* generate a string unique to this super, let's try
+         the address of the super itself.*/
+        OBD_ALLOC(sbi->ll_instance, sizeof(sbi) * 2 + 1);
+        if (sbi->ll_instance == NULL) 
+                GOTO(out_free, err = -ENOMEM);
+        sprintf(sbi->ll_instance, "%p", sbi);
+
+        /* retrive & parse config log */
+        cfg.cfg_instance = sbi->ll_instance;
+        cfg.cfg_uuid = sbi->ll_sb_uuid;
+        err = liblustre_process_log(&cfg, 1);
+        if (err < 0) {
+                CERROR("Unable to process log: %s\n", g_zconf_profile);
+                GOTO(out_free, err);
+        }
 
-                lprof = list_entry(lustre_profile_list.next,
-                                   struct lustre_profile, lp_list);
-                osc = lprof->lp_osc;
-                mdc = lprof->lp_mdc;
+        lprof = class_get_profile(g_zconf_profile);
+        if (lprof == NULL) {
+                CERROR("No profile found: %s\n", g_zconf_profile);
+                GOTO(out_free, err = -EINVAL);
         }
+        if (osc)
+                OBD_FREE(osc, strlen(osc) + 1);
+        OBD_ALLOC(osc, strlen(lprof->lp_osc) + 
+                  strlen(sbi->ll_instance) + 2);
+        sprintf(osc, "%s-%s", lprof->lp_osc, sbi->ll_instance);
+
+        if (mdc)
+                OBD_FREE(mdc, strlen(mdc) + 1);
+        OBD_ALLOC(mdc, strlen(lprof->lp_mdc) + 
+                  strlen(sbi->ll_instance) + 2);
+        sprintf(mdc, "%s-%s", lprof->lp_mdc, sbi->ll_instance);
 
         if (!osc) {
                 CERROR("no osc\n");
@@ -1361,12 +1481,15 @@ llu_fsswop_mount(const char *source,
                 CERROR("MDC %s: not setup or attached\n", mdc);
                 GOTO(out_free, err = -EINVAL);
         }
-
+        obd_set_info(obd->obd_self_export, strlen("async"), "async",
+                     sizeof(async), &async);
+#warning "FIXME ASAP!"
+#if 0
         if (mdc_init_ea_size(obd, osc))
                 GOTO(out_free, err = -EINVAL);
-
+#endif
         /* setup mdc */
-        err = obd_connect(&mdc_conn, obd, &sbi->ll_sb_uuid);
+        err = obd_connect(&mdc_conn, obd, &sbi->ll_sb_uuid, 0);
         if (err) {
                 CERROR("cannot connect to %s: rc = %d\n", mdc, err);
                 GOTO(out_free, err);
@@ -1387,8 +1510,10 @@ llu_fsswop_mount(const char *source,
                 CERROR("OSC %s: not setup or attached\n", osc);
                 GOTO(out_mdc, err = -EINVAL);
         }
+        obd_set_info(obd->obd_self_export, strlen("async"), "async",
+                     sizeof(async), &async);
 
-        err = obd_connect(&osc_conn, obd, &sbi->ll_sb_uuid);
+        err = obd_connect(&osc_conn, obd, &sbi->ll_sb_uuid, 0);
         if (err) {
                 CERROR("cannot connect to %s: rc = %d\n", osc, err);
                 GOTO(out_mdc, err);
@@ -1411,7 +1536,8 @@ llu_fsswop_mount(const char *source,
                 GOTO(out_osc, err);
         }
 
-        err = mdc_req2lustre_md(request, 0, sbi->ll_osc_exp, &md);
+        err = mdc_req2lustre_md(sbi->ll_mdc_exp, request, 0, sbi->ll_osc_exp, 
+                                &md);
         if (err) {
                 CERROR("failed to understand root inode md: rc = %d\n",err);
                 GOTO(out_request, err);
@@ -1420,7 +1546,7 @@ llu_fsswop_mount(const char *source,
         LASSERT(sbi->ll_rootino != 0);
 
         root = llu_iget(fs, &md);
-        if (root == NULL) {
+        if (!root || IS_ERR(root)) {
                 CERROR("fail to generate root inode\n");
                 GOTO(out_request, err = -EBADF);
         }
@@ -1477,8 +1603,9 @@ static struct inode_ops llu_inode_ops = {
         inop_link:      llu_iop_link_raw,
         inop_unlink:    llu_iop_unlink_raw,
         inop_rename:    llu_iop_rename_raw,
-        inop_ipreadv:   llu_iop_ipreadv,
-        inop_ipwritev:  llu_iop_ipwritev,
+        inop_pos:       llu_iop_pos,
+        inop_read:      llu_iop_read,
+        inop_write:     llu_iop_write,
         inop_iodone:    llu_iop_iodone,
         inop_fcntl:     llu_iop_fcntl,
         inop_sync:      llu_iop_sync,
@@ -1490,5 +1617,3 @@ static struct inode_ops llu_inode_ops = {
 #endif
         inop_gone:      llu_iop_gone,
 };
-
-#warning "time_after() defined in liblustre.h need to be rewrite in userspace"