X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fliblustre%2Frw.c;h=7a2ac2923700d7a4ee1e41c528224713bb37ea45;hp=307dd467259112d0ea27e351cf19a05ee859b3a0;hb=a1d11a561dea2ae38275eedc64d09e1fe1730d6b;hpb=1b0e1ce43a3c18459eaccbcd8f4b2c84f36d061e diff --git a/lustre/liblustre/rw.c b/lustre/liblustre/rw.c index 307dd46..7a2ac29 100644 --- a/lustre/liblustre/rw.c +++ b/lustre/liblustre/rw.c @@ -40,6 +40,33 @@ #include "llite_lib.h" +static int llu_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock) +{ + struct llu_inode_info *lli = llu_i2info(inode); + struct lov_stripe_md *lsm = lli->lli_smd; + struct obd_export *exp = llu_i2obdexp(inode); + struct { + char name[16]; + struct ldlm_lock *lock; + struct lov_stripe_md *lsm; + } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm }; + __u32 stripe, vallen = sizeof(stripe); + int rc; + ENTRY; + + if (lsm->lsm_stripe_count == 1) + RETURN(0); + + /* get our offset in the lov */ + rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe); + if (rc != 0) { + CERROR("obd_get_info: rc = %d\n", rc); + LBUG(); + } + LASSERT(stripe < lsm->lsm_stripe_count); + RETURN(stripe); +} + static int llu_extent_lock_callback(struct ldlm_lock *lock, struct ldlm_lock_desc *new, void *data, int flag) @@ -64,23 +91,27 @@ static int llu_extent_lock_callback(struct ldlm_lock *lock, case LDLM_CB_CANCELING: { struct inode *inode = llu_inode_from_lock(lock); struct llu_inode_info *lli; + struct lov_stripe_md *lsm; + __u32 stripe; + __u64 kms; if (!inode) RETURN(0); lli= llu_i2info(inode); - if (!lli) { - I_RELE(inode); - RETURN(0); - } - if (!lli->lli_smd) { - I_RELE(inode); - RETURN(0); - } - -/* - ll_pgcache_remove_extent(inode, lli->lli_smd, lock); - iput(inode); -*/ + if (!lli) + goto iput; + if (!lli->lli_smd) + goto iput; + lsm = lli->lli_smd; + + stripe = llu_lock_to_stripe_offset(inode, lock); + kms = ldlm_extent_shift_kms(lock, + lsm->lsm_oinfo[stripe].loi_kms); + if (lsm->lsm_oinfo[stripe].loi_kms != kms) + LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64, + lsm->lsm_oinfo[stripe].loi_kms, kms); + lsm->lsm_oinfo[stripe].loi_kms = kms; +iput: I_RELE(inode); break; } @@ -91,6 +122,135 @@ static int llu_extent_lock_callback(struct ldlm_lock *lock, RETURN(0); } +static int llu_glimpse_callback(struct ldlm_lock *lock, void *reqp) +{ + struct ptlrpc_request *req = reqp; + struct inode *inode = llu_inode_from_lock(lock); + struct obd_export *exp; + struct llu_inode_info *lli; + struct ost_lvb *lvb; + struct { + int stripe_number; + __u64 size; + struct lov_stripe_md *lsm; + } data; + __u32 vallen = sizeof(data); + int rc, size = sizeof(*lvb); + ENTRY; + + if (inode == NULL) + RETURN(0); + lli = llu_i2info(inode); + if (lli == NULL) + goto iput; + if (lli->lli_smd == NULL) + goto iput; + exp = llu_i2obdexp(inode); + + /* First, find out which stripe index this lock corresponds to. */ + if (lli->lli_smd->lsm_stripe_count > 1) + data.stripe_number = llu_lock_to_stripe_offset(inode, lock); + else + data.stripe_number = 0; + + data.size = lli->lli_st_size; + data.lsm = lli->lli_smd; + + rc = obd_get_info(exp, strlen("size_to_stripe"), "size_to_stripe", + &vallen, &data); + if (rc != 0) { + CERROR("obd_get_info: rc = %d\n", rc); + LBUG(); + } + + LDLM_DEBUG(lock, "i_size: %Lu -> stripe number %d -> size %Lu", + lli->lli_st_size, data.stripe_number, data.size); + + rc = lustre_pack_reply(req, 1, &size, NULL); + if (rc) { + CERROR("lustre_pack_reply: %d\n", rc); + goto iput; + } + + lvb = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*lvb)); + lvb->lvb_size = data.size; + ptlrpc_reply(req); + + iput: + I_RELE(inode); + RETURN(0); +} + +__u64 lov_merge_size(struct lov_stripe_md *lsm, int kms); +__u64 lov_merge_mtime(struct lov_stripe_md *lsm, __u64 current_time); + +/* NB: lov_merge_size will prefer locally cached writes if they extend the + * file (because it prefers KMS over RSS when larger) */ +int llu_glimpse_size(struct inode *inode, struct ost_lvb *lvb) +{ + struct llu_inode_info *lli = llu_i2info(inode); + struct llu_sb_info *sbi = llu_i2sbi(inode); + ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } }; + struct lustre_handle lockh; + int rc, flags = LDLM_FL_HAS_INTENT; + ENTRY; + + CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", lli->lli_st_ino); + + rc = obd_enqueue(sbi->ll_osc_exp, lli->lli_smd, LDLM_EXTENT, &policy, + LCK_PR, &flags, llu_extent_lock_callback, + ldlm_completion_ast, llu_glimpse_callback, inode, + sizeof(*lvb), lustre_swab_ost_lvb, &lockh); + if (rc > 0) + RETURN(-EIO); + + lvb->lvb_size = lov_merge_size(lli->lli_smd, 0); + //inode->i_mtime = lov_merge_mtime(lli->lli_smd, inode->i_mtime); + + CDEBUG(D_DLMTRACE, "glimpse: size: "LPU64"\n", lvb->lvb_size); + + obd_cancel(sbi->ll_osc_exp, lli->lli_smd, LCK_PR, &lockh); + + RETURN(rc); +} + +int llu_extent_lock(struct ll_file_data *fd, struct inode *inode, + struct lov_stripe_md *lsm, int mode, + ldlm_policy_data_t *policy, struct lustre_handle *lockh, + int ast_flags) +{ + struct llu_sb_info *sbi = llu_i2sbi(inode); + struct llu_inode_info *lli = llu_i2info(inode); + int rc; + ENTRY; + + LASSERT(lockh->cookie == 0); + + /* XXX phil: can we do this? won't it screw the file size up? */ + if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) || + (sbi->ll_flags & LL_SBI_NOLCK)) + RETURN(0); + + CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n", + lli->lli_st_ino, policy->l_extent.start, policy->l_extent.end); + + rc = obd_enqueue(sbi->ll_osc_exp, lsm, LDLM_EXTENT, policy, mode, + &ast_flags, llu_extent_lock_callback, + ldlm_completion_ast, llu_glimpse_callback, inode, + sizeof(struct ost_lvb), lustre_swab_ost_lvb, lockh); + if (rc > 0) + rc = -EIO; + + if (policy->l_extent.start == 0 && + policy->l_extent.end == OBD_OBJECT_EOF) + lli->lli_st_size = lov_merge_size(lsm, 1); + + //inode->i_mtime = lov_merge_mtime(lsm, inode->i_mtime); + + RETURN(rc); +} + +#if 0 int llu_extent_lock_no_validate(struct ll_file_data *fd, struct inode *inode, struct lov_stripe_md *lsm, @@ -167,6 +327,7 @@ int llu_extent_lock(struct ll_file_data *fd, struct inode *inode, RETURN(0); } +#endif int llu_extent_unlock(struct ll_file_data *fd, struct inode *inode, struct lov_stripe_md *lsm, int mode, @@ -175,12 +336,12 @@ int llu_extent_unlock(struct ll_file_data *fd, struct inode *inode, struct llu_sb_info *sbi = llu_i2sbi(inode); int rc; ENTRY; -#if 0 + /* XXX phil: can we do this? won't it screw the file size up? */ if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) || (sbi->ll_flags & LL_SBI_NOLCK)) RETURN(0); -#endif + rc = obd_cancel(sbi->ll_osc_exp, lsm, mode, lockh); RETURN(rc); @@ -539,9 +700,9 @@ llu_file_write(struct inode *inode, const struct iovec *iovec, struct ll_file_data *fd = lli->lli_file_data; struct lustre_handle lockh = {0}; struct lov_stripe_md *lsm = lli->lli_smd; + ldlm_policy_data_t policy; struct llu_sysio_callback_args *lsca; struct llu_sysio_cookie *cookie; - struct ldlm_extent extent; ldlm_error_t err; int iovidx; ENTRY; @@ -564,24 +725,15 @@ llu_file_write(struct inode *inode, const struct iovec *iovec, if (count == 0) continue; - /* FIXME libsysio haven't handle O_APPEND */ - extent.start = pos; - extent.end = pos + count - 1; + if (pos + count > lli->lli_maxbytes) + GOTO(err_out, err = -ERANGE); -#ifdef LIBLUSTRE_HANDLE_UNALIGNED_PAGE - if ((pos & ~PAGE_CACHE_MASK) == 0 && - (count & ~PAGE_CACHE_MASK) == 0) - err = llu_extent_lock_no_validate(fd, inode, lsm, - LCK_PW, &extent, &lockh, 0); - else - err = llu_extent_lock(fd, inode, lsm, LCK_PW, - &extent, &lockh); -#else - /* server will handle partial write, so we don't - * care for file size here */ - err = llu_extent_lock_no_validate(fd, inode, lsm, LCK_PW, - &extent, &lockh, 0); -#endif + /* FIXME libsysio haven't handle O_APPEND?? */ + policy.l_extent.start = pos; + policy.l_extent.end = pos + count - 1; + + err = llu_extent_lock(fd, inode, lsm, LCK_PW, &policy, + &lockh, 0); if (err != ELDLM_OK) GOTO(err_out, err = -ENOLCK); @@ -648,15 +800,16 @@ static void llu_update_atime(struct inode *inode) struct llu_sysio_callback_args* llu_file_read(struct inode *inode, const struct iovec *iovec, - size_t iovlen, loff_t pos) + size_t iovlen, loff_t pos) { struct llu_inode_info *lli = llu_i2info(inode); struct ll_file_data *fd = lli->lli_file_data; struct lov_stripe_md *lsm = lli->lli_smd; struct lustre_handle lockh = { 0 }; - struct ldlm_extent extent; + ldlm_policy_data_t policy; struct llu_sysio_callback_args *lsca; struct llu_sysio_cookie *cookie; + __u64 kms; int iovidx; ldlm_error_t err; @@ -675,15 +828,31 @@ llu_file_read(struct inode *inode, const struct iovec *iovec, if (count == 0) continue; - extent.start = pos; - extent.end = pos + count - 1; + policy.l_extent.start = pos; + policy.l_extent.end = pos + count - 1; - err = llu_extent_lock(fd, inode, lsm, LCK_PR, &extent, &lockh); + err = llu_extent_lock(fd, inode, lsm, LCK_PR, &policy, &lockh, 0); if (err != ELDLM_OK) GOTO(err_out, err = -ENOLCK); - CDEBUG(D_INFO, "Reading inode %lu, "LPSZ" bytes, offset %Ld\n", - lli->lli_st_ino, count, pos); + kms = lov_merge_size(lsm, 1); + if (policy.l_extent.end > kms) { + /* A glimpse is necessary to determine whether we + * return a short read or some zeroes at the end of + * the buffer */ + struct ost_lvb lvb; + if (llu_glimpse_size(inode, &lvb)) { + llu_extent_unlock(fd, inode, lsm, LCK_PR, &lockh); + GOTO(err_out, err = -ENOLCK); + } + lli->lli_st_size = lvb.lvb_size; + } else { + lli->lli_st_size = kms; + } + + CDEBUG(D_INFO, "Reading inode %lu, "LPSZ" bytes, offset %Ld, " + "i_size "LPU64"\n", lli->lli_st_ino, count, pos, + lli->lli_st_size); if (pos >= lli->lli_st_size) { llu_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);