Whamcloud - gitweb
- CROW (CReate On Write) (precreation is removed)
[fs/lustre-release.git] / lustre / obdfilter / filter_io.c
index 21d7464..1c9cd4d 100644 (file)
@@ -297,16 +297,10 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
         if (rc)
                 GOTO(cleanup, rc);
 
-        dentry = filter_oa2dentry(obd, oa);
+        dentry = filter_id2dentry(obd, NULL, oa->o_gr, oa->o_id);
         if (IS_ERR(dentry))
                 GOTO(cleanup, rc = PTR_ERR(dentry));
 
-        if (dentry->d_inode == NULL) {
-                CERROR("trying to BRW to non-existent file "LPU64"\n",
-                               obj->ioo_id);
-                GOTO(cleanup, rc = -ENOENT);
-        }
-
         inode = dentry->d_inode; 
 
         fsfilt_check_slow(now, obd_timeout, "preprw_read setup");
@@ -318,13 +312,14 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
                 lnb->len    = rnb->len;
                 lnb->flags  = rnb->flags;
 
-                if (inode->i_size <= rnb->offset)
-                      /* If there's no more data, abort early.
-                      * lnb->page == NULL and lnb->rc == 0, so it's
-                      * easy to detect later. */
+                if ((inode && inode->i_size <= rnb->offset) || inode == NULL)
+                        /*
+                         * if there's no more data, abort early.  lnb->page == *
+                         * NULL and lnb->rc == 0, so it's easy to detect later.
+                         */
                         break;
-                else
-                        rc = filter_alloc_dio_page(obd, inode, lnb);
+                
+                rc = filter_alloc_dio_page(obd, inode, lnb);
                 if (rc) {
                         CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
                              "page err %u@"LPU64" %u/%u %p: rc %d\n",
@@ -345,33 +340,37 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
 
         fsfilt_check_slow(now, obd_timeout, "start_page_read");
 
-        rc = filter_direct_io(OBD_BRW_READ, dentry, iobuf, exp,
-                              NULL, NULL, NULL);
-        if (rc)
-                GOTO(cleanup, rc);
-
-        lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_READ_BYTES, tot_bytes);
+        if (inode != NULL) {
+                rc = filter_direct_io(OBD_BRW_READ, dentry, iobuf,
+                                      exp, NULL, NULL, NULL);
+                if (rc)
+                        GOTO(cleanup, rc);
+        }
 
+        lprocfs_counter_add(obd->obd_stats,
+                            LPROC_FILTER_READ_BYTES, tot_bytes);
         filter_tally_read(&exp->exp_obd->u.filter, res, niocount);
 
         EXIT;
-
 cleanup:
-        if (rc != 0) {
-                filter_free_dio_pages(objcount, obj, niocount, res);
-
-                if (dentry != NULL)
-                        f_dput(dentry);
-                else
-                        CERROR("NULL dentry in cleanup -- tell CFS\n");
+        if (rc) {
+                filter_free_dio_pages(objcount, obj,
+                                      niocount, res);
+                /*
+                 * in other cases (no errors) dentry is released in
+                 * filter_commitrw_read().
+                 */
+                f_dput(dentry);
         }
 
         if (iobuf != NULL)
                 filter_free_iobuf(iobuf);
 
         pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
+
         if (rc)
                 CERROR("io error %d\n", rc);
+        
         return rc;
 }
 
@@ -498,15 +497,17 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
                                struct niobuf_local *res,
                                struct obd_trans_info *oti)
 {
+        int rc = 0, i, tot_bytes = 0, cleanup_phase = 0;
+        struct niobuf_local *lnb = res;
+        struct dentry *dentry = NULL;
+        unsigned long now = jiffies;
         struct lvfs_run_ctxt saved;
         struct niobuf_remote *rnb;
-        struct niobuf_local *lnb = res;
         struct fsfilt_objinfo fso;
-        struct dentry *dentry = NULL;
-        void *iobuf; 
+        struct obd_device *obd;
         obd_size left;
-        unsigned long now = jiffies;
-        int rc = 0, i, tot_bytes = 0, cleanup_phase = 0;
+        void *iobuf; 
+        
         ENTRY;
         LASSERT(objcount == 1);
         LASSERT(obj->ioo_bufcnt > 0);
@@ -518,26 +519,36 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
                 GOTO(cleanup, rc);
         cleanup_phase = 1;
 
-        push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
-        dentry = filter_id2dentry(exp->exp_obd, NULL, obj->ioo_gr,
-                                  obj->ioo_id);
+        obd = exp->exp_obd;
+        push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+
+        /* make sure that object is already allocated */
+        dentry = filter_crow_object(obd, obj->ioo_gr,
+                                    obj->ioo_id);
+
         if (IS_ERR(dentry))
                 GOTO(cleanup, rc = PTR_ERR(dentry));
-        
+
         cleanup_phase = 2;
-        
-        if (dentry->d_inode == NULL) {
-                CERROR("trying to BRW to non-existent file "LPU64"\n",
-                       obj->ioo_id);
-                GOTO(cleanup, rc = -ENOENT);
-        }
 
+        /* 
+         * setting attrs passed along with write requests (owner/group). We
+         * goind it here as object should not exist with wrong owner/group as
+         * this may break quotas. --umka
+         */
+        rc = filter_setattr_internal(exp, dentry, oa, NULL);
+        if (rc) {
+                CERROR("cannot set attrs on write, err %d\n",
+                       rc);
+                GOTO(cleanup, rc);
+        }
+        
         fso.fso_dentry = dentry;
         fso.fso_bufcnt = obj->ioo_bufcnt;
 
         fsfilt_check_slow(now, obd_timeout, "preprw_write setup");
 
-        spin_lock(&exp->exp_obd->obd_osfs_lock);
+        spin_lock(&obd->obd_osfs_lock);
         if (oa)
                 filter_grant_incoming(exp, oa);
         
@@ -554,7 +565,7 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
          * o_valid here. */
         oa->o_valid = 0;
 
-        spin_unlock(&exp->exp_obd->obd_osfs_lock);
+        spin_unlock(&obd->obd_osfs_lock);
 
         if (rc) 
                 GOTO(cleanup, rc);
@@ -569,7 +580,7 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
                 lnb->len    = rnb->len;
                 lnb->flags  = rnb->flags;
 
-                rc = filter_alloc_dio_page(exp->exp_obd, dentry->d_inode,lnb);
+                rc = filter_alloc_dio_page(obd, dentry->d_inode,lnb);
                 if (rc) {
                         CERROR("page err %u@"LPU64" %u/%u %p: rc %d\n",
                                lnb->len, lnb->offset,
@@ -586,8 +597,7 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
                  * asked to read unmapped blocks -- brw_kiovec() does this. */
                 if (lnb->len != PAGE_SIZE) {
                         if (lnb->offset + lnb->len < dentry->d_inode->i_size) {
-                                filter_iobuf_add_page(exp->exp_obd, iobuf,
-                                                      dentry->d_inode,
+                                filter_iobuf_add_page(obd, iobuf, dentry->d_inode,
                                                       lnb->page);
                         } else {
                                 memset(kmap(lnb->page) + lnb->len, 0,
@@ -604,7 +614,7 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
         
         fsfilt_check_slow(now, obd_timeout, "start_page_write");
 
-        lprocfs_counter_add(exp->exp_obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
+        lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
                             tot_bytes);
         EXIT;
 cleanup:
@@ -613,18 +623,18 @@ cleanup:
                 if (rc)
                         filter_free_dio_pages(objcount, obj, niocount, res);
         case 3:
-                pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
+                pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 filter_free_iobuf(iobuf);
         case 2:
-                if (rc)
+                if (rc && dentry && !IS_ERR(dentry))
                         f_dput(dentry);
                 break;
         case 1:
-                spin_lock(&exp->exp_obd->obd_osfs_lock);
+                spin_lock(&obd->obd_osfs_lock);
                 if (oa)
                         filter_grant_incoming(exp, oa);
-                spin_unlock(&exp->exp_obd->obd_osfs_lock);
-                pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
+                spin_unlock(&obd->obd_osfs_lock);
+                pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 filter_free_iobuf(iobuf);
                 break;
         default:;