Whamcloud - gitweb
b=22786 add changelog entry
[fs/lustre-release.git] / lustre / llite / llite_close.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/llite/llite_close.c
37  *
38  * Lustre Lite routines to issue a secondary close after writeback
39  */
40
41 #include <linux/module.h>
42
43 #define DEBUG_SUBSYSTEM S_LLITE
44
45 #include <lustre_lite.h>
46 #include "llite_internal.h"
47
48 #ifdef HAVE_CLOSE_THREAD
49 /* record that a write is in flight */
50 void llap_write_pending(struct inode *inode, struct ll_async_page *llap)
51 {
52         struct ll_inode_info *lli = ll_i2info(inode);
53         spin_lock(&lli->lli_lock);
54         list_add(&llap->llap_pending_write, &lli->lli_pending_write_llaps);
55         spin_unlock(&lli->lli_lock);
56 }
57
58 /* record that a write has completed */
59 void llap_write_complete(struct inode *inode, struct ll_async_page *llap)
60 {
61         struct ll_inode_info *lli = ll_i2info(inode);
62         spin_lock(&lli->lli_lock);
63         list_del_init(&llap->llap_pending_write);
64         spin_unlock(&lli->lli_lock);
65 }
66
67 void ll_open_complete(struct inode *inode)
68 {
69         struct ll_inode_info *lli = ll_i2info(inode);
70         spin_lock(&lli->lli_lock);
71         lli->lli_send_done_writing = 0;
72         spin_unlock(&lli->lli_lock);
73 }
74
75 /* if we close with writes in flight then we want the completion or cancelation
76  * of those writes to send a DONE_WRITING rpc to the MDS */
77 int ll_is_inode_dirty(struct inode *inode)
78 {
79         struct ll_inode_info *lli = ll_i2info(inode);
80         int rc = 0;
81         ENTRY;
82
83         spin_lock(&lli->lli_lock);
84         if (!list_empty(&lli->lli_pending_write_llaps))
85                 rc = 1;
86         spin_unlock(&lli->lli_lock);
87         RETURN(rc);
88 }
89
90 void ll_try_done_writing(struct inode *inode)
91 {
92         struct ll_inode_info *lli = ll_i2info(inode);
93         struct ll_close_queue *lcq = ll_i2sbi(inode)->ll_lcq;
94
95         spin_lock(&lli->lli_lock);
96
97         if (lli->lli_send_done_writing &&
98             list_empty(&lli->lli_pending_write_llaps)) {
99
100                 spin_lock(&lcq->lcq_lock);
101                 if (list_empty(&lli->lli_close_item)) {
102                         CDEBUG(D_INODE, "adding inode %lu/%u to close list\n",
103                                inode->i_ino, inode->i_generation);
104                         igrab(inode);
105                         list_add_tail(&lli->lli_close_item, &lcq->lcq_list);
106                         wake_up(&lcq->lcq_waitq);
107                 }
108                 spin_unlock(&lcq->lcq_lock);
109         }
110
111         spin_unlock(&lli->lli_lock);
112 }
113
114 /* The MDS needs us to get the real file attributes, then send a DONE_WRITING */
115 void ll_queue_done_writing(struct inode *inode)
116 {
117         struct ll_inode_info *lli = ll_i2info(inode);
118         ENTRY;
119
120         spin_lock(&lli->lli_lock);
121         lli->lli_send_done_writing = 1;
122         spin_unlock(&lli->lli_lock);
123
124         ll_try_done_writing(inode);
125         EXIT;
126 }
127
128 /* If we know the file size and have the cookies:
129  *  - send a DONE_WRITING rpc
130  *
131  * Otherwise:
132  *  - get a whole-file lock
133  *  - get the authoritative size and all cookies with GETATTRs
134  *  - send a DONE_WRITING rpc
135  */
136 static void ll_close_done_writing(struct inode *inode)
137 {
138         struct ll_inode_info *lli = ll_i2info(inode);
139         ldlm_policy_data_t policy = { .l_extent = {0, OBD_OBJECT_EOF } };
140         struct lustre_handle lockh = { 0 };
141         struct obdo obdo = { 0 };
142         struct mdc_op_data data = { { 0 } };
143         obd_flag valid;
144         int rc, ast_flags = 0;
145         ENTRY;
146
147         memset(&obdo, 0, sizeof(obdo));
148         if (test_bit(LLI_F_HAVE_OST_SIZE_LOCK, &lli->lli_flags))
149                 goto rpc;
150
151         rc = ll_extent_lock(NULL, inode, lli->lli_smd, LCK_PW, &policy, &lockh,
152                             ast_flags);
153         if (rc != 0) {
154                 CERROR("lock acquisition failed (%d): unable to send "
155                        "DONE_WRITING for inode %lu/%u\n", rc, inode->i_ino,
156                        inode->i_generation);
157                 GOTO(out, rc);
158         }
159
160         rc = ll_lsm_getattr(ll_i2obdexp(inode), lli->lli_smd, &obdo);
161         if (rc) {
162                 CERROR("inode_getattr failed (%d): unable to send DONE_WRITING "
163                        "for inode %lu/%u\n", rc, inode->i_ino,
164                        inode->i_generation);
165                 ll_extent_unlock(NULL, inode, lli->lli_smd, LCK_PW, &lockh);
166                 GOTO(out, rc);
167         }
168
169         obdo_refresh_inode(inode, &obdo, valid);
170
171         CDEBUG(D_INODE, "objid "LPX64" size %Lu, blocks %lu, blksize %lu\n",
172                lli->lli_smd->lsm_object_id, i_size_read(inode), inode->i_blocks,
173                1<<inode->i_blkbits);
174
175         set_bit(LLI_F_HAVE_OST_SIZE_LOCK, &lli->lli_flags);
176
177         rc = ll_extent_unlock(NULL, inode, lli->lli_smd, LCK_PW, &lockh);
178         if (rc != ELDLM_OK)
179                 CERROR("unlock failed (%d)?  proceeding anyways...\n", rc);
180
181  rpc:
182         obdo.o_id = inode->i_ino;
183         obdo.o_size = i_size_read(inode);
184         obdo.o_blocks = inode->i_blocks;
185         obdo.o_valid = OBD_MD_FLID | OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
186
187         ll_inode2fid(&data.fid1, inode);
188         rc = mdc_done_writing(ll_i2sbi(inode)->ll_mdc_exp, &data, &obdo);
189  out:
190 }
191
192
193 static struct ll_inode_info *ll_close_next_lli(struct ll_close_queue *lcq)
194 {
195         struct ll_inode_info *lli = NULL;
196
197         spin_lock(&lcq->lcq_lock);
198
199         if (lcq->lcq_list.next == NULL)
200                 lli = ERR_PTR(-1);
201         else if (!list_empty(&lcq->lcq_list)) {
202                 lli = list_entry(lcq->lcq_list.next, struct ll_inode_info,
203                                  lli_close_item);
204                 list_del(&lli->lli_close_item);
205         }
206
207         spin_unlock(&lcq->lcq_lock);
208         return lli;
209 }
210 #else
211 static struct ll_inode_info *ll_close_next_lli(struct ll_close_queue *lcq)
212 {
213         if (lcq->lcq_list.next == NULL)
214                 return ERR_PTR(-1);
215
216         return NULL;
217 }
218 #endif
219
220 static int ll_close_thread(void *arg)
221 {
222         struct ll_close_queue *lcq = arg;
223         ENTRY;
224
225         {
226                 char name[CFS_CURPROC_COMM_MAX];
227                 snprintf(name, sizeof(name) - 1, "ll_close");
228                 cfs_daemonize(name);
229         }
230         
231         complete(&lcq->lcq_comp);
232
233         while (1) {
234                 struct l_wait_info lwi = { 0 };
235                 struct ll_inode_info *lli;
236                 //struct inode *inode;
237
238                 l_wait_event_exclusive(lcq->lcq_waitq,
239                                        (lli = ll_close_next_lli(lcq)) != NULL,
240                                        &lwi);
241                 if (IS_ERR(lli))
242                         break;
243
244                 //inode = ll_info2i(lli);
245                 //ll_close_done_writing(inode);
246                 //iput(inode);
247         }
248
249         complete(&lcq->lcq_comp);
250         RETURN(0);
251 }
252
253 int ll_close_thread_start(struct ll_close_queue **lcq_ret)
254 {
255         struct ll_close_queue *lcq;
256         pid_t pid;
257
258         OBD_FAIL_RETURN(OBD_FAIL_LDLM_CLOSE_THREAD, -EINTR);
259         OBD_ALLOC(lcq, sizeof(*lcq));
260         if (lcq == NULL)
261                 return -ENOMEM;
262
263         spin_lock_init(&lcq->lcq_lock);
264         INIT_LIST_HEAD(&lcq->lcq_list);
265         init_waitqueue_head(&lcq->lcq_waitq);
266         init_completion(&lcq->lcq_comp);
267
268         pid = kernel_thread(ll_close_thread, lcq, 0);
269         if (pid < 0) {
270                 OBD_FREE(lcq, sizeof(*lcq));
271                 return pid;
272         }
273
274         wait_for_completion(&lcq->lcq_comp);
275         *lcq_ret = lcq;
276         return 0;
277 }
278
279 void ll_close_thread_shutdown(struct ll_close_queue *lcq)
280 {
281         init_completion(&lcq->lcq_comp);
282         lcq->lcq_list.next = NULL;
283         wake_up(&lcq->lcq_waitq);
284         wait_for_completion(&lcq->lcq_comp);
285         OBD_FREE(lcq, sizeof(*lcq));
286 }