Whamcloud - gitweb
add highmem-split patch from HEAD to series
[fs/lustre-release.git] / lustre / llite / llite_close.c
index f49b21f..5200e09 100644 (file)
@@ -63,69 +63,103 @@ int llap_write_complete(struct inode *inode, struct ll_async_page *llap)
 /* Queue DONE_WRITING if 
  * - done writing is allowed;
  * - inode has no no dirty pages; */
-void ll_queue_done_writing(struct inode *inode)
+void ll_queue_done_writing(struct inode *inode, unsigned long flags)
 {
         struct ll_inode_info *lli = ll_i2info(inode);
         
         spin_lock(&lli->lli_lock);
+        lli->lli_flags |= flags;
+        /* @flags is merged in under lli_lock, before the checks below. */
         if ((lli->lli_flags & LLIF_DONE_WRITING) &&
             list_empty(&lli->lli_pending_write_llaps)) {
                 struct ll_close_queue *lcq = ll_i2sbi(inode)->ll_lcq;
 
                 /* DONE_WRITING is allowed and inode has no dirty page. */
                 spin_lock(&lcq->lcq_lock);
-                LASSERT(list_empty(&lli->lli_close_list));
-                CDEBUG(D_INODE, "adding inode %lu/%u to close list\n",
-                       inode->i_ino, inode->i_generation);
-                
-                list_add_tail(&lli->lli_close_list, &lcq->lcq_head);
+
+                /*
+                 * XXX: Callers sometimes try to queue the same inode more
+                 * than once. It is not clear whether that is legitimate from
+                 * the caller's POV, so rather than asserting, add the inode
+                 * only when lli_close_list is empty and just warn otherwise.
+                 * Hope Vitaly will correct me if I'm wrong. --umka
+                 */
+                if (list_empty(&lli->lli_close_list)) {
+                        CDEBUG(D_INODE, "adding inode %lu/%u to close list\n",
+                               inode->i_ino, inode->i_generation);
+                        list_add_tail(&lli->lli_close_list, &lcq->lcq_head);
+                } else {
+                        CWARN("Inode %lu/%u is already queued for done writing!\n",
+                              inode->i_ino, inode->i_generation);
+                }
                 wake_up(&lcq->lcq_waitq);
                 spin_unlock(&lcq->lcq_lock);
         }
         spin_unlock(&lli->lli_lock);
 }
 
-/* CLOSE has already occured but has not closed epoch;
- * Let let DONE_WRITING to happen. */
-void ll_init_done_writing(struct inode *inode) {
-        struct ll_inode_info *lli = ll_i2info(inode);
-        spin_lock(&lli->lli_lock);
-        if ((lli->lli_flags & LLIF_EPOCH_PENDING))
-                lli->lli_flags |= LLIF_DONE_WRITING;
-        spin_unlock(&lli->lli_lock);
-        ll_queue_done_writing(inode);
-}
-
 /* Close epoch and send Size-on-MDS attribute update if possible. 
  * Call this under @lli->lli_lock spinlock. */
-void ll_epoch_close(struct inode *inode, struct md_op_data *op_data)
+void ll_epoch_close(struct inode *inode, struct md_op_data *op_data,
+                    struct obd_client_handle **och, unsigned long flags)
 {
         struct ll_inode_info *lli = ll_i2info(inode);
         ENTRY;
 
-        CDEBUG(D_INODE, "Epoch "LPU64" closed on "DFID"\n",
-               op_data->ioepoch, PFID(&lli->lli_fid));
-        op_data->flags |= MF_EPOCH_CLOSE;
+        spin_lock(&lli->lli_lock);
+        if (!(list_empty(&lli->lli_pending_write_llaps)) && 
+            !(lli->lli_flags & LLIF_EPOCH_PENDING)) {
+                LASSERT(*och != NULL);
+                LASSERT(lli->lli_pending_och == NULL);
+                /* Inode is dirty and there is no pending write done request
+                 * yet, DONE_WRITE is to be sent later. */
+                lli->lli_flags |= LLIF_EPOCH_PENDING;
+                lli->lli_pending_och = *och;
+                spin_unlock(&lli->lli_lock);
+                
+                inode = igrab(inode);
+                LASSERT(inode);
+                GOTO(out, 0);
+        }
 
-        /* Pack Size-on-MDS inode attributes only if they has changed */
-        if (!(lli->lli_flags & LLIF_SOM_DIRTY))
-                goto out;
-        
-        /* There is already 1 pending DONE_WRITE, do not create another one --
-         * close epoch with no attribute change. */
-        if (lli->lli_flags & LLIF_EPOCH_PENDING)
-                goto out;
+        CDEBUG(D_INODE, "Epoch "LPU64" closed on "DFID"\n",
+               op_data->op_ioepoch, PFID(&lli->lli_fid));
+        op_data->op_flags |= MF_EPOCH_CLOSE;
+
+        if (flags & LLIF_DONE_WRITING) {
+                LASSERT(lli->lli_flags & LLIF_SOM_DIRTY);
+                *och = lli->lli_pending_och;
+                lli->lli_pending_och = NULL;
+                lli->lli_flags &= ~(LLIF_DONE_WRITING | LLIF_EPOCH_PENDING | 
+                                    LLIF_EPOCH_PENDING);
+        } else {
+                /* Pack Size-on-MDS inode attributes only if they has changed */
+                if (!(lli->lli_flags & LLIF_SOM_DIRTY)) {
+                        spin_unlock(&lli->lli_lock);
+                        GOTO(out, 0);
+                }
+
+                /* There is already 1 pending DONE_WRITE, do not create another
+                 * one -- close epoch with no attribute change. */
+                if (lli->lli_flags & LLIF_EPOCH_PENDING) {
+                        spin_unlock(&lli->lli_lock);
+                        GOTO(out, 0);
+                }
+        }
         
-        op_data->flags |= MF_SOM_CHANGE;
+        spin_unlock(&lli->lli_lock);
+        op_data->op_flags |= MF_SOM_CHANGE;
 
         /* Check if Size-on-MDS attributes are valid. */
-        if ((lli->lli_flags & LLIF_MDS_SIZE_LOCK) || !ll_local_size(inode)) {
+        LASSERT(!(lli->lli_flags & LLIF_MDS_SIZE_LOCK));
+        if (!ll_local_size(inode)) {
                 /* Send Size-on-MDS Attributes if valid. */
-                op_data->attr.ia_valid |= ATTR_MTIME_SET | ATTR_CTIME_SET |
+                op_data->op_attr.ia_valid |= ATTR_MTIME_SET | ATTR_CTIME_SET |
                                           ATTR_SIZE | ATTR_BLOCKS;
         }
-out:
         EXIT;
+out:
+        return;
 }
 
 int ll_sizeonmds_update(struct inode *inode, struct lustre_handle *fh)
@@ -154,10 +188,10 @@ int ll_sizeonmds_update(struct inode *inode, struct lustre_handle *fh)
         CDEBUG(D_INODE, "Size-on-MDS update on "DFID"\n", PFID(&lli->lli_fid));
         
         md_from_obdo(op_data, oa, oa->o_valid);
-        memcpy(&op_data->handle, fh, sizeof(*fh));
+        memcpy(&op_data->op_handle, fh, sizeof(*fh));
         
-        op_data->ioepoch = lli->lli_ioepoch;
-        op_data->flags |= MF_SOM_CHANGE;
+        op_data->op_ioepoch = lli->lli_ioepoch;
+        op_data->op_flags |= MF_SOM_CHANGE;
         
         rc = ll_md_setattr(inode, op_data);
         EXIT;
@@ -165,16 +199,15 @@ out:
         if (oa)
                 obdo_free(oa);
         if (op_data)
-                OBD_FREE_PTR(op_data);
+                ll_finish_md_op_data(op_data);
         return rc;
 }
 
 /* Send a DONE_WRITING rpc, pack Size-on-MDS attributes into it, if possible */
 static void ll_done_writing(struct inode *inode)
 {
-        struct ll_inode_info *lli = ll_i2info(inode);
+        struct obd_client_handle *och = NULL;
         struct md_op_data *op_data;
-        struct obd_client_handle *och;
         int rc;
         ENTRY;
 
@@ -185,21 +218,12 @@ static void ll_done_writing(struct inode *inode)
                 return;
         }
 
-        spin_lock(&lli->lli_lock);
-        LASSERT(lli->lli_flags & LLIF_SOM_DIRTY);
-        
-        och = lli->lli_pending_och;
-        lli->lli_pending_och = NULL;
-        lli->lli_flags &= ~(LLIF_DONE_WRITING | LLIF_EPOCH_PENDING);
-        ll_epoch_close(inode, op_data);
-        lli->lli_flags &= ~LLIF_SOM_DIRTY;
-        spin_unlock(&lli->lli_lock);
-        
+        ll_epoch_close(inode, op_data, &och, LLIF_DONE_WRITING);
         ll_pack_inode2opdata(inode, op_data, &och->och_fh);
 
         rc = md_done_writing(ll_i2sbi(inode)->ll_md_exp, op_data, och);
-        OBD_FREE_PTR(op_data);
-        if (rc == EAGAIN) {
+        ll_finish_md_op_data(op_data);
+        if (rc == -EAGAIN) {
                 /* MDS has instructed us to obtain Size-on-MDS attribute from 
                  * OSTs and send setattr to back to MDS. */
                 rc = ll_sizeonmds_update(inode, &och->och_fh);
@@ -222,7 +246,7 @@ static struct ll_inode_info *ll_close_next_lli(struct ll_close_queue *lcq)
                                  lli_close_list);
                 list_del_init(&lli->lli_close_list);
         } else if (atomic_read(&lcq->lcq_stop))
-                lli = ERR_PTR(-1);
+                lli = ERR_PTR(-EALREADY);
 
         spin_unlock(&lcq->lcq_lock);
         return lli;
@@ -253,10 +277,13 @@ static int ll_close_thread(void *arg)
                         break;
 
                 inode = ll_info2i(lli);
+                CDEBUG(D_INFO, "done_writting for inode %lu/%u\n",
+                       inode->i_ino, inode->i_generation);
                 ll_done_writing(inode);
                 iput(inode);
         }
 
+        CDEBUG(D_INFO, "ll_close exiting\n");
         complete(&lcq->lcq_comp);
         RETURN(0);
 }