Whamcloud - gitweb
LU-911 osd: zerocopy methods in ldiskfs osd
[fs/lustre-release.git] / lustre / osd-ldiskfs / osd_handler.c
index 18a3a86..1ff54ba 100644 (file)
@@ -93,7 +93,6 @@ static const char remote_obj_dir[] = "REM_OBJ_DIR";
 static const struct lu_object_operations      osd_lu_obj_ops;
 static const struct dt_object_operations      osd_obj_ops;
 static const struct dt_object_operations      osd_obj_ea_ops;
-static const struct dt_body_operations        osd_body_ops_new;
 static const struct dt_index_operations       osd_index_iam_ops;
 static const struct dt_index_operations       osd_index_ea_ops;
 
@@ -182,8 +181,8 @@ static inline void osd_qid_set_type(struct osd_thandle *oh, int i, int type)
         oh->ot_id_type |= ((type == GRPQUOTA) ? (1 << i) : 0);
 }
 
-static void osd_declare_qid(struct dt_object *dt, struct osd_thandle *oh,
-                            int type, uid_t id, struct inode *inode)
+void osd_declare_qid(struct dt_object *dt, struct osd_thandle *oh,
+                     int type, uid_t id, struct inode *inode)
 {
 #ifdef CONFIG_QUOTA
         int i, allocated = 0;
@@ -601,10 +600,14 @@ static struct thandle *osd_trans_create(const struct lu_env *env,
                                         struct dt_device *d)
 {
         struct osd_thread_info *oti = osd_oti_get(env);
+        struct osd_iobuf       *iobuf = &oti->oti_iobuf;
         struct osd_thandle     *oh;
         struct thandle         *th;
         ENTRY;
 
+        /* on pending IO in this thread should left from prev. request */
+        LASSERT(cfs_atomic_read(&iobuf->dr_numreqs) == 0);
+
         th = ERR_PTR(-ENOMEM);
         OBD_ALLOC_GFP(oh, sizeof *oh, CFS_ALLOC_IO);
         if (oh != NULL) {
@@ -716,6 +719,7 @@ static int osd_trans_stop(const struct lu_env *env, struct thandle *th)
         int                     rc = 0;
         struct osd_thandle     *oh;
         struct osd_thread_info *oti = osd_oti_get(env);
+        struct osd_iobuf       *iobuf = &oti->oti_iobuf;
 
         ENTRY;
 
@@ -762,6 +766,19 @@ static int osd_trans_stop(const struct lu_env *env, struct thandle *th)
                 OBD_FREE_PTR(oh);
         }
 
+        /* as we want IO to journal and data IO be concurrent, we don't block
+         * awaiting data IO completion in osd_do_bio(), instead we wait here
+         * once transaction is submitted to the journal. all reqular requests
+         * don't do direct IO (except read/write), thus this wait_event becomes
+         * no-op for them.
+         *
+         * IMPORTANT: we have to wait till any IO submited by the thread is
+         * completed otherwise iobuf may be corrupted by different request
+         */
+        cfs_wait_event(iobuf->dr_wait, cfs_atomic_read(&iobuf->dr_numreqs)==0);
+        if (!rc)
+                rc = iobuf->dr_error;
+
         RETURN(rc);
 }
 
@@ -1476,14 +1493,13 @@ static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
         else
                 parent = osd->od_obj_area;
 
-        LASSERT(parent != NULL);
-        LASSERT(osd_dt_obj(parent)->oo_inode->i_op != NULL);
-
 #ifdef HAVE_QUOTA_SUPPORT
         osd_push_ctxt(info->oti_env, save);
 #endif
         inode = ldiskfs_create_inode(oth->ot_handle,
-                                     osd_dt_obj(parent)->oo_inode, mode);
+                                     parent ?  osd_dt_obj(parent)->oo_inode :
+                                               osd_sb(osd)->s_root->d_inode,
+                                     mode);
 #ifdef HAVE_QUOTA_SUPPORT
         osd_pop_ctxt(save);
 #endif
@@ -2667,277 +2683,6 @@ static const struct dt_object_operations osd_obj_ea_ops = {
         .do_data_get          = osd_data_get,
 };
 
-/*
- * Body operations.
- */
-
-/*
- * XXX: Another layering violation for now.
- *
- * We don't want to use ->f_op->read methods, because generic file write
- *
- *         - serializes on ->i_sem, and
- *
- *         - does a lot of extra work like balance_dirty_pages(),
- *
- * which doesn't work for globally shared files like /last-received.
- */
-static int osd_ldiskfs_readlink(struct inode *inode, char *buffer, int buflen)
-{
-        struct ldiskfs_inode_info *ei = LDISKFS_I(inode);
-
-        memcpy(buffer, (char*)ei->i_data, buflen);
-
-        return  buflen;
-}
-
-static int osd_ldiskfs_read(struct inode *inode, void *buf, int size,
-                            loff_t *offs)
-{
-        struct buffer_head *bh;
-        unsigned long block;
-        int osize = size;
-        int blocksize;
-        int csize;
-        int boffs;
-        int err;
-
-        /* prevent reading after eof */
-        spin_lock(&inode->i_lock);
-        if (i_size_read(inode) < *offs + size) {
-                size = i_size_read(inode) - *offs;
-                spin_unlock(&inode->i_lock);
-                if (size < 0) {
-                        CDEBUG(D_EXT2, "size %llu is too short to read @%llu\n",
-                               i_size_read(inode), *offs);
-                        return -EBADR;
-                } else if (size == 0) {
-                        return 0;
-                }
-        } else {
-                spin_unlock(&inode->i_lock);
-        }
-
-        blocksize = 1 << inode->i_blkbits;
-
-        while (size > 0) {
-                block = *offs >> inode->i_blkbits;
-                boffs = *offs & (blocksize - 1);
-                csize = min(blocksize - boffs, size);
-                bh = ldiskfs_bread(NULL, inode, block, 0, &err);
-                if (!bh) {
-                        CERROR("can't read block: %d\n", err);
-                        return err;
-                }
-
-                memcpy(buf, bh->b_data + boffs, csize);
-                brelse(bh);
-
-                *offs += csize;
-                buf += csize;
-                size -= csize;
-        }
-        return osize;
-}
-
-static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt,
-                        struct lu_buf *buf, loff_t *pos,
-                        struct lustre_capa *capa)
-{
-        struct osd_object      *obj    = osd_dt_obj(dt);
-        struct inode           *inode  = obj->oo_inode;
-        int rc;
-
-        if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
-                RETURN(-EACCES);
-
-        /* Read small symlink from inode body as we need to maintain correct
-         * on-disk symlinks for ldiskfs.
-         */
-        if (S_ISLNK(obj->oo_dt.do_lu.lo_header->loh_attr) &&
-            (buf->lb_len <= sizeof (LDISKFS_I(inode)->i_data)))
-                rc = osd_ldiskfs_readlink(inode, buf->lb_buf, buf->lb_len);
-        else
-                rc = osd_ldiskfs_read(inode, buf->lb_buf, buf->lb_len, pos);
-
-        return rc;
-}
-
-static int osd_ldiskfs_writelink(struct inode *inode, char *buffer, int buflen)
-{
-
-        memcpy((char*)&LDISKFS_I(inode)->i_data, (char *)buffer,
-               buflen);
-        LDISKFS_I(inode)->i_disksize = buflen;
-        i_size_write(inode, buflen);
-        inode->i_sb->s_op->dirty_inode(inode);
-
-        return 0;
-}
-
-static int osd_ldiskfs_write_record(struct inode *inode, void *buf, int bufsize,
-                                    loff_t *offs, handle_t *handle)
-{
-        struct buffer_head *bh = NULL;
-        loff_t offset = *offs;
-        loff_t new_size = i_size_read(inode);
-        unsigned long block;
-        int blocksize = 1 << inode->i_blkbits;
-        int err = 0;
-        int size;
-        int boffs;
-        int dirty_inode = 0;
-
-        while (bufsize > 0) {
-                if (bh != NULL)
-                        brelse(bh);
-
-                block = offset >> inode->i_blkbits;
-                boffs = offset & (blocksize - 1);
-                size = min(blocksize - boffs, bufsize);
-                bh = ldiskfs_bread(handle, inode, block, 1, &err);
-                if (!bh) {
-                        CERROR("can't read/create block: %d\n", err);
-                        break;
-                }
-
-                err = ldiskfs_journal_get_write_access(handle, bh);
-                if (err) {
-                        CERROR("journal_get_write_access() returned error %d\n",
-                               err);
-                        break;
-                }
-                LASSERTF(boffs + size <= bh->b_size,
-                         "boffs %d size %d bh->b_size %lu",
-                         boffs, size, (unsigned long)bh->b_size);
-                memcpy(bh->b_data + boffs, buf, size);
-                err = ldiskfs_journal_dirty_metadata(handle, bh);
-                if (err)
-                        break;
-
-                if (offset + size > new_size)
-                        new_size = offset + size;
-                offset += size;
-                bufsize -= size;
-                buf += size;
-        }
-        if (bh)
-                brelse(bh);
-
-        /* correct in-core and on-disk sizes */
-        if (new_size > i_size_read(inode)) {
-                spin_lock(&inode->i_lock);
-                if (new_size > i_size_read(inode))
-                        i_size_write(inode, new_size);
-                if (i_size_read(inode) > LDISKFS_I(inode)->i_disksize) {
-                        LDISKFS_I(inode)->i_disksize = i_size_read(inode);
-                        dirty_inode = 1;
-                }
-                spin_unlock(&inode->i_lock);
-                if (dirty_inode)
-                        inode->i_sb->s_op->dirty_inode(inode);
-        }
-
-        if (err == 0)
-                *offs = offset;
-        return err;
-}
-
-static ssize_t osd_declare_write(const struct lu_env *env, struct dt_object *dt,
-                                 const loff_t size, loff_t pos,
-                                 struct thandle *handle)
-{
-        struct osd_thandle *oh;
-        int credits;
-
-        LASSERT(handle != NULL);
-
-        oh = container_of0(handle, struct osd_thandle, ot_super);
-        LASSERT(oh->ot_handle == NULL);
-
-        /* XXX: size == 0 or INT_MAX indicating a catalog header update or
-         *      llog write, see comment in mdd_declare_llog_record().
-         *
-         *      This hack should be removed in 2.3
-         */
-        if (size == DECLARE_LLOG_REWRITE)
-                credits = 2;
-        else if (size == DECLARE_LLOG_WRITE)
-                credits = 6;
-        else
-                credits = osd_dto_credits_noquota[DTO_WRITE_BLOCK];
-
-        OSD_DECLARE_OP(oh, write);
-        oh->ot_credits += credits;
-
-        if (osd_dt_obj(dt)->oo_inode == NULL)
-                return 0;
-
-        osd_declare_qid(dt, oh, USRQUOTA, osd_dt_obj(dt)->oo_inode->i_uid,
-                        osd_dt_obj(dt)->oo_inode);
-        osd_declare_qid(dt, oh, GRPQUOTA, osd_dt_obj(dt)->oo_inode->i_gid,
-                        osd_dt_obj(dt)->oo_inode);
-        return 0;
-}
-
-static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
-                         const struct lu_buf *buf, loff_t *pos,
-                         struct thandle *handle, struct lustre_capa *capa,
-                         int ignore_quota)
-{
-        struct osd_object  *obj   = osd_dt_obj(dt);
-        struct inode       *inode = obj->oo_inode;
-        struct osd_thandle *oh;
-        ssize_t            result = 0;
-#ifdef HAVE_QUOTA_SUPPORT
-        cfs_cap_t           save = cfs_curproc_cap_pack();
-#endif
-
-        LASSERT(handle != NULL);
-
-        if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_WRITE))
-                RETURN(-EACCES);
-
-        oh = container_of(handle, struct osd_thandle, ot_super);
-        LASSERT(oh->ot_handle->h_transaction != NULL);
-#ifdef HAVE_QUOTA_SUPPORT
-        if (ignore_quota)
-                cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
-        else
-                cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
-#endif
-        /* Write small symlink to inode body as we need to maintain correct
-         * on-disk symlinks for ldiskfs.
-         */
-        if(S_ISLNK(obj->oo_dt.do_lu.lo_header->loh_attr) &&
-           (buf->lb_len < sizeof (LDISKFS_I(inode)->i_data)))
-                result = osd_ldiskfs_writelink(inode, buf->lb_buf, buf->lb_len);
-        else
-                result = osd_ldiskfs_write_record(inode, buf->lb_buf,
-                                                  buf->lb_len, pos,
-                                                  oh->ot_handle);
-#ifdef HAVE_QUOTA_SUPPORT
-        cfs_curproc_cap_unpack(save);
-#endif
-        if (result == 0)
-                result = buf->lb_len;
-        return result;
-}
-
-/*
- * in some cases we may need declare methods for objects being created
- * e.g., when we create symlink
- */
-static const struct dt_body_operations osd_body_ops_new = {
-        .dbo_declare_write = osd_declare_write,
-};
-
-const struct dt_body_operations osd_body_ops = {
-        .dbo_read          = osd_read,
-        .dbo_declare_write = osd_declare_write,
-        .dbo_write         = osd_write
-};
-
 static int osd_index_declare_iam_delete(const struct lu_env *env,
                                         struct dt_object *dt,
                                         const struct dt_key *key,
@@ -4304,6 +4049,11 @@ static int osd_shutdown(const struct lu_env *env, struct osd_device *o)
         if (o->od_oi_table != NULL)
                 osd_oi_fini(info, &o->od_oi_table, o->od_oi_count);
 
+        if (o->od_fsops) {
+                fsfilt_put_ops(o->od_fsops);
+                o->od_fsops = NULL;
+        }
+
         RETURN(0);
 }
 
@@ -4316,6 +4066,13 @@ static int osd_mount(const struct lu_env *env,
         struct lustre_sb_info    *lsi;
 
         ENTRY;
+
+        o->od_fsops = fsfilt_get_ops(mt_str(LDD_MT_LDISKFS));
+        if (o->od_fsops == NULL) {
+                CERROR("Can't find fsfilt_ldiskfs\n");
+                RETURN(-ENOTSUPP);
+        }
+
         if (o->od_mount != NULL) {
                 CERROR("Already mounted (%s)\n", dev);
                 RETURN(-EEXIST);