Whamcloud - gitweb
Land b_hd_capa onto HEAD (20050809_1942)
[fs/lustre-release.git] / lustre / obdfilter / filter_io.c
index 21d7464..41659a4 100644 (file)
@@ -297,16 +297,10 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
         if (rc)
                 GOTO(cleanup, rc);
 
-        dentry = filter_oa2dentry(obd, oa);
+        dentry = filter_id2dentry(obd, NULL, oa->o_gr, oa->o_id);
         if (IS_ERR(dentry))
                 GOTO(cleanup, rc = PTR_ERR(dentry));
 
-        if (dentry->d_inode == NULL) {
-                CERROR("trying to BRW to non-existent file "LPU64"\n",
-                               obj->ioo_id);
-                GOTO(cleanup, rc = -ENOENT);
-        }
-
         inode = dentry->d_inode; 
 
         fsfilt_check_slow(now, obd_timeout, "preprw_read setup");
@@ -318,13 +312,14 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
                 lnb->len    = rnb->len;
                 lnb->flags  = rnb->flags;
 
-                if (inode->i_size <= rnb->offset)
-                      /* If there's no more data, abort early.
-                      * lnb->page == NULL and lnb->rc == 0, so it's
-                      * easy to detect later. */
+                if ((inode && inode->i_size <= rnb->offset) || inode == NULL)
+                        /*
+                         * if there's no more data, abort early.  lnb->page == *
+                         * NULL and lnb->rc == 0, so it's easy to detect later.
+                         */
                         break;
-                else
-                        rc = filter_alloc_dio_page(obd, inode, lnb);
+                
+                rc = filter_alloc_dio_page(obd, inode, lnb);
                 if (rc) {
                         CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
                              "page err %u@"LPU64" %u/%u %p: rc %d\n",
@@ -345,33 +340,37 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
 
         fsfilt_check_slow(now, obd_timeout, "start_page_read");
 
-        rc = filter_direct_io(OBD_BRW_READ, dentry, iobuf, exp,
-                              NULL, NULL, NULL);
-        if (rc)
-                GOTO(cleanup, rc);
-
-        lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_READ_BYTES, tot_bytes);
+        if (inode != NULL) {
+                rc = filter_direct_io(OBD_BRW_READ, dentry, iobuf,
+                                      exp, NULL, NULL, NULL);
+                if (rc)
+                        GOTO(cleanup, rc);
+        }
 
+        lprocfs_counter_add(obd->obd_stats,
+                            LPROC_FILTER_READ_BYTES, tot_bytes);
         filter_tally_read(&exp->exp_obd->u.filter, res, niocount);
 
         EXIT;
-
 cleanup:
-        if (rc != 0) {
-                filter_free_dio_pages(objcount, obj, niocount, res);
-
-                if (dentry != NULL)
-                        f_dput(dentry);
-                else
-                        CERROR("NULL dentry in cleanup -- tell CFS\n");
+        if (rc) {
+                filter_free_dio_pages(objcount, obj,
+                                      niocount, res);
+                /*
+                 * in other cases (no errors) dentry is released in
+                 * filter_commitrw_read().
+                 */
+                f_dput(dentry);
         }
 
         if (iobuf != NULL)
                 filter_free_iobuf(iobuf);
 
         pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
+
         if (rc)
                 CERROR("io error %d\n", rc);
+        
         return rc;
 }
 
@@ -498,15 +497,19 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
                                struct niobuf_local *res,
                                struct obd_trans_info *oti)
 {
+        int rc = 0, i, tot_bytes = 0, cleanup_phase = 0;
+        struct niobuf_local *lnb = res;
+        struct dentry *dentry = NULL;
+        unsigned long now = jiffies;
         struct lvfs_run_ctxt saved;
         struct niobuf_remote *rnb;
-        struct niobuf_local *lnb = res;
         struct fsfilt_objinfo fso;
-        struct dentry *dentry = NULL;
-        void *iobuf; 
+        struct obd_device *obd;
         obd_size left;
-        unsigned long now = jiffies;
-        int rc = 0, i, tot_bytes = 0, cleanup_phase = 0;
+        obd_uid uid;
+        obd_gid gid;
+        void *iobuf; 
+        
         ENTRY;
         LASSERT(objcount == 1);
         LASSERT(obj->ioo_bufcnt > 0);
@@ -518,26 +521,25 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
                 GOTO(cleanup, rc);
         cleanup_phase = 1;
 
-        push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
-        dentry = filter_id2dentry(exp->exp_obd, NULL, obj->ioo_gr,
-                                  obj->ioo_id);
+        obd = exp->exp_obd;
+        push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+
+        uid = oa->o_valid & OBD_MD_FLUID ? oa->o_uid : 0;
+        gid = oa->o_valid & OBD_MD_FLGID ? oa->o_gid : 0;
+        
+        /* make sure that object is already allocated */
+        dentry = filter_crow_object(obd, oa);
         if (IS_ERR(dentry))
                 GOTO(cleanup, rc = PTR_ERR(dentry));
-        
+
         cleanup_phase = 2;
-        
-        if (dentry->d_inode == NULL) {
-                CERROR("trying to BRW to non-existent file "LPU64"\n",
-                       obj->ioo_id);
-                GOTO(cleanup, rc = -ENOENT);
-        }
 
         fso.fso_dentry = dentry;
         fso.fso_bufcnt = obj->ioo_bufcnt;
 
         fsfilt_check_slow(now, obd_timeout, "preprw_write setup");
 
-        spin_lock(&exp->exp_obd->obd_osfs_lock);
+        spin_lock(&obd->obd_osfs_lock);
         if (oa)
                 filter_grant_incoming(exp, oa);
         
@@ -554,7 +556,7 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
          * o_valid here. */
         oa->o_valid = 0;
 
-        spin_unlock(&exp->exp_obd->obd_osfs_lock);
+        spin_unlock(&obd->obd_osfs_lock);
 
         if (rc) 
                 GOTO(cleanup, rc);
@@ -569,7 +571,7 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
                 lnb->len    = rnb->len;
                 lnb->flags  = rnb->flags;
 
-                rc = filter_alloc_dio_page(exp->exp_obd, dentry->d_inode,lnb);
+                rc = filter_alloc_dio_page(obd, dentry->d_inode,lnb);
                 if (rc) {
                         CERROR("page err %u@"LPU64" %u/%u %p: rc %d\n",
                                lnb->len, lnb->offset,
@@ -586,8 +588,7 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
                  * asked to read unmapped blocks -- brw_kiovec() does this. */
                 if (lnb->len != PAGE_SIZE) {
                         if (lnb->offset + lnb->len < dentry->d_inode->i_size) {
-                                filter_iobuf_add_page(exp->exp_obd, iobuf,
-                                                      dentry->d_inode,
+                                filter_iobuf_add_page(obd, iobuf, dentry->d_inode,
                                                       lnb->page);
                         } else {
                                 memset(kmap(lnb->page) + lnb->len, 0,
@@ -604,7 +605,7 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
         
         fsfilt_check_slow(now, obd_timeout, "start_page_write");
 
-        lprocfs_counter_add(exp->exp_obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
+        lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
                             tot_bytes);
         EXIT;
 cleanup:
@@ -613,18 +614,18 @@ cleanup:
                 if (rc)
                         filter_free_dio_pages(objcount, obj, niocount, res);
         case 3:
-                pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
+                pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 filter_free_iobuf(iobuf);
         case 2:
-                if (rc)
+                if (rc && dentry && !IS_ERR(dentry))
                         f_dput(dentry);
                 break;
         case 1:
-                spin_lock(&exp->exp_obd->obd_osfs_lock);
+                spin_lock(&obd->obd_osfs_lock);
                 if (oa)
                         filter_grant_incoming(exp, oa);
-                spin_unlock(&exp->exp_obd->obd_osfs_lock);
-                pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
+                spin_unlock(&obd->obd_osfs_lock);
+                pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 filter_free_iobuf(iobuf);
                 break;
         default:;
@@ -636,8 +637,14 @@ cleanup:
 int filter_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
                   int objcount, struct obd_ioobj *obj, int niocount,
                   struct niobuf_remote *nb, struct niobuf_local *res,
-                  struct obd_trans_info *oti)
+                  struct obd_trans_info *oti, struct lustre_capa *capa)
 {
+        int rc;
+
+        rc = filter_verify_capa(cmd, exp, capa);
+        if (rc)
+                return rc;
+
         if (cmd == OBD_BRW_WRITE)
                 return filter_preprw_write(cmd, exp, oa, objcount, obj,
                                            niocount, nb, res, oti);
@@ -882,16 +889,29 @@ cleanup:
 
 int filter_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
                     int objcount, struct obd_ioobj *obj, int niocount,
-                    struct niobuf_local *res, struct obd_trans_info *oti,int rc)
+                    struct niobuf_local *res, struct obd_trans_info *oti,int ret)
 {
-        if (cmd == OBD_BRW_WRITE)
-                return filter_commitrw_write(exp, oa, objcount, obj, niocount,
-                                             res, oti, rc);
-        if (cmd == OBD_BRW_READ)
-                return filter_commitrw_read(exp, oa, objcount, obj, niocount,
-                                            res, oti, rc);
-        LBUG();
-        return -EPROTO;
+        int rc = -EPROTO;
+        struct lustre_id *id = obdo_id(oa);
+        __u32 len = sizeof(*id);
+        struct inode * inode = res->dentry->d_inode;
+        struct super_block * sb = res->dentry->d_sb;
+        struct obd_device *obd = class_exp2obd(exp);
+        
+        if (cmd == OBD_BRW_WRITE) {
+                rc = filter_commitrw_write(exp, oa, objcount, obj, niocount,
+                                           res, oti, ret);
+                fsfilt_set_info(obd, sb, inode, 10, "file_write", len, (void*)id);
+        }
+        else if (cmd == OBD_BRW_READ) {
+                rc = filter_commitrw_read(exp, oa, objcount, obj, niocount,
+                                          res, oti, ret);
+                fsfilt_set_info(obd, sb, inode, 9, "file_read", len, (void*)id);
+        }
+        else
+                LBUG();
+
+        return rc;
 }
 
 int filter_brw(int cmd, struct obd_export *exp, struct obdo *oa,
@@ -919,7 +939,7 @@ int filter_brw(int cmd, struct obd_export *exp, struct obdo *oa,
         obdo_to_ioobj(oa, &ioo);
         ioo.ioo_bufcnt = oa_bufs;
 
-        ret = filter_preprw(cmd, exp, oa, 1, &ioo, oa_bufs, rnb, lnb, oti);
+        ret = filter_preprw(cmd, exp, oa, 1, &ioo, oa_bufs, rnb, lnb, oti,NULL);
         if (ret != 0)
                 GOTO(out, ret);