#include "llite_internal.h"
#include <linux/obd_lov.h>
-#define XATTR_NAME_MAX 255
-int ll_md_close(struct obd_export *md_exp, struct inode *inode,
- struct file *file)
+int ll_md_och_close(struct obd_export *md_exp, struct inode *inode,
+ struct obd_client_handle *och)
{
- struct ll_file_data *fd = file->private_data;
struct ptlrpc_request *req = NULL;
- struct obd_client_handle *och = &fd->fd_mds_och;
struct obdo *obdo = NULL;
+ struct obd_device *obd;
int rc;
ENTRY;
- /* clear group lock, if present */
- if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
- struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
- fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
- rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
- &fd->fd_cwlockh);
+ obd = class_exp2obd(md_exp);
+ if (obd == NULL) {
+ CERROR("Invalid MDC connection handle "LPX64"\n",
+ md_exp->exp_handle.h_cookie);
+ EXIT;
+ return 0;
}
+ /*
+ * here we check if this is forced umount. If so this is called on
+ * canceling "open lock" and we do not call md_close() in this case , as
+ * it will not successful, as import is already deactivated.
+ */
+ if (obd->obd_no_recov)
+ GOTO(out, rc = 0);
+
+ /* closing opened file */
obdo = obdo_alloc();
if (obdo == NULL)
RETURN(-ENOMEM);
obdo->o_flags = MDS_BFLAG_UNCOMMITTED_WRITES;
obdo->o_valid |= OBD_MD_FLFLAGS;
}
+ obdo->o_fid = id_fid(&ll_i2info(inode)->lli_id);
obdo->o_mds = id_group(&ll_i2info(inode)->lli_id);
rc = md_close(md_exp, obdo, och, &req);
obdo_free(obdo);
if (rc == EAGAIN) {
- /* We are the last writer, so the MDS has instructed us to get
- * the file size and any write cookies, then close again. */
+ /*
+ * we are the last writer, so the MDS has instructed us to get
+ * the file size and any write cookies, then close again.
+ */
+
//ll_queue_done_writing(inode);
rc = 0;
} else if (rc) {
CERROR("inode %lu mdc close failed: rc = %d\n",
- inode->i_ino, rc);
+ (unsigned long)inode->i_ino, rc);
}
+
+ /* objects are destroed on OST only if metadata close was
+ * successful.*/
if (rc == 0) {
- rc = ll_objects_destroy(req, file->f_dentry->d_inode, 1);
+ rc = ll_objects_destroy(req, inode, 1);
if (rc)
CERROR("inode %lu ll_objects destroy: rc = %d\n",
inode->i_ino, rc);
}
- mdc_clear_open_replay_data(md_exp, och);
ptlrpc_req_finished(req);
+ EXIT;
+out:
+ mdc_clear_open_replay_data(md_exp, och);
och->och_fh.cookie = DEAD_HANDLE_MAGIC;
+ OBD_FREE(och, sizeof *och);
+ return rc;
+}
+
+int ll_md_real_close(struct obd_export *md_exp,
+ struct inode *inode, int flags)
+{
+ struct ll_inode_info *lli = ll_i2info(inode);
+ struct obd_client_handle **och_p;
+ struct obd_client_handle *och;
+ __u64 *och_usecount;
+ int rc = 0;
+ ENTRY;
+
+ if (flags & FMODE_WRITE) {
+ och_p = &lli->lli_mds_write_och;
+ och_usecount = &lli->lli_open_fd_write_count;
+ } else if (flags & FMODE_EXEC) {
+ och_p = &lli->lli_mds_exec_och;
+ och_usecount = &lli->lli_open_fd_exec_count;
+ } else {
+ och_p = &lli->lli_mds_read_och;
+ och_usecount = &lli->lli_open_fd_read_count;
+ }
+
+ down(&lli->lli_och_sem);
+ if (*och_usecount) { /* There are still users of this handle, so
+ skip freeing it. */
+ up(&lli->lli_och_sem);
+ RETURN(0);
+ }
+ och = *och_p;
+
+ *och_p = NULL;
+ up(&lli->lli_och_sem);
+
+ /*
+ * there might be a race and somebody have freed this och
+ * already. Another way to have this twice called is if file closing
+ * will fail due to netwok problems and on umount lock will be canceled
+ * and this will be called from block_ast callack.
+ */
+ if (och && och->och_fh.cookie != DEAD_HANDLE_MAGIC)
+ rc = ll_md_och_close(md_exp, inode, och);
+
+ RETURN(rc);
+}
+
+int ll_md_close(struct obd_export *md_exp, struct inode *inode,
+ struct file *file)
+{
+ struct ll_file_data *fd = file->private_data;
+ struct ll_inode_info *lli = ll_i2info(inode);
+ int rc = 0;
+ ENTRY;
+
+ /* clear group lock, if present */
+ if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
+ struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
+ fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
+ rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
+ &fd->fd_cwlockh);
+ }
+
+ /* Let's see if we have good enough OPEN lock on the file and if
+ we can skip talking to MDS */
+ if (file->f_dentry->d_inode) {
+ int lockmode;
+ struct obd_device *obddev;
+ struct lustre_handle lockh;
+ int flags = LDLM_FL_BLOCK_GRANTED;
+ struct ldlm_res_id file_res_id = {.name = {id_fid(&lli->lli_id),
+ id_group(&lli->lli_id)}};
+ ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
+
+ down(&lli->lli_och_sem);
+ if (fd->fd_omode & FMODE_WRITE) {
+ lockmode = LCK_CW;
+ LASSERT(lli->lli_open_fd_write_count);
+ lli->lli_open_fd_write_count--;
+ } else if (fd->fd_omode & FMODE_EXEC) {
+ lockmode = LCK_PR;
+ LASSERT(lli->lli_open_fd_exec_count);
+ lli->lli_open_fd_exec_count--;
+ } else {
+ lockmode = LCK_CR;
+ LASSERT(lli->lli_open_fd_read_count);
+ lli->lli_open_fd_read_count--;
+ }
+ up(&lli->lli_och_sem);
+
+ obddev = md_get_real_obd(md_exp, &lli->lli_id);
+ if (!ldlm_lock_match(obddev->obd_namespace, flags, &file_res_id,
+ LDLM_IBITS, &policy, lockmode, &lockh))
+ {
+ rc = ll_md_real_close(md_exp, file->f_dentry->d_inode,
+ fd->fd_omode);
+ } else {
+ ldlm_lock_decref(&lockh, lockmode);
+ }
+ }
+
file->private_data = NULL;
OBD_SLAB_FREE(fd, ll_file_data_slab, sizeof(*fd));
RETURN(rc);
OBD_ALLOC(op_data, sizeof(*op_data));
if (op_data == NULL)
RETURN(-ENOMEM);
- ll_prepare_mdc_data(op_data, parent->d_inode, NULL, name, len, O_RDWR);
+
+ ll_prepare_mdc_data(op_data, parent->d_inode, NULL,
+ name, len, O_RDWR);
rc = md_enqueue(sbi->ll_md_exp, LDLM_IBITS, itp, LCK_PR, op_data,
&lockh, lmm, lmmsize, ldlm_completion_ast,
} else if (rc < 0) {
CERROR("lock enqueue: err: %d\n", rc);
}
-
RETURN(rc);
}
-int ll_local_open(struct file *file, struct lookup_intent *it)
+void ll_och_fill(struct inode *inode, struct lookup_intent *it,
+ struct obd_client_handle *och)
{
struct ptlrpc_request *req = LUSTRE_IT(it)->it_data;
- struct ll_inode_info *lli = ll_i2info(file->f_dentry->d_inode);
- struct obd_export *md_exp = ll_i2mdexp(file->f_dentry->d_inode);
- struct ll_file_data *fd;
+ struct ll_inode_info *lli = ll_i2info(inode);
struct mds_body *body;
- ENTRY;
+ LASSERT(och);
body = lustre_msg_buf (req->rq_repmsg, 1, sizeof (*body));
- LASSERT (body != NULL); /* reply already checked out */
- LASSERT_REPSWABBED (req, 1); /* and swabbed down */
+ LASSERT (body != NULL); /* reply already checked out */
+ LASSERT_REPSWABBED (req, 1); /* and swabbed down */
+
+ memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
+ och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
+ lli->lli_io_epoch = body->io_epoch;
+ mdc_set_open_replay_data(ll_i2mdexp(inode), och,
+ LUSTRE_IT(it)->it_data);
+}
+
+int ll_local_open(struct file *file, struct lookup_intent *it,
+ struct obd_client_handle *och)
+{
+ struct ll_file_data *fd;
+ ENTRY;
- LASSERTF(file->private_data == NULL, "file %*s/%*s ino %lu/%u (%o)\n",
+ if (och)
+ ll_och_fill(file->f_dentry->d_inode, it, och);
+
+ LASSERTF(file->private_data == NULL, "file %.*s/%.*s ino %lu/%u (%o)\n",
file->f_dentry->d_name.len, file->f_dentry->d_name.name,
file->f_dentry->d_parent->d_name.len,
file->f_dentry->d_parent->d_name.name,
file->f_dentry->d_inode->i_generation,
file->f_dentry->d_inode->i_mode);
-
OBD_SLAB_ALLOC(fd, ll_file_data_slab, SLAB_KERNEL, sizeof *fd);
/* We can't handle this well without reorganizing ll_file_open and
* ll_md_close(), so don't even try right now. */
LASSERT(fd != NULL);
- memcpy(&fd->fd_mds_och.och_fh, &body->handle, sizeof(body->handle));
- fd->fd_mds_och.och_magic = OBD_CLIENT_HANDLE_MAGIC;
file->private_data = fd;
ll_readahead_init(file->f_dentry->d_inode, &fd->fd_ras);
-
- lli->lli_io_epoch = body->io_epoch;
-
- mdc_set_open_replay_data(md_exp, &fd->fd_mds_och, LUSTRE_IT(it)->it_data);
-
+ fd->fd_omode = it->it_flags;
RETURN(0);
}
struct lov_stripe_md *lsm;
struct ptlrpc_request *req;
int rc = 0;
+ struct obd_client_handle **och_p;
+ __u64 *och_usecount;
ENTRY;
- CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
- inode->i_generation, inode, file->f_flags);
+ CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n",
+ inode->i_ino, inode->i_generation, inode, file->f_flags);
/* don't do anything for / */
if (inode->i_sb->s_root == file->f_dentry)
RETURN(0);
+ if ((file->f_flags+1) & O_ACCMODE)
+ oit.it_flags++;
+ if (file->f_flags & O_TRUNC)
+ oit.it_flags |= 2;
+
it = file->f_it;
- if (!it || !LUSTRE_IT(it) || !LUSTRE_IT(it)->it_disposition) {
+ /*
+ * sometimes LUSTRE_IT(it) may not be allocated like opening file by
+ * dentry_open() from GNS stuff.
+ */
+ if (!it || !LUSTRE_IT(it)) {
it = &oit;
rc = ll_intent_alloc(it);
if (rc)
GOTO(out, rc);
- rc = ll_intent_file_open(file, NULL, 0, it);
- if (rc)
- GOTO(out, rc);
}
-
lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_OPEN);
- /* mdc_intent_lock() didn't get a request ref if there was an open
- * error, so don't do cleanup on the request here (bug 3430) */
- rc = it_open_error(DISP_OPEN_OPEN, it);
- if (rc)
- RETURN(rc);
+
+ /*
+ * mdc_intent_lock() didn't get a request ref if there was an open
+ * error, so don't do cleanup on the * request here (bug 3430)
+ */
+ if (LUSTRE_IT(it)->it_disposition) {
+ rc = it_open_error(DISP_OPEN_OPEN, it);
+ if (rc)
+ RETURN(rc);
+ }
+
+ /* Let's see if we have file open on MDS already. */
+ if (it->it_flags & FMODE_WRITE) {
+ och_p = &lli->lli_mds_write_och;
+ och_usecount = &lli->lli_open_fd_write_count;
+ } else if (it->it_flags & FMODE_EXEC) {
+ och_p = &lli->lli_mds_exec_och;
+ och_usecount = &lli->lli_open_fd_exec_count;
+ } else {
+ och_p = &lli->lli_mds_read_och;
+ och_usecount = &lli->lli_open_fd_read_count;
+ }
+
+ down(&lli->lli_och_sem);
+ if (*och_p) { /* Open handle is present */
+ if (LUSTRE_IT(it)->it_disposition) {
+ struct obd_client_handle *och;
+ /* Well, there's extra open request that we do not need,
+ let's close it somehow*/
+ OBD_ALLOC(och, sizeof (struct obd_client_handle));
+ if (!och) {
+ up(&lli->lli_och_sem);
+ RETURN(-ENOMEM);
+ }
- rc = ll_local_open(file, it);
+ ll_och_fill(inode, it, och);
+ /* ll_md_och_close() will free och */
+ ll_md_och_close(ll_i2mdexp(inode), inode, och);
+ }
+ (*och_usecount)++;
+
+ rc = ll_local_open(file, it, NULL);
+ if (rc)
+ LBUG();
+ } else {
+ LASSERT(*och_usecount == 0);
+ OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
+ if (!*och_p)
+ GOTO(out, rc = -ENOMEM);
+ (*och_usecount)++;
+
+ if (!it || !LUSTRE_IT(it) || !LUSTRE_IT(it)->it_disposition) {
+ /* We are going to replace intent here, and that may
+ possibly change access mode (FMODE_EXEC can only be
+ set in intent), but I hope it never happens (I was
+ not able to trigger it yet at least) -- green */
+ /* FIXME: FMODE_EXEC is not covered by O_ACCMODE! */
+ LASSERT(!(it->it_flags & FMODE_EXEC));
+ LASSERTF((it->it_flags & O_ACCMODE) ==
+ (oit.it_flags & O_ACCMODE), "Changing intent "
+ "flags %x to incompatible %x\n",it->it_flags,
+ oit.it_flags);
+ it = &oit;
+ rc = ll_intent_file_open(file, NULL, 0, it);
+ if (rc)
+ GOTO(out, rc);
+ rc = it_open_error(DISP_OPEN_OPEN, it);
+ if (rc)
+ GOTO(out_och_free, rc);
- LASSERTF(rc == 0, "rc = %d\n", rc);
+ mdc_set_lock_data(NULL, &LUSTRE_IT(it)->it_lock_handle,
+ file->f_dentry->d_inode);
+ }
+ lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_OPEN);
+ rc = ll_local_open(file, it, *och_p);
+ LASSERTF(rc == 0, "rc = %d\n", rc);
+ }
+ up(&lli->lli_och_sem);
+ /* Must do this outside lli_och_sem lock to prevent deadlock where
+ different kind of OPEN lock for this same inode gets cancelled
+ by ldlm_cancel_lru */
if (!S_ISREG(inode->i_mode))
GOTO(out, rc);
GOTO(out, rc);
out:
req = LUSTRE_IT(it)->it_data;
+ ll_intent_drop_lock(it);
ll_intent_release(it);
-
ptlrpc_req_finished(req);
- if (rc == 0)
+ if (rc == 0) {
ll_open_complete(inode);
+ } else {
+out_och_free:
+ if (*och_p) {
+ OBD_FREE(*och_p, sizeof (struct obd_client_handle));
+ *och_p = NULL; /* OBD_FREE writes some magic there */
+ (*och_usecount)--;
+ }
+ up(&lli->lli_och_sem);
+ }
+
return rc;
}
for (i = start; i <= end; i += (j + skip)) {
j = min(count - (i % count), end - i + 1);
+ LASSERT(j > 0);
LASSERT(inode->i_mapping);
if (ll_teardown_mmaps(inode->i_mapping, i << PAGE_CACHE_SHIFT,
((i+j) << PAGE_CACHE_SHIFT) - 1) )
goto iput;
ll_pgcache_remove_extent(inode, lsm, lock, stripe);
- /* grabbing the i_sem will wait for write() to complete. ns
- * lock hold times should be very short as ast processing
- * requires them and has a short timeout. so, i_sem before ns
- * lock.*/
-
- down(&inode->i_sem);
l_lock(&lock->l_resource->lr_namespace->ns_lock);
+ down(&lli->lli_size_sem);
kms = ldlm_extent_shift_kms(lock,
lsm->lsm_oinfo[stripe].loi_kms);
LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
lsm->lsm_oinfo[stripe].loi_kms, kms);
lsm->lsm_oinfo[stripe].loi_kms = kms;
+ up(&lli->lli_size_sem);
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
- up(&inode->i_sem);
//ll_try_done_writing(inode);
iput:
iput(inode);
lvb = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*lvb));
lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe].loi_kms;
+ lvb->lvb_mtime = LTIME_S(inode->i_mtime);
+ lvb->lvb_atime = LTIME_S(inode->i_atime);
+ lvb->lvb_ctime = LTIME_S(inode->i_ctime);
LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64,
inode->i_size, stripe, lvb->lvb_size);
RETURN(rc > 0 ? -EIO : rc);
}
+ down(&lli->lli_size_sem);
inode->i_size = lov_merge_size(lli->lli_smd, 0);
inode->i_blocks = lov_merge_blocks(lli->lli_smd);
- //inode->i_mtime = lov_merge_mtime(lli->lli_smd, inode->i_mtime);
+ up(&lli->lli_size_sem);
+
+ LTIME_S(inode->i_mtime) = lov_merge_mtime(lli->lli_smd,
+ LTIME_S(inode->i_mtime));
CDEBUG(D_DLMTRACE, "glimpse: size: "LPU64", blocks: "LPU64"\n",
(__u64)inode->i_size, (__u64)inode->i_blocks);
+
obd_cancel(sbi->ll_dt_exp, lli->lli_smd, LCK_PR, &lockh);
RETURN(rc);
}
ldlm_policy_data_t *policy, struct lustre_handle *lockh,
int ast_flags, struct obd_service_time *stime)
{
+ struct ll_inode_info *lli = ll_i2info(inode);
struct ll_sb_info *sbi = ll_i2sbi(inode);
struct timeval start;
int rc;
* when doing appending writes and effectively cancel the
* result of the truncate. Getting the i_sem after the enqueue
* maintains the DLM -> i_sem acquiry order. */
- down(&inode->i_sem);
+ down(&lli->lli_size_sem);
inode->i_size = lov_merge_size(lsm, 1);
- up(&inode->i_sem);
+ up(&lli->lli_size_sem);
+ }
+
+ if (rc == 0) {
+ LTIME_S(inode->i_mtime) =
+ lov_merge_mtime(lsm, LTIME_S(inode->i_mtime));
}
- //inode->i_mtime = lov_merge_mtime(lsm, inode->i_mtime);
RETURN(rc);
}
if (rc != 0)
RETURN(rc);
+ down(&lli->lli_size_sem);
kms = lov_merge_size(lsm, 1);
if (*ppos + count - 1 > kms) {
/* A glimpse is necessary to determine whether we return a short
* read or some zeroes at the end of the buffer */
+ up(&lli->lli_size_sem);
retval = ll_glimpse_size(inode);
if (retval)
goto out;
} else {
inode->i_size = kms;
+ up(&lli->lli_size_sem);
}
CDEBUG(D_INFO, "Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
struct ptlrpc_request *req = NULL;
int rc = 0;
struct lustre_md md;
+ struct obd_client_handle *och;
ENTRY;
+
+ if ((file->f_flags+1) & O_ACCMODE)
+ oit.it_flags++;
+ if (file->f_flags & O_TRUNC)
+ oit.it_flags |= 2;
+
down(&lli->lli_open_sem);
lsm = lli->lli_smd;
if (lsm) {
f->f_dentry = file->f_dentry;
f->f_vfsmnt = file->f_vfsmnt;
+ f->f_flags = flags;
rc = ll_intent_alloc(&oit);
if (rc)
GOTO(out, rc);
ll_update_inode(f->f_dentry->d_inode, &md);
- rc = ll_local_open(f, &oit);
- if (rc)
+ OBD_ALLOC(och, sizeof(struct obd_client_handle));
+ rc = ll_local_open(f, &oit, och);
+ if (rc) { /* Actually ll_local_open cannot fail! */
GOTO(out, rc);
+ }
+ if (LUSTRE_IT(&oit)->it_lock_mode) {
+ ldlm_lock_decref_and_cancel((struct lustre_handle *)
+ &LUSTRE_IT(&oit)->it_lock_handle,
+ LUSTRE_IT(&oit)->it_lock_mode);
+ LUSTRE_IT(&oit)->it_lock_mode = 0;
+ }
+
ll_intent_release(&oit);
+ /* ll_file_release will decrease the count, but won't free anything
+ because we have at least one more reference coming from actual open
+ */
+ down(&lli->lli_och_sem);
+ lli->lli_open_fd_write_count++;
+ up(&lli->lli_och_sem);
rc = ll_file_release(f->f_dentry->d_inode, f);
+
+ /* Now also destroy our supplemental och */
+ ll_md_och_close(ll_i2mdexp(inode), f->f_dentry->d_inode, och);
EXIT;
out:
ll_intent_release(&oit);
RETURN(-ENODATA);
return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0, lsm,
- (void *)arg);
+ (void *)arg);
}
static int ll_get_grouplock(struct inode *inode, struct file *file,
lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_LLSEEK);
if (origin == 2) { /* SEEK_END */
ldlm_policy_data_t policy = { .l_extent = {0, OBD_OBJECT_EOF }};
+ struct ll_inode_info *lli = ll_i2info(inode);
int nonblock = 0, rc;
if (file->f_flags & O_NONBLOCK)
if (rc != 0)
RETURN(rc);
+ down(&lli->lli_size_sem);
offset += inode->i_size;
+ up(&lli->lli_size_sem);
} else if (origin == 1) { /* SEEK_CUR */
offset += file->f_pos;
}
{
int res = 0;
struct inode *inode = de->d_inode;
+ struct ll_inode_info *lli = ll_i2info(inode);
res = ll_inode_revalidate_it(de);
lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_GETATTR);
stat->atime = inode->i_atime;
stat->mtime = inode->i_mtime;
stat->ctime = inode->i_ctime;
- stat->size = inode->i_size;
stat->blksize = inode->i_blksize;
+
+ down(&lli->lli_size_sem);
+ stat->size = inode->i_size;
stat->blocks = inode->i_blocks;
+ up(&lli->lli_size_sem);
+
stat->rdev = kdev_t_to_nr(inode->i_rdev);
stat->dev = id_group(&ll_i2info(inode)->lli_id);
return 0;