#include "llite_lib.h"
+static int llu_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
+{
+ struct llu_inode_info *lli = llu_i2info(inode);
+ struct lov_stripe_md *lsm = lli->lli_smd;
+ struct obd_export *exp = llu_i2obdexp(inode);
+ struct {
+ char name[16];
+ struct ldlm_lock *lock;
+ struct lov_stripe_md *lsm;
+ } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm };
+ __u32 stripe, vallen = sizeof(stripe);
+ int rc;
+ ENTRY;
+
+ if (lsm->lsm_stripe_count == 1)
+ RETURN(0);
+
+ /* get our offset in the lov */
+ rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
+ if (rc != 0) {
+ CERROR("obd_get_info: rc = %d\n", rc);
+ LBUG();
+ }
+ LASSERT(stripe < lsm->lsm_stripe_count);
+ RETURN(stripe);
+}
+
static int llu_extent_lock_callback(struct ldlm_lock *lock,
struct ldlm_lock_desc *new, void *data,
int flag)
case LDLM_CB_CANCELING: {
struct inode *inode = llu_inode_from_lock(lock);
struct llu_inode_info *lli;
+ struct lov_stripe_md *lsm;
+ __u32 stripe;
+ __u64 kms;
if (!inode)
RETURN(0);
lli= llu_i2info(inode);
- if (!lli) {
- I_RELE(inode);
- RETURN(0);
- }
- if (!lli->lli_smd) {
- I_RELE(inode);
- RETURN(0);
- }
-
-/*
- ll_pgcache_remove_extent(inode, lli->lli_smd, lock);
- iput(inode);
-*/
+ if (!lli)
+ goto iput;
+ if (!lli->lli_smd)
+ goto iput;
+ lsm = lli->lli_smd;
+
+ stripe = llu_lock_to_stripe_offset(inode, lock);
+ kms = ldlm_extent_shift_kms(lock,
+ lsm->lsm_oinfo[stripe].loi_kms);
+ if (lsm->lsm_oinfo[stripe].loi_kms != kms)
+ LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
+ lsm->lsm_oinfo[stripe].loi_kms, kms);
+ lsm->lsm_oinfo[stripe].loi_kms = kms;
+iput:
I_RELE(inode);
break;
}
RETURN(0);
}
+static int llu_glimpse_callback(struct ldlm_lock *lock, void *reqp)
+{
+ struct ptlrpc_request *req = reqp;
+ struct inode *inode = llu_inode_from_lock(lock);
+ struct obd_export *exp;
+ struct llu_inode_info *lli;
+ struct ost_lvb *lvb;
+ struct {
+ int stripe_number;
+ __u64 size;
+ struct lov_stripe_md *lsm;
+ } data;
+ __u32 vallen = sizeof(data);
+ int rc, size = sizeof(*lvb);
+ ENTRY;
+
+ if (inode == NULL)
+ RETURN(0);
+ lli = llu_i2info(inode);
+ if (lli == NULL)
+ goto iput;
+ if (lli->lli_smd == NULL)
+ goto iput;
+ exp = llu_i2obdexp(inode);
+
+ /* First, find out which stripe index this lock corresponds to. */
+ if (lli->lli_smd->lsm_stripe_count > 1)
+ data.stripe_number = llu_lock_to_stripe_offset(inode, lock);
+ else
+ data.stripe_number = 0;
+
+ data.size = lli->lli_st_size;
+ data.lsm = lli->lli_smd;
+
+ rc = obd_get_info(exp, strlen("size_to_stripe"), "size_to_stripe",
+ &vallen, &data);
+ if (rc != 0) {
+ CERROR("obd_get_info: rc = %d\n", rc);
+ LBUG();
+ }
+
+ LDLM_DEBUG(lock, "i_size: %Lu -> stripe number %d -> size %Lu",
+ lli->lli_st_size, data.stripe_number, data.size);
+
+ rc = lustre_pack_reply(req, 1, &size, NULL);
+ if (rc) {
+ CERROR("lustre_pack_reply: %d\n", rc);
+ goto iput;
+ }
+
+ lvb = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*lvb));
+ lvb->lvb_size = data.size;
+ ptlrpc_reply(req);
+
+ iput:
+ I_RELE(inode);
+ RETURN(0);
+}
+
+__u64 lov_merge_size(struct lov_stripe_md *lsm, int kms);
+__u64 lov_merge_mtime(struct lov_stripe_md *lsm, __u64 current_time);
+
+/* NB: lov_merge_size will prefer locally cached writes if they extend the
+ * file (because it prefers KMS over RSS when larger) */
+int llu_glimpse_size(struct inode *inode, struct ost_lvb *lvb)
+{
+ struct llu_inode_info *lli = llu_i2info(inode);
+ struct llu_sb_info *sbi = llu_i2sbi(inode);
+ ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
+ struct lustre_handle lockh;
+ int rc, flags = LDLM_FL_HAS_INTENT;
+ ENTRY;
+
+ CDEBUG(D_DLMTRACE, "Glimpsing inode %lu\n", lli->lli_st_ino);
+
+ rc = obd_enqueue(sbi->ll_osc_exp, lli->lli_smd, LDLM_EXTENT, &policy,
+ LCK_PR, &flags, llu_extent_lock_callback,
+ ldlm_completion_ast, llu_glimpse_callback, inode,
+ sizeof(*lvb), lustre_swab_ost_lvb, &lockh);
+ if (rc > 0)
+ RETURN(-EIO);
+
+ lvb->lvb_size = lov_merge_size(lli->lli_smd, 0);
+ //inode->i_mtime = lov_merge_mtime(lli->lli_smd, inode->i_mtime);
+
+ CDEBUG(D_DLMTRACE, "glimpse: size: "LPU64"\n", lvb->lvb_size);
+
+ obd_cancel(sbi->ll_osc_exp, lli->lli_smd, LCK_PR, &lockh);
+
+ RETURN(rc);
+}
+
+int llu_extent_lock(struct ll_file_data *fd, struct inode *inode,
+ struct lov_stripe_md *lsm, int mode,
+ ldlm_policy_data_t *policy, struct lustre_handle *lockh,
+ int ast_flags)
+{
+ struct llu_sb_info *sbi = llu_i2sbi(inode);
+ struct llu_inode_info *lli = llu_i2info(inode);
+ int rc;
+ ENTRY;
+
+ LASSERT(lockh->cookie == 0);
+
+ /* XXX phil: can we do this? won't it screw the file size up? */
+ if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
+ (sbi->ll_flags & LL_SBI_NOLCK))
+ RETURN(0);
+
+ CDEBUG(D_DLMTRACE, "Locking inode %lu, start "LPU64" end "LPU64"\n",
+ lli->lli_st_ino, policy->l_extent.start, policy->l_extent.end);
+
+ rc = obd_enqueue(sbi->ll_osc_exp, lsm, LDLM_EXTENT, policy, mode,
+ &ast_flags, llu_extent_lock_callback,
+ ldlm_completion_ast, llu_glimpse_callback, inode,
+ sizeof(struct ost_lvb), lustre_swab_ost_lvb, lockh);
+ if (rc > 0)
+ rc = -EIO;
+
+ if (policy->l_extent.start == 0 &&
+ policy->l_extent.end == OBD_OBJECT_EOF)
+ lli->lli_st_size = lov_merge_size(lsm, 1);
+
+ //inode->i_mtime = lov_merge_mtime(lsm, inode->i_mtime);
+
+ RETURN(rc);
+}
+
+#if 0
int llu_extent_lock_no_validate(struct ll_file_data *fd,
struct inode *inode,
struct lov_stripe_md *lsm,
RETURN(0);
}
+#endif
int llu_extent_unlock(struct ll_file_data *fd, struct inode *inode,
struct lov_stripe_md *lsm, int mode,
struct llu_sb_info *sbi = llu_i2sbi(inode);
int rc;
ENTRY;
-#if 0
+
/* XXX phil: can we do this? won't it screw the file size up? */
if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
(sbi->ll_flags & LL_SBI_NOLCK))
RETURN(0);
-#endif
+
rc = obd_cancel(sbi->ll_osc_exp, lsm, mode, lockh);
RETURN(rc);
struct ll_file_data *fd = lli->lli_file_data;
struct lustre_handle lockh = {0};
struct lov_stripe_md *lsm = lli->lli_smd;
+ ldlm_policy_data_t policy;
struct llu_sysio_callback_args *lsca;
struct llu_sysio_cookie *cookie;
- struct ldlm_extent extent;
ldlm_error_t err;
int iovidx;
ENTRY;
if (count == 0)
continue;
- /* FIXME libsysio haven't handle O_APPEND */
- extent.start = pos;
- extent.end = pos + count - 1;
+ if (pos + count > lli->lli_maxbytes)
+ GOTO(err_out, err = -ERANGE);
-#ifdef LIBLUSTRE_HANDLE_UNALIGNED_PAGE
- if ((pos & ~PAGE_CACHE_MASK) == 0 &&
- (count & ~PAGE_CACHE_MASK) == 0)
- err = llu_extent_lock_no_validate(fd, inode, lsm,
- LCK_PW, &extent, &lockh, 0);
- else
- err = llu_extent_lock(fd, inode, lsm, LCK_PW,
- &extent, &lockh);
-#else
- /* server will handle partial write, so we don't
- * care for file size here */
- err = llu_extent_lock_no_validate(fd, inode, lsm, LCK_PW,
- &extent, &lockh, 0);
-#endif
+ /* FIXME libsysio haven't handle O_APPEND?? */
+ policy.l_extent.start = pos;
+ policy.l_extent.end = pos + count - 1;
+
+ err = llu_extent_lock(fd, inode, lsm, LCK_PW, &policy,
+ &lockh, 0);
if (err != ELDLM_OK)
GOTO(err_out, err = -ENOLCK);
struct llu_sysio_callback_args*
llu_file_read(struct inode *inode, const struct iovec *iovec,
- size_t iovlen, loff_t pos)
+ size_t iovlen, loff_t pos)
{
struct llu_inode_info *lli = llu_i2info(inode);
struct ll_file_data *fd = lli->lli_file_data;
struct lov_stripe_md *lsm = lli->lli_smd;
struct lustre_handle lockh = { 0 };
- struct ldlm_extent extent;
+ ldlm_policy_data_t policy;
struct llu_sysio_callback_args *lsca;
struct llu_sysio_cookie *cookie;
+ __u64 kms;
int iovidx;
ldlm_error_t err;
if (count == 0)
continue;
- extent.start = pos;
- extent.end = pos + count - 1;
+ policy.l_extent.start = pos;
+ policy.l_extent.end = pos + count - 1;
- err = llu_extent_lock(fd, inode, lsm, LCK_PR, &extent, &lockh);
+ err = llu_extent_lock(fd, inode, lsm, LCK_PR, &policy, &lockh, 0);
if (err != ELDLM_OK)
GOTO(err_out, err = -ENOLCK);
- CDEBUG(D_INFO, "Reading inode %lu, "LPSZ" bytes, offset %Ld\n",
- lli->lli_st_ino, count, pos);
+ kms = lov_merge_size(lsm, 1);
+ if (policy.l_extent.end > kms) {
+ /* A glimpse is necessary to determine whether we
+ * return a short read or some zeroes at the end of
+ * the buffer */
+ struct ost_lvb lvb;
+ if (llu_glimpse_size(inode, &lvb)) {
+ llu_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
+ GOTO(err_out, err = -ENOLCK);
+ }
+ lli->lli_st_size = lvb.lvb_size;
+ } else {
+ lli->lli_st_size = kms;
+ }
+
+ CDEBUG(D_INFO, "Reading inode %lu, "LPSZ" bytes, offset %Ld, "
+ "i_size "LPU64"\n", lli->lli_st_ino, count, pos,
+ lli->lli_st_size);
if (pos >= lli->lli_st_size) {
llu_extent_unlock(fd, inode, lsm, LCK_PR, &lockh);
flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_PLAIN,
- NULL, 0, LCK_PR, &lockh)) {
+ NULL, LCK_PR, &lockh)) {
ldlm_lock_decref(&lockh, LCK_PR);
RETURN(1);
}
if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_PLAIN,
- NULL, 0, LCK_PW, &lockh)) {
+ NULL, LCK_PW, &lockh)) {
ldlm_lock_decref(&lockh, LCK_PW);
RETURN(1);
}
if (!lsm) /* object not yet allocated, don't validate size */
RETURN(0);
- /*
- * unfortunately stat comes in through revalidate and we don't
- * differentiate this use from initial instantiation. we're
- * also being wildly conservative and flushing write caches
- * so that stat really returns the proper size.
- */
+ /* ll_glimpse_size will prefer locally cached writes if they extend
+ * the file */
{
- struct ldlm_extent extent = {0, OBD_OBJECT_EOF};
- struct lustre_handle lockh = {0};
+ struct ost_lvb lvb;
ldlm_error_t err;
- err = llu_extent_lock(NULL, inode, lsm, LCK_PR, &extent, &lockh);
- if (err != ELDLM_OK)
- RETURN(err);
-
- llu_extent_unlock(NULL, inode, lsm, LCK_PR, &lockh);
+ err = llu_glimpse_size(inode, &lvb);
+ lli->lli_st_size = lvb.lvb_size;
}
RETURN(0);
}
}
if (ia_valid & ATTR_SIZE) {
- struct ldlm_extent extent = { .start = attr->ia_size,
- .end = OBD_OBJECT_EOF };
+ ldlm_policy_data_t policy = { .l_extent = {attr->ia_size,
+ OBD_OBJECT_EOF} };
struct lustre_handle lockh = { 0 };
int err, ast_flags = 0;
/* XXX when we fix the AST intents to pass the discard-range
* XXX extent, make ast_flags always LDLM_AST_DISCARD_DATA
* XXX here. */
-
- /* Writeback uses inode->i_size to determine how far out
- * its cached pages go. ll_truncate gets a PW lock, canceling
- * our lock, _after_ it has updated i_size. this can confuse
- *
- * We really need to get our PW lock before we change
- * inode->i_size. If we don't we can race with other
- * i_size updaters on our node, like ll_file_read. We
- * can also race with i_size propogation to other
- * nodes through dirtying and writeback of final cached
- * pages. This last one is especially bad for racing
- * o_append users on other nodes. */
- if (extent.start == 0)
+ if (attr->ia_size == 0)
ast_flags = LDLM_AST_DISCARD_DATA;
- rc = llu_extent_lock_no_validate(NULL, inode, lsm, LCK_PW,
- &extent, &lockh, ast_flags);
+
+ rc = llu_extent_lock(NULL, inode, lsm, LCK_PW, &policy,
+ &lockh, ast_flags);
if (rc != ELDLM_OK) {
if (rc > 0)
RETURN(-ENOLCK);