Whamcloud - gitweb
b=13099
author qian <qian>
Sat, 26 Jan 2008 07:46:20 +0000 (07:46 +0000)
committer qian <qian>
Sat, 26 Jan 2008 07:46:20 +0000 (07:46 +0000)
i=adilger,deen,shadow

Fix the directIO path: handle reads that hit a hole or go beyond the end of the file.
Submit I/O to the various OSCs in one RPC set.

lustre/include/lustre_net.h
lustre/include/obd_class.h
lustre/llite/rw26.c
lustre/lov/lov_obd.c
lustre/osc/osc_request.c
lustre/ptlrpc/client.c
lustre/ptlrpc/ptlrpc_module.c
lustre/tests/directio.c
lustre/tests/sanity.sh

index 9e1a929..4f5bc8a 100644 (file)
@@ -205,16 +205,21 @@ struct ptlrpc_request_set {
         cfs_waitq_t       set_waitq;
         cfs_waitq_t      *set_wakeup_ptr;
         struct list_head  set_requests;
+        struct list_head  set_cblist; /* list of completion callbacks */
         set_interpreter_func    set_interpret; /* completion callback */
         void              *set_arg; /* completion context */
-        void              *set_countp; /* pointer to NOB counter in case 
-                                        * of directIO (bug11737) */
         /* locked so that any old caller can communicate requests to
          * the set holder who can then fold them into the lock-free set */
         spinlock_t        set_new_req_lock;
         struct list_head  set_new_requests;
 };
 
+struct ptlrpc_set_cbdata {
+        struct list_head        psc_item;
+        set_interpreter_func    psc_interpret;
+        void                   *psc_data;
+};
+
 struct ptlrpc_bulk_desc;
 
 /*
@@ -743,6 +748,8 @@ void ptlrpc_restart_req(struct ptlrpc_request *req);
 void ptlrpc_abort_inflight(struct obd_import *imp);
 
 struct ptlrpc_request_set *ptlrpc_prep_set(void);
+int ptlrpc_set_add_cb(struct ptlrpc_request_set *set,
+                      set_interpreter_func fn, void *data);
 int ptlrpc_set_next_timeout(struct ptlrpc_request_set *);
 int ptlrpc_check_set(struct ptlrpc_request_set *set);
 int ptlrpc_set_wait(struct ptlrpc_request_set *);
index 46b1196..3f111f9 100644 (file)
@@ -1173,15 +1173,12 @@ static inline int obd_brw_rqset(int cmd, struct obd_export *exp,
 {
         struct ptlrpc_request_set *set = NULL;
         struct obd_info oinfo = { { { 0 } } };
-        atomic_t nob;
         int rc = 0;
         ENTRY;
 
         set =  ptlrpc_prep_set();
         if (set == NULL)
                 RETURN(-ENOMEM);
-        atomic_set(&nob, 0);
-        set->set_countp = &nob;
 
         oinfo.oi_oa = oa;
         oinfo.oi_md = lsm;
@@ -1191,8 +1188,6 @@ static inline int obd_brw_rqset(int cmd, struct obd_export *exp,
                 rc = ptlrpc_set_wait(set);
                 if (rc)
                         CERROR("error from callback: rc = %d\n", rc);
-                else
-                        rc = atomic_read(&nob);
         } else {
                 CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
                        "error from obd_brw_async: rc = %d\n", rc);
index 85450c1..d2e625c 100644 (file)
@@ -145,16 +145,14 @@ static void ll_free_user_pages(struct page **pages, int npages, int do_dirty)
 
 static ssize_t ll_direct_IO_26_seg(int rw, struct inode *inode,
                                    struct address_space *mapping,
-                                   struct lov_stripe_md *lsm,
+                                   struct obd_info *oinfo,
+                                   struct ptlrpc_request_set *set,
                                    size_t size, loff_t file_offset,
                                    struct page **pages, int page_count)
 {
         struct brw_page *pga;
-        struct obdo oa;
-        int opc, i, rc = 0;
+        int i, rc = 0;
         size_t length;
-        struct obd_capa *ocapa;
-        loff_t file_offset_orig = file_offset;
         ENTRY;
 
         OBD_ALLOC(pga, sizeof(*pga) * page_count);
@@ -177,28 +175,11 @@ static ssize_t ll_direct_IO_26_seg(int rw, struct inode *inode,
                         POISON_PAGE(pages[i], 0x0d);
         }
 
-        ll_inode_fill_obdo(inode, rw == WRITE ? OBD_BRW_WRITE : OBD_BRW_READ, &oa);
-
-        if (rw == WRITE) {
-                lprocfs_counter_add(ll_i2sbi(inode)->ll_stats,
-                                    LPROC_LL_DIRECT_WRITE, size);
-                opc = CAPA_OPC_OSS_WRITE;
-                llap_write_pending(inode, NULL);
-        } else {
-                lprocfs_counter_add(ll_i2sbi(inode)->ll_stats,
-                                    LPROC_LL_DIRECT_READ, size);
-                opc = CAPA_OPC_OSS_RW;
-        }
-        ocapa = ll_osscapa_get(inode, opc);
-        rc = obd_brw_rqset(rw == WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
-                           ll_i2dtexp(inode), &oa, lsm, page_count, pga, NULL,
-                           ocapa);
-        capa_put(ocapa);
-        if ((rc > 0) && (rw == WRITE)) {
-                lov_stripe_lock(lsm);
-                obd_adjust_kms(ll_i2dtexp(inode), lsm, file_offset_orig + rc, 0);
-                lov_stripe_unlock(lsm);
-        }
+        rc = obd_brw_async(rw == WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
+                                ll_i2dtexp(inode), oinfo, page_count,
+                                pga, NULL, set);
+        if (rc == 0)
+                rc = size;
 
         OBD_FREE(pga, sizeof(*pga) * page_count);
         RETURN(rc);
@@ -218,8 +199,13 @@ static ssize_t ll_direct_IO_26(int rw, struct kiocb *iocb,
         struct inode *inode = file->f_mapping->host;
         ssize_t count = iov_length(iov, nr_segs), tot_bytes = 0;
         struct ll_inode_info *lli = ll_i2info(inode);
+        struct lov_stripe_md *lsm = lli->lli_smd;
+        struct ptlrpc_request_set *set;
+        struct obd_info oinfo;
+        struct obdo oa;
         unsigned long seg = 0;
         size_t size = MAX_DIO_SIZE;
+        int opc;
         ENTRY;
 
         if (!lli->lli_smd || !lli->lli_smd->lsm_object_id)
@@ -235,10 +221,14 @@ static ssize_t ll_direct_IO_26(int rw, struct kiocb *iocb,
                file_offset, file_offset, count >> CFS_PAGE_SHIFT,
                MAX_DIO_SIZE >> CFS_PAGE_SHIFT);
 
-        if (rw == WRITE)
+        if (rw == WRITE) {
                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_DIRECT_WRITE, count);
-        else
+                opc = CAPA_OPC_OSS_WRITE;
+                llap_write_pending(inode, NULL);
+        } else {
                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_DIRECT_READ, count);
+                opc = CAPA_OPC_OSS_RW;
+        }
 
         /* Check that all user buffers are aligned as well */
         for (seg = 0; seg < nr_segs; seg++) {
@@ -247,10 +237,31 @@ static ssize_t ll_direct_IO_26(int rw, struct kiocb *iocb,
                         RETURN(-EINVAL);
         }
 
+        set = ptlrpc_prep_set();
+        if (set == NULL)
+                RETURN(-ENOMEM);
+
+        ll_inode_fill_obdo(inode, rw, &oa);
+        oinfo.oi_oa = &oa;
+        oinfo.oi_md = lsm;
+        oinfo.oi_capa = ll_osscapa_get(inode, opc);
+
+        /* need locking between buffered and direct access. and race with 
+         *size changing by concurrent truncates and writes. */
+        if (rw == READ)
+                LOCK_INODE_MUTEX(inode);
+
         for (seg = 0; seg < nr_segs; seg++) {
                 size_t iov_left = iov[seg].iov_len;
                 unsigned long user_addr = (unsigned long)iov[seg].iov_base;
 
+                if (rw == READ) {
+                        if (file_offset >= inode->i_size)
+                                break;
+                        if (file_offset + iov_left > inode->i_size)
+                                iov_left = inode->i_size - file_offset;
+                }
+
                 while (iov_left > 0) {
                         struct page **pages;
                         int page_count;
@@ -263,7 +274,7 @@ static ssize_t ll_direct_IO_26(int rw, struct kiocb *iocb,
                         if (page_count > 0) {
                                 result = ll_direct_IO_26_seg(rw, inode,
                                                              file->f_mapping,
-                                                             lli->lli_smd,
+                                                             &oinfo, set,
                                                              min(size,iov_left),
                                                              file_offset, pages,
                                                              page_count);
@@ -288,9 +299,9 @@ static ssize_t ll_direct_IO_26(int rw, struct kiocb *iocb,
                                         continue;
                                 }
 
-                                if (tot_bytes > 0)
-                                        RETURN(tot_bytes);
-                                RETURN(page_count < 0 ? page_count : result);
+                                if (tot_bytes <= 0)
+                                        tot_bytes = page_count < 0 ? page_count : result;
+                                GOTO(out, tot_bytes);
                         }
 
                         tot_bytes += result;
@@ -299,6 +310,25 @@ static ssize_t ll_direct_IO_26(int rw, struct kiocb *iocb,
                         user_addr += result;
                 }
         }
+out:
+        if (rw == READ)
+                UNLOCK_INODE_MUTEX(inode);
+
+        if (tot_bytes > 0) {
+                int rc;
+                
+                rc = ptlrpc_set_wait(set);
+                if (rc) {
+                        tot_bytes = rc;
+                } else if (rw == WRITE) {
+                        lov_stripe_lock(lsm);
+                        obd_adjust_kms(ll_i2dtexp(inode), lsm, file_offset, 0);
+                        lov_stripe_unlock(lsm);
+                }
+        }
+
+        capa_put(oinfo.oi_capa);
+        ptlrpc_set_destroy(set);
         RETURN(tot_bytes);
 }
 
index 7383ae6..73292c8 100644 (file)
@@ -1593,8 +1593,10 @@ static int lov_brw_async(int cmd, struct obd_export *exp,
         }
         LASSERT(rc == 0);
         LASSERT(set->set_interpret == NULL);
-        set->set_interpret = (set_interpreter_func)lov_brw_interpret;
-        set->set_arg = (void *)lovset;
+        LASSERT(set->set_arg == NULL);
+        rc = ptlrpc_set_add_cb(set, lov_brw_interpret, lovset);
+        if (rc)
+                GOTO(out, rc);
 
         RETURN(rc);
 out:
index fbbc634..ba98d4e 100644 (file)
@@ -1404,7 +1404,6 @@ static int brw_interpret(struct ptlrpc_request *req, void *data, int rc)
 {
         struct osc_brw_async_args *aa = data;
         int                        i;
-        int                        nob = rc;
         ENTRY;
 
         rc = osc_brw_fini_request(req, rc);
@@ -1413,8 +1412,6 @@ static int brw_interpret(struct ptlrpc_request *req, void *data, int rc)
                 if (rc == 0)
                         RETURN(0);
         }
-        if ((rc >= 0) && req->rq_set && req->rq_set->set_countp)
-                atomic_add(nob, (atomic_t *)req->rq_set->set_countp);
 
         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
index 55a5ee2..f46f9e3 100644 (file)
@@ -436,7 +436,8 @@ struct ptlrpc_request_set *ptlrpc_prep_set(void)
         set->set_remaining = 0;
         spin_lock_init(&set->set_new_req_lock);
         CFS_INIT_LIST_HEAD(&set->set_new_requests);
-
+        CFS_INIT_LIST_HEAD(&set->set_cblist);
+        
         RETURN(set);
 }
 
@@ -495,6 +496,22 @@ void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
         EXIT;
 }
 
+int ptlrpc_set_add_cb(struct ptlrpc_request_set *set,
+                      set_interpreter_func fn, void *data)
+{
+        struct ptlrpc_set_cbdata *cbdata;
+
+        OBD_ALLOC_PTR(cbdata);
+        if (cbdata == NULL)
+                RETURN(-ENOMEM);
+
+        cbdata->psc_interpret = fn;
+        cbdata->psc_data = data;
+        list_add_tail(&cbdata->psc_item, &set->set_cblist);
+
+        RETURN(0);
+}
+
 void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
                         struct ptlrpc_request *req)
 {
@@ -1274,6 +1291,18 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set)
                 int (*interpreter)(struct ptlrpc_request_set *set,void *,int) =
                         set->set_interpret;
                 rc = interpreter (set, set->set_arg, rc);
+        } else {
+                struct ptlrpc_set_cbdata *cbdata, *n;
+                int err;
+
+                list_for_each_entry_safe(cbdata, n,
+                                         &set->set_cblist, psc_item) {
+                        list_del_init(&cbdata->psc_item);
+                        err = cbdata->psc_interpret(set, cbdata->psc_data, rc);
+                        if (err && !rc)
+                                rc = err;
+                        OBD_FREE_PTR(cbdata);
+                }
         }
 
         RETURN(rc);
index aa8db8c..768e892 100644 (file)
@@ -174,6 +174,7 @@ EXPORT_SYMBOL(ptlrpc_retain_replayable_request);
 EXPORT_SYMBOL(ptlrpc_next_xid);
 
 EXPORT_SYMBOL(ptlrpc_prep_set);
+EXPORT_SYMBOL(ptlrpc_set_add_cb);
 EXPORT_SYMBOL(ptlrpc_set_add_req);
 EXPORT_SYMBOL(ptlrpc_set_add_new_req);
 EXPORT_SYMBOL(ptlrpc_set_destroy);
index ebcedb2..1108cba 100644 (file)
@@ -23,11 +23,12 @@ int main(int argc, char **argv)
         long len;
         off64_t seek;
         struct stat64 st;
+        char pad = 0xba;
         int action;
         int rc;
 
         if (argc < 5 || argc > 6) {
-                printf("Usage: %s <read/write/rdwr> file seek nr_blocks [blocksize]\n", argv[0]);
+                printf("Usage: %s <read/write/rdwr/readhole> file seek nr_blocks [blocksize]\n", argv[0]);
                 return 1;
         }
 
@@ -37,7 +38,10 @@ int main(int argc, char **argv)
                 action = O_WRONLY;
         else if (!strcmp(argv[1], "rdwr"))
                 action = O_RDWR;
-        else {
+        else if (!strcmp(argv[1], "readhole")) {
+                action = O_RDONLY;
+                pad = 0;
+        } else {
                 printf("Usage: %s <read/write/rdwr> file seek nr_blocks [blocksize]\n", argv[0]);
                 return 1;
         }
@@ -74,7 +78,7 @@ int main(int argc, char **argv)
                 printf("No memory %s\n", strerror(errno));
                 return 1;
         }
-        memset(wbuf, 0xba, len);
+        memset(wbuf, pad, len);
 
         if (action == O_WRONLY || action == O_RDWR) {
                 if (lseek64(fd, seek, SEEK_SET) < 0) {
index 3ddf3bd..a29a855 100644 (file)
@@ -4482,9 +4482,19 @@ test_119b() # bug 11737
         sync
         multiop $DIR/$tfile oO_RDONLY:O_DIRECT:r$((2048 * 1024)) || \
                 error "direct read failed"
+        rm -f $DIR/$tfile
 }
 run_test 119b "Sparse directIO read must return actual read amount"
 
+test_119c() # bug 13099
+{
+        BSIZE=1048576
+        directio write $DIR/$tfile 3 1 $BSIZE || error "direct write failed"
+        directio readhole $DIR/$tfile 0 2 $BSIZE || error "reading hole failed"
+        rm -f $DIR/$tfile
+}
+run_test 119c "Testing for direct read hitting hole"
+
 LDLM_POOL_CTL_RECALC=1
 LDLM_POOL_CTL_SHRINK=2