Whamcloud - gitweb
- b_size_on_mds landed on HEAD:
[fs/lustre-release.git] / lustre / llite / llite_close.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Lustre Lite routines to issue a secondary close after writeback
5  *
6  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  */
23
24 #include <linux/module.h>
25
26 #define DEBUG_SUBSYSTEM S_LLITE
27
28 #include <linux/lustre_mds.h>
29 #include <linux/lustre_lite.h>
30 #include "llite_internal.h"
31
32 /* record that a write is in flight */
33 void llap_write_pending(struct inode *inode, struct ll_async_page *llap)
34 {
35         struct ll_inode_info *lli = ll_i2info(inode);
36         struct page *page = llap->llap_page;
37         spin_lock(&lli->lli_lock);
38         CDEBUG(D_INODE, "track page 0x%p/%lu %s\n",
39                page, (unsigned long) page->index,
40                !list_empty(&llap->llap_pending_write) ? "(already)" : "");
41         if (list_empty(&llap->llap_pending_write))
42                 list_add(&llap->llap_pending_write,
43                          &lli->lli_pending_write_llaps);
44         spin_unlock(&lli->lli_lock);
45 }
46
47 /* record that a write has completed */
48 void llap_write_complete(struct inode *inode, struct ll_async_page *llap)
49 {
50         struct ll_inode_info *lli = ll_i2info(inode);
51         spin_lock(&lli->lli_lock);
52         if (!list_empty(&llap->llap_pending_write))
53                 list_del_init(&llap->llap_pending_write);
54         spin_unlock(&lli->lli_lock);
55 }
56
57 void ll_open_complete(struct inode *inode)
58 {
59         struct ll_inode_info *lli = ll_i2info(inode);
60         spin_lock(&lli->lli_lock);
61         lli->lli_send_done_writing = 0;
62         spin_unlock(&lli->lli_lock);
63 }
64
65 /* if we close with writes in flight then we want the completion or cancelation
66  * of those writes to send a DONE_WRITING rpc to the MDS */
67 int ll_is_inode_dirty(struct inode *inode)
68 {
69         struct ll_inode_info *lli = ll_i2info(inode);
70         int rc = 0;
71         ENTRY;
72
73         spin_lock(&lli->lli_lock);
74         if (!list_empty(&lli->lli_pending_write_llaps))
75                 rc = 1;
76         spin_unlock(&lli->lli_lock);
77         RETURN(rc);
78 }
79
80 void ll_try_done_writing(struct inode *inode)
81 {
82         struct ll_inode_info *lli = ll_i2info(inode);
83         struct ll_close_queue *lcq = ll_i2sbi(inode)->ll_lcq;
84         int added = 0;
85
86         spin_lock(&lli->lli_lock);
87
88         if (lli->lli_send_done_writing &&
89             list_empty(&lli->lli_pending_write_llaps)) {
90                 spin_lock(&lcq->lcq_lock);
91                 if (list_empty(&lli->lli_close_item)) {
92                         CDEBUG(D_INODE, "adding inode %lu/%u to close list\n",
93                                inode->i_ino, inode->i_generation);
94                         list_add_tail(&lli->lli_close_item, &lcq->lcq_list);
95                         wake_up(&lcq->lcq_waitq);
96                         added = 1;
97                 }
98                 spin_unlock(&lcq->lcq_lock);
99         }
100
101         spin_unlock(&lli->lli_lock);
102        
103         /* 
104          * we can't grab inode under lli_lock, because:
105          * ll_try_done_writing:                 ll_prep_inode:
106          *   spin_lock(&lli_lock)                 spin_lock(&inode_lock)
107          *     igrab()                              ll_update_inode()
108          *       spin_lock(&inode_lock)               spin_lock(&lli_lock)
109          */
110         if (added)
111                 LASSERT(igrab(inode) == inode);
112 }
113
114 /* The MDS needs us to get the real file attributes, then send a DONE_WRITING */
115 void ll_queue_done_writing(struct inode *inode)
116 {
117         struct ll_inode_info *lli = ll_i2info(inode);
118         ENTRY;
119
120         CDEBUG(D_INODE, "queue closing for %lu/%u\n",
121                inode->i_ino, inode->i_generation);
122         spin_lock(&lli->lli_lock);
123         lli->lli_send_done_writing = 1;
124         spin_unlock(&lli->lli_lock);
125
126         ll_try_done_writing(inode);
127         EXIT;
128 }
129
130 /* If we know the file size and have the cookies:
131  *  - send a DONE_WRITING rpc
132  *
133  * Otherwise:
134  *  - get a whole-file lock
135  *  - get the authoritative size and all cookies with GETATTRs
136  *  - send a DONE_WRITING rpc
137  */
138 static void ll_try_to_close(struct inode *inode)
139 {
140         struct ll_sb_info *sbi = ll_i2sbi(inode);
141         ll_md_real_close(sbi->ll_md_exp, inode, FMODE_WRITE | FMODE_SYNC);
142 }
143
144 static struct ll_inode_info *ll_close_next_lli(struct ll_close_queue *lcq)
145 {
146         struct ll_inode_info *lli = NULL;
147
148         spin_lock(&lcq->lcq_lock);
149
150         if (lcq->lcq_list.next == NULL)
151                 lli = ERR_PTR(-1);
152         else if (!list_empty(&lcq->lcq_list)) {
153                 lli = list_entry(lcq->lcq_list.next, struct ll_inode_info,
154                                  lli_close_item);
155                 list_del_init(&lli->lli_close_item);
156         }
157
158         spin_unlock(&lcq->lcq_lock);
159         return lli;
160 }
161
162 static int ll_close_thread(void *arg)
163 {
164         struct ll_close_queue *lcq = arg;
165         ENTRY;
166
167         /* XXX boiler-plate */
168         {
169                 char name[sizeof(current->comm)];
170                 unsigned long flags;
171                 snprintf(name, sizeof(name) - 1, "ll_close");
172                 kportal_daemonize(name);
173                 SIGNAL_MASK_LOCK(current, flags);
174                 sigfillset(&current->blocked);
175                 RECALC_SIGPENDING;
176                 SIGNAL_MASK_UNLOCK(current, flags);
177         }
178
179         complete(&lcq->lcq_comp);
180
181         while (1) {
182                 struct l_wait_info lwi = { 0 };
183                 struct ll_inode_info *lli;
184                 struct inode *inode;
185
186                 l_wait_event_exclusive(lcq->lcq_waitq,
187                                        (lli = ll_close_next_lli(lcq)) != NULL,
188                                        &lwi);
189                 if (IS_ERR(lli))
190                         break;
191
192                 inode = ll_info2i(lli);
193                 ll_try_to_close(inode);
194                 iput(inode);
195         }
196
197         complete(&lcq->lcq_comp);
198         RETURN(0);
199 }
200
201 int ll_close_thread_start(struct ll_close_queue **lcq_ret)
202 {
203         struct ll_close_queue *lcq;
204         pid_t pid;
205
206         OBD_ALLOC(lcq, sizeof(*lcq));
207         if (lcq == NULL)
208                 return -ENOMEM;
209
210         spin_lock_init(&lcq->lcq_lock);
211         INIT_LIST_HEAD(&lcq->lcq_list);
212         init_waitqueue_head(&lcq->lcq_waitq);
213         init_completion(&lcq->lcq_comp);
214
215         pid = kernel_thread(ll_close_thread, lcq, 0);
216         if (pid < 0) {
217                 OBD_FREE(lcq, sizeof(*lcq));
218                 return pid;
219         }
220
221         wait_for_completion(&lcq->lcq_comp);
222         *lcq_ret = lcq;
223         return 0;
224 }
225
226 void ll_close_thread_shutdown(struct ll_close_queue *lcq)
227 {
228         init_completion(&lcq->lcq_comp);
229         lcq->lcq_list.next = NULL;
230         wake_up(&lcq->lcq_waitq);
231         wait_for_completion(&lcq->lcq_comp);
232         OBD_FREE(lcq, sizeof(*lcq));
233 }