Whamcloud - gitweb
954f448ac577c4fc5e792fe7fd65e47d1b875d5c
[fs/lustre-release.git] / lustre / llite / llite_close.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Lustre Lite routines to issue a secondary close after writeback
5  *
6  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  */
23
24 #include <linux/module.h>
25
26 #define DEBUG_SUBSYSTEM S_LLITE
27
28 #include <linux/lustre_mds.h>
29 #include <linux/lustre_lite.h>
30 #include <linux/lustre_gs.h>
31 #include "llite_internal.h"
32
33 /* record that a write is in flight */
34 void llap_write_pending(struct inode *inode, struct ll_async_page *llap)
35 {
36         struct ll_inode_info *lli = ll_i2info(inode);
37         struct page *page = llap->llap_page;
38         spin_lock(&lli->lli_lock);
39         CDEBUG(D_INODE, "track page 0x%p/%lu %s\n",
40                page, (unsigned long) page->index,
41                !list_empty(&llap->llap_pending_write) ? "(already)" : "");
42         if (list_empty(&llap->llap_pending_write))
43                 list_add(&llap->llap_pending_write,
44                          &lli->lli_pending_write_llaps);
45         spin_unlock(&lli->lli_lock);
46 }
47
48 /* record that a write has completed */
49 void llap_write_complete(struct inode *inode, struct ll_async_page *llap)
50 {
51         struct ll_inode_info *lli = ll_i2info(inode);
52         spin_lock(&lli->lli_lock);
53         if (!list_empty(&llap->llap_pending_write))
54                 list_del_init(&llap->llap_pending_write);
55         if (list_empty(&lli->lli_pending_write_llaps))
56                 wake_up(&lli->lli_dirty_wait);
57         spin_unlock(&lli->lli_lock);
58 }
59
60 void ll_open_complete(struct inode *inode)
61 {
62         struct ll_inode_info *lli = ll_i2info(inode);
63         spin_lock(&lli->lli_lock);
64         lli->lli_send_done_writing = 0;
65         spin_unlock(&lli->lli_lock);
66 }
67
68 /* if we close with writes in flight then we want the completion or cancelation
69  * of those writes to send a DONE_WRITING rpc to the MDS */
70 int ll_is_inode_dirty(struct inode *inode)
71 {
72         struct ll_inode_info *lli = ll_i2info(inode);
73         int rc = 0;
74         ENTRY;
75
76         spin_lock(&lli->lli_lock);
77         if (!list_empty(&lli->lli_pending_write_llaps))
78                 rc = 1;
79         spin_unlock(&lli->lli_lock);
80         RETURN(rc);
81 }
82
83 void ll_try_done_writing(struct inode *inode)
84 {
85         struct ll_inode_info *lli = ll_i2info(inode);
86         struct ll_close_queue *lcq = ll_i2sbi(inode)->ll_lcq;
87         int added = 0;
88
89         spin_lock(&lli->lli_lock);
90
91         if (lli->lli_send_done_writing &&
92             list_empty(&lli->lli_pending_write_llaps)) {
93                 spin_lock(&lcq->lcq_lock);
94                 if (list_empty(&lli->lli_close_item)) {
95                         CDEBUG(D_INODE, "adding inode %lu/%u to close list\n",
96                                inode->i_ino, inode->i_generation);
97                         list_add_tail(&lli->lli_close_item, &lcq->lcq_list);
98                         wake_up(&lcq->lcq_waitq);
99                         added = 1;
100                 }
101                 spin_unlock(&lcq->lcq_lock);
102         }
103
104         spin_unlock(&lli->lli_lock);
105        
106         /* 
107          * we can't grab inode under lli_lock, because:
108          * ll_try_done_writing:                 ll_prep_inode:
109          *   spin_lock(&lli_lock)                 spin_lock(&inode_lock)
110          *     igrab()                              ll_update_inode()
111          *       spin_lock(&inode_lock)               spin_lock(&lli_lock)
112          */
113         if (added)
114                 LASSERT(igrab(inode) == inode);
115 }
116
117 /* The MDS needs us to get the real file attributes, then send a DONE_WRITING */
118 void ll_queue_done_writing(struct inode *inode)
119 {
120         struct ll_inode_info *lli = ll_i2info(inode);
121         ENTRY;
122
123         CDEBUG(D_INODE, "queue closing for %lu/%u\n",
124                inode->i_ino, inode->i_generation);
125         spin_lock(&lli->lli_lock);
126         lli->lli_send_done_writing = 1;
127         spin_unlock(&lli->lli_lock);
128
129         ll_try_done_writing(inode);
130         EXIT;
131 }
132
133 /* If we know the file size and have the cookies:
134  *  - send a DONE_WRITING rpc
135  *
136  * Otherwise:
137  *  - get a whole-file lock
138  *  - get the authoritative size and all cookies with GETATTRs
139  *  - send a DONE_WRITING rpc
140  */
141 static void ll_try_to_close(struct inode *inode)
142 {
143         struct ll_sb_info *sbi = ll_i2sbi(inode);
144         ll_md_real_close(sbi->ll_md_exp, inode, FMODE_WRITE | FMODE_SYNC);
145 }
146
147 static struct ll_inode_info *ll_close_next_lli(struct ll_close_queue *lcq)
148 {
149         struct ll_inode_info *lli = NULL;
150
151         spin_lock(&lcq->lcq_lock);
152
153         /* first, check for queued request. otherwise, we would
154          * leak them upon umount */
155         if (!list_empty(&lcq->lcq_list)) {
156                 lli = list_entry(lcq->lcq_list.next, struct ll_inode_info,
157                                  lli_close_item);
158                 list_del_init(&lli->lli_close_item);
159         } else if (lcq->lcq_stop != 0) {
160                 lli = ERR_PTR(-1);
161         }
162
163         spin_unlock(&lcq->lcq_lock);
164         return lli;
165 }
166
167 static int ll_close_thread(void *arg)
168 {
169         struct ll_close_queue *lcq = arg;
170         ENTRY;
171
172         /* XXX boiler-plate */
173         {
174                 char name[sizeof(current->comm)];
175                 unsigned long flags;
176                 snprintf(name, sizeof(name) - 1, "ll_close");
177                 kportal_daemonize(name);
178                 SIGNAL_MASK_LOCK(current, flags);
179                 sigfillset(&current->blocked);
180                 RECALC_SIGPENDING;
181                 SIGNAL_MASK_UNLOCK(current, flags);
182         }
183
184         complete(&lcq->lcq_comp);
185
186         while (1) {
187                 struct l_wait_info lwi = { 0 };
188                 struct ll_inode_info *lli;
189                 struct inode *inode;
190
191                 l_wait_event_exclusive(lcq->lcq_waitq,
192                                        (lli = ll_close_next_lli(lcq)) != NULL,
193                                        &lwi);
194                 if (IS_ERR(lli))
195                         break;
196
197                 inode = ll_info2i(lli);
198                 ll_try_to_close(inode);
199                 iput(inode);
200         }
201
202         EXIT;
203
204         /* SMF-safe way to finish threads */
205         complete_and_exit(&lcq->lcq_comp, 0);
206 }
207
208 int ll_close_thread_start(struct ll_close_queue **lcq_ret)
209 {
210         struct ll_close_queue *lcq;
211         pid_t pid;
212
213         OBD_ALLOC(lcq, sizeof(*lcq));
214         if (lcq == NULL)
215                 return -ENOMEM;
216
217         lcq->lcq_stop = 0;
218         spin_lock_init(&lcq->lcq_lock);
219         INIT_LIST_HEAD(&lcq->lcq_list);
220         init_waitqueue_head(&lcq->lcq_waitq);
221         init_completion(&lcq->lcq_comp);
222
223         pid = kernel_thread(ll_close_thread, lcq, 0);
224         if (pid < 0) {
225                 OBD_FREE(lcq, sizeof(*lcq));
226                 return pid;
227         }
228
229         wait_for_completion(&lcq->lcq_comp);
230         *lcq_ret = lcq;
231         return 0;
232 }
233
234 void ll_close_thread_stop(struct ll_close_queue *lcq)
235 {
236         init_completion(&lcq->lcq_comp);
237         lcq->lcq_stop = 1;
238         wake_up(&lcq->lcq_waitq);
239         wait_for_completion(&lcq->lcq_comp);
240         OBD_FREE(lcq, sizeof(*lcq));
241 }