/* Queue DONE_WRITING if
* - done writing is allowed;
* - inode has no no dirty pages; */
-void ll_queue_done_writing(struct inode *inode)
+void ll_queue_done_writing(struct inode *inode, unsigned long flags)
{
struct ll_inode_info *lli = ll_i2info(inode);
spin_lock(&lli->lli_lock);
+ lli->lli_flags |= flags;
+
if ((lli->lli_flags & LLIF_DONE_WRITING) &&
list_empty(&lli->lli_pending_write_llaps)) {
struct ll_close_queue *lcq = ll_i2sbi(inode)->ll_lcq;
/* DONE_WRITING is allowed and inode has no dirty page. */
spin_lock(&lcq->lcq_lock);
- LASSERT(list_empty(&lli->lli_close_list));
- CDEBUG(D_INODE, "adding inode %lu/%u to close list\n",
- inode->i_ino, inode->i_generation);
-
- list_add_tail(&lli->lli_close_list, &lcq->lcq_head);
+
+ /*
+ * XXX: Seems sometimes it is possible to try to add inode more
+ * than once to close thread queue. Not sure if that is correct
+ * from caller POV, but it looks to me logically to check this
+ * here and just do nothing if inode is already on the queue.
+ * Hope Vitaly will correct me if I'm wrong. --umka
+ */
+ if (list_empty(&lli->lli_close_list)) {
+ CDEBUG(D_INODE, "adding inode %lu/%u to close list\n",
+ inode->i_ino, inode->i_generation);
+ list_add_tail(&lli->lli_close_list, &lcq->lcq_head);
+ } else {
+ CWARN("Inode %lu/%u is already queued for done writing!\n",
+ inode->i_ino, inode->i_generation);
+ }
wake_up(&lcq->lcq_waitq);
spin_unlock(&lcq->lcq_lock);
}
spin_unlock(&lli->lli_lock);
}
-/* CLOSE has already occured but has not closed epoch;
- * Let let DONE_WRITING to happen. */
-void ll_init_done_writing(struct inode *inode) {
- struct ll_inode_info *lli = ll_i2info(inode);
- spin_lock(&lli->lli_lock);
- if ((lli->lli_flags & LLIF_EPOCH_PENDING))
- lli->lli_flags |= LLIF_DONE_WRITING;
- spin_unlock(&lli->lli_lock);
- ll_queue_done_writing(inode);
-}
-
/* Close epoch and send Size-on-MDS attribute update if possible.
* Call this under @lli->lli_lock spinlock. */
-void ll_epoch_close(struct inode *inode, struct md_op_data *op_data)
+void ll_epoch_close(struct inode *inode, struct md_op_data *op_data,
+ struct obd_client_handle **och, unsigned long flags)
{
struct ll_inode_info *lli = ll_i2info(inode);
ENTRY;
- CDEBUG(D_INODE, "Epoch "LPU64" closed on "DFID"\n",
- op_data->ioepoch, PFID(&lli->lli_fid));
- op_data->flags |= MF_EPOCH_CLOSE;
+ spin_lock(&lli->lli_lock);
+ if (!(list_empty(&lli->lli_pending_write_llaps)) &&
+ !(lli->lli_flags & LLIF_EPOCH_PENDING)) {
+ LASSERT(*och != NULL);
+ LASSERT(lli->lli_pending_och == NULL);
+ /* Inode is dirty and there is no pending write done request
+ * yet, DONE_WRITE is to be sent later. */
+ lli->lli_flags |= LLIF_EPOCH_PENDING;
+ lli->lli_pending_och = *och;
+ spin_unlock(&lli->lli_lock);
+
+ inode = igrab(inode);
+ LASSERT(inode);
+ GOTO(out, 0);
+ }
- /* Pack Size-on-MDS inode attributes only if they has changed */
- if (!(lli->lli_flags & LLIF_SOM_DIRTY))
- goto out;
-
- /* There is already 1 pending DONE_WRITE, do not create another one --
- * close epoch with no attribute change. */
- if (lli->lli_flags & LLIF_EPOCH_PENDING)
- goto out;
+ CDEBUG(D_INODE, "Epoch "LPU64" closed on "DFID"\n",
+ op_data->op_ioepoch, PFID(&lli->lli_fid));
+ op_data->op_flags |= MF_EPOCH_CLOSE;
+
+ if (flags & LLIF_DONE_WRITING) {
+ LASSERT(lli->lli_flags & LLIF_SOM_DIRTY);
+ *och = lli->lli_pending_och;
+ lli->lli_pending_och = NULL;
+ lli->lli_flags &= ~(LLIF_DONE_WRITING | LLIF_EPOCH_PENDING |
+ LLIF_EPOCH_PENDING);
+ } else {
+ /* Pack Size-on-MDS inode attributes only if they has changed */
+ if (!(lli->lli_flags & LLIF_SOM_DIRTY)) {
+ spin_unlock(&lli->lli_lock);
+ GOTO(out, 0);
+ }
+
+ /* There is already 1 pending DONE_WRITE, do not create another
+ * one -- close epoch with no attribute change. */
+ if (lli->lli_flags & LLIF_EPOCH_PENDING) {
+ spin_unlock(&lli->lli_lock);
+ GOTO(out, 0);
+ }
+ }
- op_data->flags |= MF_SOM_CHANGE;
+ spin_unlock(&lli->lli_lock);
+ op_data->op_flags |= MF_SOM_CHANGE;
/* Check if Size-on-MDS attributes are valid. */
- if ((lli->lli_flags & LLIF_MDS_SIZE_LOCK) || !ll_local_size(inode)) {
+ LASSERT(!(lli->lli_flags & LLIF_MDS_SIZE_LOCK));
+ if (!ll_local_size(inode)) {
/* Send Size-on-MDS Attributes if valid. */
- op_data->attr.ia_valid |= ATTR_MTIME_SET | ATTR_CTIME_SET |
+ op_data->op_attr.ia_valid |= ATTR_MTIME_SET | ATTR_CTIME_SET |
ATTR_SIZE | ATTR_BLOCKS;
}
-out:
EXIT;
+out:
+ return;
}
int ll_sizeonmds_update(struct inode *inode, struct lustre_handle *fh)
CDEBUG(D_INODE, "Size-on-MDS update on "DFID"\n", PFID(&lli->lli_fid));
md_from_obdo(op_data, oa, oa->o_valid);
- memcpy(&op_data->handle, fh, sizeof(*fh));
+ memcpy(&op_data->op_handle, fh, sizeof(*fh));
- op_data->ioepoch = lli->lli_ioepoch;
- op_data->flags |= MF_SOM_CHANGE;
+ op_data->op_ioepoch = lli->lli_ioepoch;
+ op_data->op_flags |= MF_SOM_CHANGE;
rc = ll_md_setattr(inode, op_data);
EXIT;
if (oa)
obdo_free(oa);
if (op_data)
- OBD_FREE_PTR(op_data);
+ ll_finish_md_op_data(op_data);
return rc;
}
/* Send a DONE_WRITING rpc, pack Size-on-MDS attributes into it, if possible */
static void ll_done_writing(struct inode *inode)
{
- struct ll_inode_info *lli = ll_i2info(inode);
+ struct obd_client_handle *och = NULL;
struct md_op_data *op_data;
- struct obd_client_handle *och;
int rc;
ENTRY;
return;
}
- spin_lock(&lli->lli_lock);
- LASSERT(lli->lli_flags & LLIF_SOM_DIRTY);
-
- och = lli->lli_pending_och;
- lli->lli_pending_och = NULL;
- lli->lli_flags &= ~(LLIF_DONE_WRITING | LLIF_EPOCH_PENDING);
- ll_epoch_close(inode, op_data);
- lli->lli_flags &= ~LLIF_SOM_DIRTY;
- spin_unlock(&lli->lli_lock);
-
+ ll_epoch_close(inode, op_data, &och, LLIF_DONE_WRITING);
ll_pack_inode2opdata(inode, op_data, &och->och_fh);
rc = md_done_writing(ll_i2sbi(inode)->ll_md_exp, op_data, och);
- OBD_FREE_PTR(op_data);
- if (rc == EAGAIN) {
+ ll_finish_md_op_data(op_data);
+ if (rc == -EAGAIN) {
/* MDS has instructed us to obtain Size-on-MDS attribute from
* OSTs and send setattr to back to MDS. */
rc = ll_sizeonmds_update(inode, &och->och_fh);
lli_close_list);
list_del_init(&lli->lli_close_list);
} else if (atomic_read(&lcq->lcq_stop))
- lli = ERR_PTR(-1);
+ lli = ERR_PTR(-EALREADY);
spin_unlock(&lcq->lcq_lock);
return lli;
break;
inode = ll_info2i(lli);
+ CDEBUG(D_INFO, "done_writting for inode %lu/%u\n",
+ inode->i_ino, inode->i_generation);
ll_done_writing(inode);
iput(inode);
}
+ CDEBUG(D_INFO, "ll_close exiting\n");
complete(&lcq->lcq_comp);
RETURN(0);
}