Whamcloud - gitweb
b=6817
authoralex <alex>
Sun, 10 Jul 2005 19:52:22 +0000 (19:52 +0000)
committeralex <alex>
Sun, 10 Jul 2005 19:52:22 +0000 (19:52 +0000)
 - b_size_on_mds landed (prototype for CMD2 w/o recovery support on OST)

13 files changed:
lustre/include/linux/lustre_compat25.h
lustre/include/linux/lustre_lite.h
lustre/kernel_patches/series/2.6-fc3.series
lustre/llite/file.c
lustre/llite/namei.c
lustre/mdc/mdc_lib.c
lustre/mds/handler.c
lustre/mds/mds_internal.h
lustre/mds/mds_open.c
lustre/obdfilter/filter.c
lustre/obdfilter/filter_internal.h
lustre/obdfilter/filter_io_26.c
lustre/obdfilter/filter_log.c

index a71a848..0deeaea 100644 (file)
@@ -106,6 +106,8 @@ static inline int cleanup_group_info(void)
 
 #if LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0)
 
+#define LUSTRE_FILTERDATA(inode)        ((inode)->i_pipe)
+
 /* New (actually old) intent naming */
 #define lookup_intent open_intent
 
@@ -190,6 +192,8 @@ static inline void lustre_daemonize_helper(void)
 
 #else /* 2.4.. */
 
+#error "first, find storage for filterdata -bzzz"
+
 #define ll_vfs_create(a,b,c,d)              vfs_create(a,b,c)
 #define ll_permission(inode,mask,nd)        permission(inode,mask)
 #define ILOOKUP(sb, ino, test, data)        ilookup4(sb, ino, test, data);
index cb6753f..afa7475 100644 (file)
@@ -124,6 +124,9 @@ static inline struct ll_inode_info *ll_i2info(struct inode *inode)
 #endif
 }
 
+#define LLI_HAVE_FLSIZE(inode)  \
+        test_bit(LLI_F_HAVE_MDS_SIZE_LOCK, &ll_i2info(inode)->lli_flags)
+
 /* lprocfs.c */
 enum {
          LPROC_LL_DIRTY_HITS = 0,
index 4ba55c9..4a6afe0 100644 (file)
@@ -28,4 +28,5 @@ linux-2.6.10-fc3-lkcd.patch
 vfs-intent_release_umount-vanilla-2.6.10-fc3.patch
 vfs-umount_lustre-vanilla-2.6.10-fc3.patch
 export-show_task-2.6-vanilla.patch
+deadlock-monitor-2.6.10-fc3.patch
 highmem-split-2.6.10-fc3.patch
index bed62c2..8ac42b3 100644 (file)
 #include "llite_internal.h"
 #include <linux/obd_lov.h>
 
+__u64 lov_merge_size(struct lov_stripe_md *lsm, int kms);
+__u64 lov_merge_blocks(struct lov_stripe_md *lsm);
+__u64 lov_merge_mtime(struct lov_stripe_md *lsm, __u64 current_time);
+
+int ll_validate_size(struct inode *inode, __u64 *size, __u64 *blocks)
+{
+        ldlm_policy_data_t extent = { .l_extent = { 0, OBD_OBJECT_EOF } };
+        struct obd_export *exp = ll_i2sbi(inode)->ll_dt_exp;
+        struct ll_inode_info *lli = ll_i2info(inode);
+        struct lustre_handle match_lockh = {0};
+        int rc, flags;
+        ENTRY;
+
+        if (lli->lli_smd == NULL)
+                RETURN(0);
+
+        LASSERT(size != NULL && blocks != NULL);
+
+        flags = LDLM_FL_TEST_LOCK | LDLM_FL_CBPENDING | LDLM_FL_BLOCK_GRANTED;
+        rc = obd_match(exp, lli->lli_smd, LDLM_EXTENT, &extent,
+                       LCK_PR | LCK_PW, &flags, inode, &match_lockh);
+        if (rc == 0) {
+                /* we have no all needed locks,
+                 * so we don't know actual size */
+                GOTO(finish, rc);
+        }
+
+        /* we know actual size! */
+        down(&lli->lli_size_sem);
+        *size = lov_merge_size(lli->lli_smd, 0);
+        *blocks = lov_merge_blocks(lli->lli_smd);
+        up(&lli->lli_size_sem);
+
+finish:
+        RETURN(rc);
+}
+
 int ll_md_och_close(struct obd_export *md_exp, struct inode *inode,
                     struct obd_client_handle *och)
 {
@@ -67,7 +104,6 @@ int ll_md_och_close(struct obd_export *md_exp, struct inode *inode,
         obdo->o_id = inode->i_ino;
         obdo->o_valid = OBD_MD_FLID;
         obdo_from_inode(obdo, inode, (OBD_MD_FLTYPE | OBD_MD_FLMODE |
-                                      OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
                                       OBD_MD_FLATIME | OBD_MD_FLMTIME |
                                       OBD_MD_FLCTIME));
         if (0 /* ll_is_inode_dirty(inode) */) {
@@ -76,6 +112,14 @@ int ll_md_och_close(struct obd_export *md_exp, struct inode *inode,
         }
         obdo->o_fid = id_fid(&ll_i2info(inode)->lli_id);
         obdo->o_mds = id_group(&ll_i2info(inode)->lli_id);
+
+
+        obdo->o_valid |= OBD_MD_FLEPOCH;
+        obdo->o_easize = ll_i2info(inode)->lli_io_epoch;
+
+        if (ll_validate_size(inode, &obdo->o_size, &obdo->o_blocks))
+                obdo->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
+
         rc = md_close(md_exp, obdo, och, &req);
         obdo_free(obdo);
 
@@ -892,10 +936,6 @@ static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
         return rc;
 }
 
-__u64 lov_merge_size(struct lov_stripe_md *lsm, int kms);
-__u64 lov_merge_blocks(struct lov_stripe_md *lsm);
-__u64 lov_merge_mtime(struct lov_stripe_md *lsm, __u64 current_time);
-
 /* NB: lov_merge_size will prefer locally cached writes if they extend the
  * file (because it prefers KMS over RSS when larger) */
 int ll_glimpse_size(struct inode *inode)
@@ -1701,15 +1741,15 @@ int ll_inode_revalidate_it(struct dentry *dentry)
 
         ll_lookup_finish_locks(&oit, dentry);
 
-        lsm = lli->lli_smd;
-        if (lsm == NULL) /* object not yet allocated, don't validate size */
-                GOTO(out, rc = 0);
-
-        /*
-         * ll_glimpse_size() will prefer locally cached writes if they extend
-         * the file.
-         */
-        rc = ll_glimpse_size(inode);
+        if (!LLI_HAVE_FLSIZE(inode)) {
+                /* if object not yet allocated, don't validate size */
+                lsm = lli->lli_smd;
+                if (lsm != NULL) {
+                        /* ll_glimpse_size() will prefer locally cached
+                         * writes if they extend the file */
+                        rc = ll_glimpse_size(inode);
+                }
+        }
         EXIT;
 out:
         ll_intent_release(&oit);
index c329318..a0c8916 100644 (file)
@@ -198,9 +198,13 @@ int ll_mdc_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                         ll_md_real_close(ll_i2mdexp(inode), inode, flags);
                 }
 
-                if (bits & MDS_INODELOCK_UPDATE)
+                if ((bits & MDS_INODELOCK_UPDATE) && LLI_HAVE_FLSIZE(inode)) {
+                        CDEBUG(D_OTHER, "isize for %lu/%u(%p) from mds "
+                               "is not actual\n", inode->i_ino,
+                               inode->i_generation, inode);
                         clear_bit(LLI_F_HAVE_MDS_SIZE_LOCK,
                                   &(ll_i2info(inode)->lli_flags));
+                }
 
 
                 /* If lookup lock is cancelled, we just drop the dentry and
@@ -314,10 +318,13 @@ static int lookup_it_finish(struct ptlrpc_request *request, int offset,
                         /* bug 2334: drop MDS lock before acquiring OST lock */
                         ll_intent_drop_lock(it);
 
-                        rc = ll_glimpse_size(inode);
-                        if (rc) {
-                                iput(inode);
-                                RETURN(rc);
+                        if (!LLI_HAVE_FLSIZE(inode)) {
+                                CDEBUG(D_INODE, "retrieve size from OSS\n");
+                                rc = ll_glimpse_size(inode);
+                                if (rc) {
+                                        iput(inode);
+                                        RETURN(rc);
+                                }
                         }
                 }
 
index 895b2a5..5a9e601 100644 (file)
@@ -151,6 +151,10 @@ void mdc_close_pack(struct ptlrpc_request *req, int offset, struct obdo *oa,
                 body->flags = oa->o_flags;
                 body->valid |= OBD_MD_FLFLAGS;
         }
+        if (oa->o_valid & OBD_MD_FLEPOCH) {
+                body->io_epoch = oa->o_easize;
+                body->valid |= OBD_MD_FLEPOCH;
+        }
 }
 
 /* 
index 49c5db0..54bdddb 100644 (file)
@@ -1306,6 +1306,36 @@ int mds_check_mds_num(struct obd_device *obd, struct inode *inode,
         RETURN(rc);
 }
 
+int mds_getattr_size(struct obd_device *obd, struct dentry *dentry,
+                     struct ptlrpc_request *req, struct mds_body *body)
+{
+        struct inode *inode = dentry->d_inode;
+        ENTRY;
+
+        LASSERT(body != NULL);
+
+        if (dentry->d_inode == NULL || !S_ISREG(inode->i_mode))
+                RETURN(0);
+        
+        if (obd->obd_recovering) {
+                CDEBUG(D_ERROR, "size for "DLID4" is unknown yet (recovering)\n",
+                       OLID4(&body->id1));
+                RETURN(0);
+        }
+
+        if (atomic_read(&inode->i_writecount)) {
+                /* some one has opened the file for write.
+                 * mds doesn't know actual size */
+                CDEBUG(D_OTHER, "MDS doesn't know actual size for "DLID4"\n",
+                       OLID4(&body->id1));
+                RETURN(0);
+        }
+        CDEBUG(D_OTHER, "MDS returns "LPD64"/"LPD64" for"DLID4"\n",
+               body->size, body->blocks, OLID4(&body->id1));
+        body->valid |= OBD_MD_FLSIZE;
+        RETURN(0);
+}
+
 static int mds_getattr_lock(struct ptlrpc_request *req, int offset,
                             struct lustre_handle *child_lockh, int child_part)
 {
@@ -1499,8 +1529,16 @@ static int mds_getattr_lock(struct ptlrpc_request *req, int offset,
                 }
         }
 
-        rc = mds_getattr_internal(obd, dchild, req, offset, body, reply_offset);        
-        GOTO(cleanup, rc); /* returns the lock to the client */
+        rc = mds_getattr_internal(obd, dchild, req, offset, body, reply_offset);
+        if (rc)
+                GOTO(cleanup, rc); /* returns the lock to the client */
+
+        /* probably MDS knows actual size? */
+        body = lustre_msg_buf(req->rq_repmsg, reply_offset, sizeof(*body));
+        LASSERT(body != NULL);
+        mds_getattr_size(obd, dchild, req, body);
+
+        GOTO(cleanup, rc);
 
  cleanup:
         switch (cleanup_phase) {
index 796b70a..167f674 100644 (file)
@@ -15,7 +15,7 @@ struct mds_filter_data {
         __u64 io_epoch;
 };
 
-#define MDS_FILTERDATA(inode) ((struct mds_filter_data *)(inode)->i_filterdata)
+#define MDS_FILTERDATA(inode) ((struct mds_filter_data *)LUSTRE_FILTERDATA(inode))
 #define DENTRY_VALID(dentry)    \
         ((dentry)->d_inode || ((dentry)->d_flags & DCACHE_CROSS_REF))
 
@@ -183,6 +183,8 @@ int mds_obd_destroy(struct obd_export *exp, struct obdo *oa,
                     struct lov_stripe_md *ea, struct obd_trans_info *oti);
 
 /* mds/handler.c */
+int mds_getattr_size(struct obd_device *obd, struct dentry *dentry,
+                     struct ptlrpc_request *req, struct mds_body *body);
 int mds_squash_root(struct mds_obd *mds, struct mds_req_sec_desc *rsd,
                     ptl_nid_t *peernid);
 int mds_handle(struct ptlrpc_request *req);
index 7dd38da..8072ef3 100644 (file)
@@ -111,13 +111,12 @@ static void mds_mfd_destroy(struct mds_file_data *mfd)
         mds_mfd_put(mfd);
 }
 
-#ifdef IFILTERDATA_ACTUALLY_USED
 /* Caller must hold mds->mds_epoch_sem */
 static int mds_alloc_filterdata(struct inode *inode)
 {
-        LASSERT(inode->i_filterdata == NULL);
-        OBD_ALLOC(inode->i_filterdata, sizeof(struct mds_filter_data));
-        if (inode->i_filterdata == NULL)
+        LASSERT(LUSTRE_FILTERDATA(inode) == NULL);
+        OBD_ALLOC(LUSTRE_FILTERDATA(inode), sizeof(struct mds_filter_data));
+        if (LUSTRE_FILTERDATA(inode) == NULL)
                 return -ENOMEM;
         LASSERT(igrab(inode) == inode);
         return 0;
@@ -126,12 +125,11 @@ static int mds_alloc_filterdata(struct inode *inode)
 /* Caller must hold mds->mds_epoch_sem */
 static void mds_free_filterdata(struct inode *inode)
 {
-        LASSERT(inode->i_filterdata != NULL);
-        OBD_FREE(inode->i_filterdata, sizeof(struct mds_filter_data));
-        inode->i_filterdata = NULL;
+        LASSERT(LUSTRE_FILTERDATA(inode) != NULL);
+        OBD_FREE(LUSTRE_FILTERDATA(inode), sizeof(struct mds_filter_data));
+        LUSTRE_FILTERDATA(inode) = NULL;
         iput(inode);
 }
-#endif /*IFILTERDATA_ACTUALLY_USED*/
 
 /* Write access to a file: executors cause a negative count,
  * writers a positive count.  The semaphore is needed to perform
@@ -156,7 +154,6 @@ static int mds_get_write_access(struct mds_obd *mds, struct inode *inode,
                 RETURN(-ETXTBSY);
         }
 
-#ifdef IFILTERDATA_ACTUALLY_USED
         if (MDS_FILTERDATA(inode) && MDS_FILTERDATA(inode)->io_epoch != 0) {
                 CDEBUG(D_INODE, "continuing MDS epoch "LPU64" for ino %lu/%u\n",
                        MDS_FILTERDATA(inode)->io_epoch, inode->i_ino,
@@ -164,23 +161,20 @@ static int mds_get_write_access(struct mds_obd *mds, struct inode *inode,
                 goto out;
         }
 
-        if (inode->i_filterdata == NULL)
+        if (MDS_FILTERDATA(inode) == NULL)
                 mds_alloc_filterdata(inode);
-        if (inode->i_filterdata == NULL) {
+        if (MDS_FILTERDATA(inode) == NULL) {
                 rc = -ENOMEM;
                 goto out;
         }
-#endif /*IFILTERDATA_ACTUALLY_USED*/
         if (epoch > mds->mds_io_epoch)
                 mds->mds_io_epoch = epoch;
         else
                 mds->mds_io_epoch++;
-#ifdef IFILTERDATA_ACTUALLY_USED
         MDS_FILTERDATA(inode)->io_epoch = mds->mds_io_epoch;
         CDEBUG(D_INODE, "starting MDS epoch "LPU64" for ino %lu/%u\n",
                mds->mds_io_epoch, inode->i_ino, inode->i_generation);
  out:
-#endif /*IFILTERDATA_ACTUALLY_USED*/
         if (rc == 0)
                 atomic_inc(&inode->i_writecount);
         up(&mds->mds_epoch_sem);
@@ -205,9 +199,7 @@ static int mds_put_write_access(struct mds_obd *mds, struct inode *inode,
         if (!unlinking && !(body->valid & OBD_MD_FLSIZE))
                 GOTO(out, rc = EAGAIN);
 #endif
-#ifdef IFILTERDATA_ACTUALLY_USED
         mds_free_filterdata(inode);
-#endif
  out:
         up(&mds->mds_epoch_sem);
         return rc;
@@ -263,9 +255,7 @@ static struct mds_file_data *mds_dentry_open(struct dentry *dentry,
                 rc = mds_get_write_access(mds, dentry->d_inode, 0);
                 if (rc)
                         GOTO(cleanup_mfd, rc);
-#ifdef IFILTERDATA_ACTUALLY_USED
                 body->io_epoch = MDS_FILTERDATA(dentry->d_inode)->io_epoch;
-#endif
         } else if (flags & FMODE_EXEC) {
                 rc = mds_deny_write_access(mds, dentry->d_inode);
                 if (rc)
@@ -274,6 +264,8 @@ static struct mds_file_data *mds_dentry_open(struct dentry *dentry,
 
         dget(dentry);
 
+        /* FIXME: invalidate update locks when first open for write comes in */
+
         /* mark the file as open to handle open-unlink. */
         DOWN_WRITE_I_ALLOC_SEM(dentry->d_inode);
         mds_orphan_open_inc(dentry->d_inode);
@@ -1346,6 +1338,34 @@ got_child:
         /* Step 5: mds_open it */
         rc = mds_finish_open(req, dchild, body, rec->ur_flags, &handle,
                              rec, rep);
+        if (rc)
+                GOTO(cleanup, rc);
+
+        /* if this is a writer, we have to invalidate client's
+         * update locks in order to make sure they don't use
+         * isize/iblocks from mds anymore.
+         * FIXME: can cause a deadlock, use mds_get_parent_child_locked()
+         * XXX: optimization is to do this for first writer only */
+        if (accmode(rec->ur_flags) & MAY_WRITE) {
+                struct ldlm_res_id child_res_id = { .name = {0}};
+                ldlm_policy_data_t sz_policy;
+                struct lustre_handle sz_lockh;
+                int lock_flags = 0;
+
+                child_res_id.name[0] = id_fid(&body->id1);
+                child_res_id.name[1] = id_group(&body->id1);
+                sz_policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
+
+                rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
+                                      child_res_id, LDLM_IBITS, &sz_policy,
+                                      LCK_PW, &lock_flags, mds_blocking_ast,
+                                      ldlm_completion_ast, NULL, NULL, NULL,
+                                      0, NULL, &sz_lockh);
+                if (rc == ELDLM_OK)
+                        ldlm_lock_decref(&sz_lockh, LCK_PW);
+                else
+                        CERROR("can't invalidate client's update locks\n");
+        }
 
        EXIT;
 cleanup:
@@ -1529,6 +1549,17 @@ int mds_mfd_close(struct ptlrpc_request *req, int offset,
                }
 
                 goto out; /* Don't bother updating attrs on unlinked inode */
+        } else if ((mfd->mfd_mode & FMODE_WRITE) && rc == 0 && request_body) {
+                /* last writer closed file - let's update i_size/i_blocks */
+                if (request_body->valid & OBD_MD_FLSIZE) {
+                        LASSERT(request_body->valid & OBD_MD_FLBLOCKS);
+                        CDEBUG(D_OTHER, "update size "LPD64" for "DLID4
+                               ", epoch "LPD64"\n", inode->i_size,
+                               OLID4(&request_body->id1),
+                               request_body->io_epoch);
+                        iattr.ia_size = inode->i_size;
+                        iattr.ia_valid |= ATTR_SIZE;
+                }
         }
 
 #if 0
@@ -1577,7 +1608,8 @@ int mds_mfd_close(struct ptlrpc_request *req, int offset,
         }
 
         if (iattr.ia_valid != 0) {
-                handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
+                if (handle == NULL)
+                        handle = fsfilt_start(obd,inode,FSFILT_OP_SETATTR,NULL);
                 if (IS_ERR(handle))
                         GOTO(cleanup, rc = PTR_ERR(handle));
                 rc = fsfilt_setattr(obd, mfd->mfd_dentry, handle, &iattr, 0);
@@ -1621,6 +1653,142 @@ out:
         RETURN(rc);
 }
 
+static int mds_extent_lock_callback(struct ldlm_lock *lock,
+                                    struct ldlm_lock_desc *new, void *data,
+                                    int flag)
+{
+        struct lustre_handle lockh = { 0 };
+        int rc;
+        ENTRY;
+
+        switch (flag) {
+        case LDLM_CB_BLOCKING:
+                ldlm_lock2handle(lock, &lockh);
+                rc = ldlm_cli_cancel(&lockh);
+                if (rc != ELDLM_OK)
+                        CERROR("ldlm_cli_cancel failed: %d\n", rc);
+                break;
+        case LDLM_CB_CANCELING: {
+                break;
+        }
+        default:
+                LBUG();
+        }
+
+        RETURN(0);
+}
+__u64 lov_merge_size(struct lov_stripe_md *lsm, int kms);
+__u64 lov_merge_blocks(struct lov_stripe_md *lsm);
+
+int mds_validate_size(struct obd_device *obd, struct mds_body *body,
+                      struct mds_file_data *mfd)
+{
+        ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
+        struct inode *inode = mfd->mfd_dentry->d_inode;
+        struct lustre_handle lockh = { 0 };
+        struct lov_stripe_md *lsm = NULL;
+        int rc, len, flags;
+        void *lmm = NULL;
+        ENTRY;
+
+        /* we update i_size/i_blocks only for regular files */
+        if (!S_ISREG(inode->i_mode))
+                RETURN(0);
+
+        /* we update i_size/i_blocks only for writers */
+        if (!(mfd->mfd_mode & FMODE_WRITE))
+                RETURN(0);
+
+        /* we like when client reports actual i_size/i_blocks himself */
+        if (body->valid & OBD_MD_FLSIZE) {
+                LASSERT(body->valid & OBD_MD_FLBLOCKS);
+                CDEBUG(D_OTHER, "client reports "LPD64"/"LPD64" for "DLID4"\n",
+                       body->size, body->blocks, OLID4(&body->id1));
+                RETURN(0);
+        }
+
+        /* we shouldn't fetch size from OSTes during recovery - deadlock */
+        if (obd->obd_recovering)
+                RETURN(0);
+
+        DOWN_READ_I_ALLOC_SEM(inode);
+        if (atomic_read(&inode->i_writecount) > 1 
+                        || mds_inode_is_orphan(inode)) {
+                /* there is no need to update i_size/i_blocks on orphans.
+                 * also, if this is not last writer, then it doesn't make
+                 * sense to fetch i_size/i_blocks from OSSes */
+                UP_READ_I_ALLOC_SEM(inode);
+                RETURN(0);
+        }
+        UP_READ_I_ALLOC_SEM(inode);
+
+        /* 1: client didn't send actual i_size/i_blocks
+         * 2: we seem to be last writer
+         * 3: the file doesn't look like to be disappeared
+         * conclusion: we're gonna fetch them from OSSes */
+
+        down(&inode->i_sem);
+        len = fsfilt_get_md(obd, inode, NULL, 0, EA_LOV);
+        up(&inode->i_sem);
+        
+        if (len < 0) {
+                CERROR("error getting inode %lu MD: %d\n", inode->i_ino, len);
+                GOTO(cleanup, rc = len);
+        } else if (len == 0) {
+                CDEBUG(D_INODE, "no LOV in inode %lu\n", inode->i_ino);
+                GOTO(cleanup, rc = 0);
+        }
+
+        OBD_ALLOC(lmm, len);
+        if (lmm == NULL) {
+                CERROR("can't allocate memory\n");
+                GOTO(cleanup, rc = -ENOMEM);
+        }
+
+        down(&inode->i_sem);
+        rc = fsfilt_get_md(obd, inode, lmm, len, EA_LOV);
+        up(&inode->i_sem);
+        
+        if (rc < 0) {
+                CERROR("error getting inode %lu MD: %d\n", inode->i_ino, rc);
+                GOTO(cleanup, rc);
+        }
+
+        rc = obd_unpackmd(obd->u.mds.mds_dt_exp, &lsm, lmm, len);
+        if (rc < 0) {
+                CERROR("error getting inode %lu MD: %d\n", inode->i_ino, rc);
+                GOTO(cleanup, rc);
+        }
+        
+        CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", inode->i_ino);
+        
+        flags = LDLM_FL_HAS_INTENT;
+        rc = obd_enqueue(obd->u.mds.mds_dt_exp, lsm, LDLM_EXTENT, &policy,
+                         LCK_PR, &flags, mds_extent_lock_callback,
+                         ldlm_completion_ast, NULL, NULL,
+                         sizeof(struct ost_lvb), lustre_swab_ost_lvb, &lockh);
+        if (rc != 0) {
+                CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
+                GOTO(cleanup, rc);
+        }
+
+        body->size = lov_merge_size(lsm, 0);
+        body->blocks = lov_merge_blocks(lsm);
+        body->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
+
+        CDEBUG(D_OTHER, "LOV reports "LPD64"/"LPD64" for "DLID4"\n",
+                        body->size, body->blocks, OLID4(&body->id1));
+
+        obd_cancel(obd->u.mds.mds_dt_exp, lsm, LCK_PR, &lockh);
+        
+cleanup:
+        if (lsm != NULL)
+                obd_free_memmd(obd->u.mds.mds_dt_exp, &lsm);
+        if (lmm != NULL)
+                OBD_FREE(lmm, len);
+        RETURN(rc);
+}
+
 int mds_close(struct ptlrpc_request *req, int offset)
 {
         struct mds_export_data *med = &req->rq_export->exp_mds_data;
@@ -1669,7 +1837,21 @@ int mds_close(struct ptlrpc_request *req, int offset)
                 RETURN(-ESTALE);
         }
 
+        rc = mds_validate_size(obd, body, mfd);
+        LASSERT(rc == 0);
+
         inode = mfd->mfd_dentry->d_inode;
+
+        if (mfd->mfd_mode & FMODE_WRITE) {
+                /* we set i_size/i_blocks here, nobody will see
+                 * them until all write references are dropped.
+                 * btw, we hold one reference */
+                if (body->valid & OBD_MD_FLSIZE)
+                        i_size_write(inode, body->size);
+                if (body->valid & OBD_MD_FLBLOCKS)
+                        inode->i_blocks = body->blocks;
+        }
+
         /* child i_alloc_sem protects orphan_dec_test && is_orphan race */
         DOWN_WRITE_I_ALLOC_SEM(inode); /* mds_mfd_close drops this */
         if (mds_inode_is_orphan(inode) && mds_orphan_open_count(inode) == 1) {
index 8ec9afe..77123c8 100644 (file)
@@ -2997,8 +2997,13 @@ static int filter_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
         ctxt = llog_get_context(llogs, LLOG_UNLINK_REPL_CTXT);
         ctxt->llog_proc_cb = filter_recov_log_unlink_cb;
 
-        /* FIXME - count should be 1 to setup size log */
-        rc = obd_llog_setup(obd, llogs, LLOG_SIZE_ORIG_CTXT, tgt, 0, 
+        filter_size_orig_logops = llog_lvfs_ops;
+#if 0
+        filter_size_orig_logops.lop_setup = llog_obd_origin_setup;
+        filter_size_orig_logops.lop_cleanup = llog_catalog_cleanup;
+        filter_size_orig_logops.lop_add = llog_catalog_add;
+#endif
+        rc = obd_llog_setup(obd, llogs, LLOG_SIZE_ORIG_CTXT, tgt, 1, 
                             &catid->lci_logid, &filter_size_orig_logops);
         RETURN(rc);
 }
index 9874000..c9545d9 100644 (file)
@@ -171,10 +171,11 @@ struct ost_filterdata {
         __u32  ofd_epoch;
 };
 
-int filter_log_sz_change(struct llog_handle *cathandle,
+int filter_log_sz_change(struct obd_device *obd,
                          struct lustre_id *id, __u32 io_epoch,
-                         struct llog_cookie *logcookie,
+                         struct llog_cookie *logcookie, 
                          struct inode *inode);
+
 //int filter_get_catalog(struct obd_device *);
 void filter_cancel_cookies_cb(struct obd_device *obd, __u64 transno,
                               void *cb_data, int error);
@@ -203,5 +204,4 @@ static inline void  filter_tally_read(struct filter_obd *filter,
 static inline lproc_filter_attach_seqstat(struct obd_device *dev) {}
 #endif
 
-
 #endif
index 5fc41e3..9ced565 100644 (file)
@@ -444,6 +444,7 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa,
         int i, err, cleanup_phase = 0;
         struct obd_device *obd = exp->exp_obd;
         int   total_size = 0;
+        loff_t old_size;
         ENTRY;
 
         LASSERT(oti != NULL);
@@ -496,6 +497,7 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa,
         cleanup_phase = 2;
 
         down(&inode->i_sem);
+        old_size = inode->i_size;
         oti->oti_handle = fsfilt_brw_start(obd, objcount, &fso, niocount, res,
                                            oti);
         if (IS_ERR(oti->oti_handle)) {
@@ -514,6 +516,15 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa,
         /* filter_direct_io drops i_sem */
         rc = filter_direct_io(OBD_BRW_WRITE, res->dentry, dreq, exp, &iattr,
                               oti, NULL);
+
+#if 0
+        if (inode->i_size != old_size) {
+                struct llog_cookie *cookie = obdo_logcookie(oa);
+                struct lustre_id *id = obdo_id(oa);
+                filter_log_sz_change(obd, id, oa->o_easize, cookie, inode);
+        }
+#endif
+
         if (rc == 0)
                 obdo_from_inode(oa, inode, FILTER_VALID_FLAGS);
 
index 9d7afe9..3fd9bc8 100644 (file)
 
 #include "filter_internal.h"
 
-int filter_log_sz_change(struct llog_handle *cathandle, 
+int filter_log_sz_change(struct obd_device *obd,
                          struct lustre_id *id, __u32 io_epoch,
                          struct llog_cookie *logcookie, 
                          struct inode *inode)
 {
         struct llog_size_change_rec *lsc;
-#ifdef IFILTERDATA_ACTUALLY_USED
         struct ost_filterdata *ofd;
-#endif
+        struct llog_ctxt *ctxt;
         int rc;
         ENTRY;
 
         down(&inode->i_sem);
-#ifdef IFILTERDATA_ACTUALLY_USED
-        ofd = inode->i_filterdata;
+        ofd = (struct ost_filterdata *) LUSTRE_FILTERDATA(inode);
         
         if (ofd && ofd->ofd_epoch >= io_epoch) {
                 if (ofd->ofd_epoch > io_epoch)
@@ -68,10 +66,9 @@ int filter_log_sz_change(struct llog_handle *cathandle,
                 if (!ofd)
                         GOTO(out, rc = -ENOMEM);
                 igrab(inode);
-                inode->i_filterdata = ofd;
+                LUSTRE_FILTERDATA(inode) = (void *) ofd;
                 ofd->ofd_epoch = io_epoch;
         }
-#endif
         /* the decision to write a record is now made, unlock */
         up(&inode->i_sem);
 
@@ -83,8 +80,11 @@ int filter_log_sz_change(struct llog_handle *cathandle,
         lsc->lsc_id = *id;
         lsc->lsc_io_epoch = io_epoch;
 
-        rc = llog_cat_add_rec(cathandle, &lsc->lsc_hdr, logcookie,
-                              NULL, NULL, NULL);
+        CDEBUG(D_ERROR, "new epoch %lu for "DLID4"\n",
+               (unsigned long) io_epoch, OLID4(id));
+
+        ctxt = llog_get_context(&obd->obd_llogs, LLOG_SIZE_ORIG_CTXT);
+        rc = llog_add(ctxt, &lsc->lsc_hdr, NULL, logcookie, 1, NULL,NULL,NULL);
         OBD_FREE(lsc, sizeof(*lsc));
 
         if (rc > 0) {
@@ -92,11 +92,10 @@ int filter_log_sz_change(struct llog_handle *cathandle,
                 rc = 0;
         }
 
-#ifdef IFILTERDATA_ACTUALLY_USED
 out:
-#endif
         RETURN(rc);
 }
+
 struct obd_llogs * filter_grab_llog_for_group(struct obd_device *,
                                               int, struct obd_export *);