Whamcloud - gitweb
landing b_cmobd_merge on HEAD
[fs/lustre-release.git] / lustre / liblustre / rw.c
index 7a2ac29..9fe16e5 100644 (file)
@@ -1,7 +1,7 @@
 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
- * Lustre Light Super operations
+ * Lustre Light block IO
  *
  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
  *
@@ -29,6 +29,7 @@
 #include <time.h>
 #include <sys/types.h>
 #include <sys/queue.h>
+#include <fcntl.h>
 
 #include <sysio.h>
 #include <fs.h>
@@ -74,13 +75,12 @@ static int llu_extent_lock_callback(struct ldlm_lock *lock,
         struct lustre_handle lockh = { 0 };
         int rc;
         ENTRY;
-        
 
         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
                 LBUG();
         }
-        
+
         switch (flag) {
         case LDLM_CB_BLOCKING:
                 ldlm_lock2handle(lock, &lockh);
@@ -89,12 +89,17 @@ static int llu_extent_lock_callback(struct ldlm_lock *lock,
                         CERROR("ldlm_cli_cancel failed: %d\n", rc);
                 break;
         case LDLM_CB_CANCELING: {
-                struct inode *inode = llu_inode_from_lock(lock);
+                struct inode *inode;
                 struct llu_inode_info *lli;
                 struct lov_stripe_md *lsm;
                 __u32 stripe;
                 __u64 kms;
-                
+
+                /* This lock wasn't granted, don't try to evict pages */
+                if (lock->l_req_mode != lock->l_granted_mode)
+                        RETURN(0);
+
+                inode = llu_inode_from_lock(lock);
                 if (!inode)
                         RETURN(0);
                 lli= llu_i2info(inode);
@@ -118,7 +123,7 @@ iput:
         default:
                 LBUG();
         }
-        
+
         RETURN(0);
 }
 
@@ -126,59 +131,44 @@ static int llu_glimpse_callback(struct ldlm_lock *lock, void *reqp)
 {
         struct ptlrpc_request *req = reqp;
         struct inode *inode = llu_inode_from_lock(lock);
-        struct obd_export *exp;
         struct llu_inode_info *lli;
         struct ost_lvb *lvb;
-        struct {
-                int stripe_number;
-                __u64 size;
-                struct lov_stripe_md *lsm;
-        } data;
-        __u32 vallen = sizeof(data);
-        int rc, size = sizeof(*lvb);
+        int rc, size = sizeof(*lvb), stripe = 0;
         ENTRY;
 
         if (inode == NULL)
-                RETURN(0);
+                GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
         lli = llu_i2info(inode);
         if (lli == NULL)
-                goto iput;
+                GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
         if (lli->lli_smd == NULL)
-                goto iput;
-        exp = llu_i2obdexp(inode);
+                GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
 
         /* First, find out which stripe index this lock corresponds to. */
         if (lli->lli_smd->lsm_stripe_count > 1)
-                data.stripe_number = llu_lock_to_stripe_offset(inode, lock);
-        else
-                data.stripe_number = 0;
-
-        data.size = lli->lli_st_size;
-        data.lsm = lli->lli_smd;
-
-        rc = obd_get_info(exp, strlen("size_to_stripe"), "size_to_stripe",
-                          &vallen, &data);
-        if (rc != 0) {
-                CERROR("obd_get_info: rc = %d\n", rc);
-                LBUG();
-        }
-
-        LDLM_DEBUG(lock, "i_size: %Lu -> stripe number %d -> size %Lu",
-                   lli->lli_st_size, data.stripe_number, data.size);
+                stripe = llu_lock_to_stripe_offset(inode, lock);
 
         rc = lustre_pack_reply(req, 1, &size, NULL);
         if (rc) {
                 CERROR("lustre_pack_reply: %d\n", rc);
-                goto iput;
+                GOTO(iput, rc);
         }
 
         lvb = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*lvb));
-        lvb->lvb_size = data.size;
-        ptlrpc_reply(req);
+        lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe].loi_kms;
 
+        LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64,
+                   lli->lli_st_size, stripe, lvb->lvb_size);
  iput:
         I_RELE(inode);
-        RETURN(0);
+ out:
+        /* These errors are normal races, so we don't want to fill the console
+         * with messages by calling ptlrpc_error() */
+        if (rc == -ELDLM_NO_LOCK_DATA)
+                lustre_pack_reply(req, 0, NULL, NULL);
+
+        req->rq_status = rc;
+        return rc;
 }
 
 __u64 lov_merge_size(struct lov_stripe_md *lsm, int kms);
@@ -191,7 +181,7 @@ int llu_glimpse_size(struct inode *inode, struct ost_lvb *lvb)
         struct llu_inode_info *lli = llu_i2info(inode);
         struct llu_sb_info *sbi = llu_i2sbi(inode);
         ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
-        struct lustre_handle lockh;
+        struct lustre_handle lockh = { 0 };
         int rc, flags = LDLM_FL_HAS_INTENT;
         ENTRY;
 
@@ -287,16 +277,19 @@ int llu_extent_lock_no_validate(struct ll_file_data *fd,
  */
 int llu_extent_lock(struct ll_file_data *fd, struct inode *inode,
                     struct lov_stripe_md *lsm, int mode,
-                    struct ldlm_extent *extent, struct lustre_handle *lockh)
+                    struct ldlm_extent *extent, struct lustre_handle *lockh,
+                    int nonblock)
 {
         struct llu_inode_info *lli = llu_i2info(inode);
         struct obd_export *exp = llu_i2obdexp(inode);
         struct ldlm_extent size_lock;
         struct lustre_handle match_lockh = {0};
         int flags, rc, matched;
+        int astflags = nonblock ? LDLM_FL_BLOCK_NOWAIT : 0;
         ENTRY;
 
-        rc = llu_extent_lock_no_validate(fd, inode, lsm, mode, extent, lockh, 0);
+        rc = llu_extent_lock_no_validate(fd, inode, lsm, mode, extent,
+                                         lockh, astflags);
         if (rc != ELDLM_OK)
                 RETURN(rc);
 
@@ -313,17 +306,18 @@ int llu_extent_lock(struct ll_file_data *fd, struct inode *inode,
         size_lock.end = OBD_OBJECT_EOF;
 
         /* XXX I bet we should be checking the lock ignore flags.. */
+        /* FIXME use LDLM_FL_TEST_LOCK instead */
         flags = LDLM_FL_CBPENDING | LDLM_FL_BLOCK_GRANTED;
         matched = obd_match(exp, lsm, LDLM_EXTENT, &size_lock,
                             sizeof(size_lock), LCK_PR, &flags, inode,
                             &match_lockh);
 
-        /* hey, alright, we hold a size lock that covers the size we 
+        /* hey, alright, we hold a size lock that covers the size we
          * just found, its not going to change for a while.. */
         if (matched == 1) {
                 set_bit(LLI_F_HAVE_OST_SIZE_LOCK, &lli->lli_flags);
                 obd_cancel(exp, lsm, LCK_PR, &match_lockh);
-        } 
+        }
 
         RETURN(0);
 }
@@ -393,7 +387,7 @@ static void llu_ap_fill_obdo(void *data, int cmd, struct obdo *oa)
 }
 
 /* called for each page in a completed rpc.*/
-static void llu_ap_completion(void *data, int cmd, int rc)
+static void llu_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
 {
         struct ll_async_page *llap;
         struct page *page;
@@ -454,21 +448,12 @@ void put_sysio_cookie(struct llu_sysio_cookie *cookie)
         struct lov_stripe_md *lsm = llu_i2info(cookie->lsc_inode)->lli_smd;
         struct obd_export *exp = llu_i2obdexp(cookie->lsc_inode);
         struct ll_async_page *llap = cookie->lsc_llap;
-#ifdef LIBLUSTRE_HANDLE_UNALIGNED_PAGE
-        struct page *pages = cookie->lsc_pages;
-#endif
         int i;
 
         for (i = 0; i< cookie->lsc_maxpages; i++) {
                 if (llap[i].llap_cookie)
                         obd_teardown_async_page(exp, lsm, NULL,
                                                 llap[i].llap_cookie);
-#ifdef LIBLUSTRE_HANDLE_UNALIGNED_PAGE
-                if (pages[i]._managed) {
-                        free(pages[i].addr);
-                        pages[i]._managed = 0;
-                }
-#endif
         }
 
         I_RELE(cookie->lsc_inode);
@@ -477,85 +462,6 @@ void put_sysio_cookie(struct llu_sysio_cookie *cookie)
         OBD_FREE(cookie, LLU_SYSIO_COOKIE_SIZE(cookie->lsc_maxpages));
 }
 
-#ifdef LIBLUSTRE_HANDLE_UNALIGNED_PAGE
-/* Note: these code should be removed finally, don't need
- * more cleanup
- */
-static
-int prepare_unaligned_write(struct llu_sysio_cookie *cookie)
-{
-        struct inode *inode = cookie->lsc_inode;
-        struct llu_inode_info *lli = llu_i2info(inode);
-        struct lov_stripe_md *lsm = lli->lli_smd;
-        struct obdo oa;
-        struct page *pages = cookie->lsc_pages;
-        int i, pgidx[2] = {0, cookie->lsc_npages-1};
-        int rc;
-        ENTRY;
-
-        for (i = 0; i < 2; i++) {
-                struct page *oldpage = &pages[pgidx[i]];
-                struct page newpage;
-                struct brw_page pg;
-                char *newbuf;
-
-                if (i == 0 && pgidx[0] == pgidx[1])
-                        continue;
-
-                LASSERT(oldpage->_offset + oldpage->_count <= PAGE_CACHE_SIZE);
-
-                if (oldpage->_count == PAGE_CACHE_SIZE)
-                        continue;
-
-                if (oldpage->index << PAGE_CACHE_SHIFT >=
-                    lli->lli_st_size)
-                        continue;
-
-                newbuf = malloc(PAGE_CACHE_SIZE);
-                if (!newbuf)
-                        return -ENOMEM;
-
-                newpage.index = oldpage->index;
-                newpage.addr = newbuf;
-
-                pg.pg = &newpage;
-                pg.off = ((obd_off)newpage.index << PAGE_CACHE_SHIFT);
-                if (pg.off + PAGE_CACHE_SIZE > lli->lli_st_size)
-                        pg.count = lli->lli_st_size % PAGE_CACHE_SIZE;
-                else
-                        pg.count = PAGE_CACHE_SIZE;
-                pg.flag = 0;
-
-                oa.o_id = lsm->lsm_object_id;
-                oa.o_mode = lli->lli_st_mode;
-                oa.o_valid = OBD_MD_FLID | OBD_MD_FLMODE | OBD_MD_FLTYPE;
-
-                /* issue read */
-                rc = obd_brw(OBD_BRW_READ, llu_i2obdexp(inode), &oa, lsm, 1, &pg, NULL);
-                if (rc) {
-                        free(newbuf);
-                        RETURN(rc);
-                }
-
-                /* copy page content, and reset page params */
-                memcpy(newbuf + oldpage->_offset,
-                       (char*)oldpage->addr + oldpage->_offset,
-                       oldpage->_count);
-
-                oldpage->addr = newbuf;
-                if ((((obd_off)oldpage->index << PAGE_CACHE_SHIFT) +
-                    oldpage->_offset + oldpage->_count) > lli->lli_st_size)
-                        oldpage->_count += oldpage->_offset;
-                else
-                        oldpage->_count = PAGE_CACHE_SIZE;
-                oldpage->_offset = 0;
-                oldpage->_managed = 1;
-        }
-
-        RETURN(0);
-}
-#endif
-
 static
 int llu_prep_async_io(struct llu_sysio_cookie *cookie, int cmd,
                       char *buf, loff_t pos, size_t count)
@@ -606,14 +512,6 @@ int llu_prep_async_io(struct llu_sysio_cookie *cookie, int cmd,
 
         cookie->lsc_npages = npages;
 
-#ifdef LIBLUSTRE_HANDLE_UNALIGNED_PAGE
-        if (cmd == OBD_BRW_WRITE) {
-                rc = prepare_unaligned_write(cookie);
-                if (rc)
-                        RETURN(rc);
-        }
-#endif
-
         for (i = 0; i < npages; i++) {
                 llap[i].llap_magic = LLAP_MAGIC;
                 rc = obd_prep_async_page(exp, lsm, NULL, &pages[i],
@@ -692,6 +590,9 @@ out_cleanup:
         RETURN(ERR_PTR(rc));
 }
 
+void lov_increase_kms(struct obd_export *exp, struct lov_stripe_md *lsm,
+                      obd_off size);
+
 struct llu_sysio_callback_args*
 llu_file_write(struct inode *inode, const struct iovec *iovec,
                size_t iovlen, loff_t pos)
@@ -700,9 +601,12 @@ llu_file_write(struct inode *inode, const struct iovec *iovec,
         struct ll_file_data *fd = lli->lli_file_data;
         struct lustre_handle lockh = {0};
         struct lov_stripe_md *lsm = lli->lli_smd;
+        struct obd_export *exp = NULL;
         ldlm_policy_data_t policy;
         struct llu_sysio_callback_args *lsca;
         struct llu_sysio_cookie *cookie;
+        int astflag = (lli->lli_open_flags & O_NONBLOCK) ?
+                       LDLM_FL_BLOCK_NOWAIT : 0;
         ldlm_error_t err;
         int iovidx;
         ENTRY;
@@ -713,6 +617,10 @@ llu_file_write(struct inode *inode, const struct iovec *iovec,
 
         LASSERT(iovlen <= MAX_IOVEC);
 
+        exp = llu_i2obdexp(inode);
+        if (exp == NULL)
+                RETURN(ERR_PTR(-EINVAL));
+
         OBD_ALLOC(lsca, sizeof(*lsca));
         if (!lsca)
                 RETURN(ERR_PTR(-ENOMEM));
@@ -733,11 +641,11 @@ llu_file_write(struct inode *inode, const struct iovec *iovec,
                 policy.l_extent.end = pos + count - 1;
 
                 err = llu_extent_lock(fd, inode, lsm, LCK_PW, &policy,
-                                      &lockh, 0);
+                                      &lockh, astflag);
                 if (err != ELDLM_OK)
                         GOTO(err_out, err = -ENOLCK);
 
-                CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
+                CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %llu\n",
                        lli->lli_st_ino, count, pos);
 
                 cookie = llu_rw(OBD_BRW_WRITE, inode, buf, count, pos);
@@ -745,12 +653,10 @@ llu_file_write(struct inode *inode, const struct iovec *iovec,
                         /* save cookie */
                         lsca->cookies[lsca->ncookies++] = cookie;
                         pos += count;
-                        /* file size grow. XXX should be done here? */
-                        if (pos > lli->lli_st_size) {
+                        lov_increase_kms(exp, lsm, pos);
+                        /* file size grow */
+                        if (pos > lli->lli_st_size)
                                 lli->lli_st_size = pos;
-                                set_bit(LLI_F_PREFER_EXTENDED_SIZE,
-                                        &lli->lli_flags);
-                        }
                 } else {
                         llu_extent_unlock(fd, inode, lsm, LCK_PW, &lockh);
                         GOTO(err_out, err = PTR_ERR(cookie));
@@ -809,6 +715,8 @@ llu_file_read(struct inode *inode, const struct iovec *iovec,
         ldlm_policy_data_t policy;
         struct llu_sysio_callback_args *lsca;
         struct llu_sysio_cookie *cookie;
+        int astflag = (lli->lli_open_flags & O_NONBLOCK) ?
+                       LDLM_FL_BLOCK_NOWAIT : 0;
         __u64 kms;
         int iovidx;
 
@@ -831,7 +739,8 @@ llu_file_read(struct inode *inode, const struct iovec *iovec,
                 policy.l_extent.start = pos;
                 policy.l_extent.end = pos + count - 1;
 
-                err = llu_extent_lock(fd, inode, lsm, LCK_PR, &policy, &lockh, 0);
+                err = llu_extent_lock(fd, inode, lsm, LCK_PR, &policy,
+                                      &lockh, astflag);
                 if (err != ELDLM_OK)
                         GOTO(err_out, err = -ENOLCK);