Whamcloud - gitweb
LU-1222 ldlm: Fix the race in AST sender vs multiple arriving RPCs
[fs/lustre-release.git] / lustre / llite / llite_close.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  *
32  * Copyright (c) 2011, 2012, Whamcloud, Inc.
33  */
34 /*
35  * This file is part of Lustre, http://www.lustre.org/
36  * Lustre is a trademark of Sun Microsystems, Inc.
37  *
38  * lustre/llite/llite_close.c
39  *
40  * Lustre Lite routines to issue a secondary close after writeback
41  */
42
43 #include <linux/module.h>
44
45 #define DEBUG_SUBSYSTEM S_LLITE
46
47 #include <lustre_lite.h>
48 #include "llite_internal.h"
49
50 /** records that a write is in flight */
51 void vvp_write_pending(struct ccc_object *club, struct ccc_page *page)
52 {
53         struct ll_inode_info *lli = ll_i2info(club->cob_inode);
54
55         ENTRY;
56         cfs_spin_lock(&lli->lli_lock);
57         lli->lli_flags |= LLIF_SOM_DIRTY;
58         if (page != NULL && cfs_list_empty(&page->cpg_pending_linkage))
59                 cfs_list_add(&page->cpg_pending_linkage,
60                              &club->cob_pending_list);
61         cfs_spin_unlock(&lli->lli_lock);
62         EXIT;
63 }
64
65 /** records that a write has completed */
66 void vvp_write_complete(struct ccc_object *club, struct ccc_page *page)
67 {
68         struct ll_inode_info *lli = ll_i2info(club->cob_inode);
69         int rc = 0;
70
71         ENTRY;
72         cfs_spin_lock(&lli->lli_lock);
73         if (page != NULL && !cfs_list_empty(&page->cpg_pending_linkage)) {
74                 cfs_list_del_init(&page->cpg_pending_linkage);
75                 rc = 1;
76         }
77         cfs_spin_unlock(&lli->lli_lock);
78         if (rc)
79                 ll_queue_done_writing(club->cob_inode, 0);
80         EXIT;
81 }
82
83 /** Queues DONE_WRITING if
84  * - done writing is allowed;
85  * - inode has no no dirty pages; */
86 void ll_queue_done_writing(struct inode *inode, unsigned long flags)
87 {
88         struct ll_inode_info *lli = ll_i2info(inode);
89         struct ccc_object *club = cl2ccc(ll_i2info(inode)->lli_clob);
90         ENTRY;
91
92         cfs_spin_lock(&lli->lli_lock);
93         lli->lli_flags |= flags;
94
95         if ((lli->lli_flags & LLIF_DONE_WRITING) &&
96             cfs_list_empty(&club->cob_pending_list)) {
97                 struct ll_close_queue *lcq = ll_i2sbi(inode)->ll_lcq;
98
99                 if (lli->lli_flags & LLIF_MDS_SIZE_LOCK)
100                         CWARN("ino %lu/%u(flags %u) som valid it just after "
101                               "recovery\n",
102                               inode->i_ino, inode->i_generation,
103                               lli->lli_flags);
104                 /* DONE_WRITING is allowed and inode has no dirty page. */
105                 cfs_spin_lock(&lcq->lcq_lock);
106
107                 LASSERT(cfs_list_empty(&lli->lli_close_list));
108                 CDEBUG(D_INODE, "adding inode %lu/%u to close list\n",
109                        inode->i_ino, inode->i_generation);
110                 cfs_list_add_tail(&lli->lli_close_list, &lcq->lcq_head);
111
112                 /* Avoid a concurrent insertion into the close thread queue:
113                  * an inode is already in the close thread, open(), write(),
114                  * close() happen, epoch is closed as the inode is marked as
115                  * LLIF_EPOCH_PENDING. When pages are written inode should not
116                  * be inserted into the queue again, clear this flag to avoid
117                  * it. */
118                 lli->lli_flags &= ~LLIF_DONE_WRITING;
119
120                 cfs_waitq_signal(&lcq->lcq_waitq);
121                 cfs_spin_unlock(&lcq->lcq_lock);
122         }
123         cfs_spin_unlock(&lli->lli_lock);
124         EXIT;
125 }
126
127 /** Pack SOM attributes info @opdata for CLOSE, DONE_WRITING rpc. */
128 void ll_done_writing_attr(struct inode *inode, struct md_op_data *op_data)
129 {
130         struct ll_inode_info *lli = ll_i2info(inode);
131         ENTRY;
132
133         op_data->op_flags |= MF_SOM_CHANGE;
134         /* Check if Size-on-MDS attributes are valid. */
135         if (lli->lli_flags & LLIF_MDS_SIZE_LOCK)
136                 CERROR("ino %lu/%u(flags %u) som valid it just after "
137                        "recovery\n", inode->i_ino, inode->i_generation,
138                        lli->lli_flags);
139
140         if (!cl_local_size(inode)) {
141                 /* Send Size-on-MDS Attributes if valid. */
142                 op_data->op_attr.ia_valid |= ATTR_MTIME_SET | ATTR_CTIME_SET |
143                                 ATTR_ATIME_SET | ATTR_SIZE | ATTR_BLOCKS;
144         }
145         EXIT;
146 }
147
148 /** Closes ioepoch and packs Size-on-MDS attribute if needed into @op_data. */
149 void ll_ioepoch_close(struct inode *inode, struct md_op_data *op_data,
150                       struct obd_client_handle **och, unsigned long flags)
151 {
152         struct ll_inode_info *lli = ll_i2info(inode);
153         struct ccc_object *club = cl2ccc(ll_i2info(inode)->lli_clob);
154         ENTRY;
155
156         cfs_spin_lock(&lli->lli_lock);
157         if (!(cfs_list_empty(&club->cob_pending_list))) {
158                 if (!(lli->lli_flags & LLIF_EPOCH_PENDING)) {
159                         LASSERT(*och != NULL);
160                         LASSERT(lli->lli_pending_och == NULL);
161                         /* Inode is dirty and there is no pending write done
162                          * request yet, DONE_WRITE is to be sent later. */
163                         lli->lli_flags |= LLIF_EPOCH_PENDING;
164                         lli->lli_pending_och = *och;
165                         cfs_spin_unlock(&lli->lli_lock);
166
167                         inode = igrab(inode);
168                         LASSERT(inode);
169                         GOTO(out, 0);
170                 }
171                 if (flags & LLIF_DONE_WRITING) {
172                         /* Some pages are still dirty, it is early to send
173                          * DONE_WRITE. Wait untill all pages will be flushed
174                          * and try DONE_WRITE again later. */
175                         LASSERT(!(lli->lli_flags & LLIF_DONE_WRITING));
176                         lli->lli_flags |= LLIF_DONE_WRITING;
177                         cfs_spin_unlock(&lli->lli_lock);
178
179                         inode = igrab(inode);
180                         LASSERT(inode);
181                         GOTO(out, 0);
182                 }
183         }
184         CDEBUG(D_INODE, "Epoch "LPU64" closed on "DFID"\n",
185                ll_i2info(inode)->lli_ioepoch, PFID(&lli->lli_fid));
186         op_data->op_flags |= MF_EPOCH_CLOSE;
187
188         if (flags & LLIF_DONE_WRITING) {
189                 LASSERT(lli->lli_flags & LLIF_SOM_DIRTY);
190                 LASSERT(!(lli->lli_flags & LLIF_DONE_WRITING));
191                 *och = lli->lli_pending_och;
192                 lli->lli_pending_och = NULL;
193                 lli->lli_flags &= ~LLIF_EPOCH_PENDING;
194         } else {
195                 /* Pack Size-on-MDS inode attributes only if they has changed */
196                 if (!(lli->lli_flags & LLIF_SOM_DIRTY)) {
197                         cfs_spin_unlock(&lli->lli_lock);
198                         GOTO(out, 0);
199                 }
200
201                 /* There is a pending DONE_WRITE -- close epoch with no
202                  * attribute change. */
203                 if (lli->lli_flags & LLIF_EPOCH_PENDING) {
204                         cfs_spin_unlock(&lli->lli_lock);
205                         GOTO(out, 0);
206                 }
207         }
208
209         LASSERT(cfs_list_empty(&club->cob_pending_list));
210         lli->lli_flags &= ~LLIF_SOM_DIRTY;
211         cfs_spin_unlock(&lli->lli_lock);
212         ll_done_writing_attr(inode, op_data);
213
214         EXIT;
215 out:
216         return;
217 }
218
219 /**
220  * Cliens updates SOM attributes on MDS (including llog cookies):
221  * obd_getattr with no lock and md_setattr.
222  */
223 int ll_som_update(struct inode *inode, struct md_op_data *op_data)
224 {
225         struct ll_inode_info *lli = ll_i2info(inode);
226         struct ptlrpc_request *request = NULL;
227         __u32 old_flags;
228         struct obdo *oa;
229         int rc;
230         ENTRY;
231
232         LASSERT(op_data != NULL);
233         if (lli->lli_flags & LLIF_MDS_SIZE_LOCK)
234                 CERROR("ino %lu/%u(flags %u) som valid it just after "
235                        "recovery\n", inode->i_ino, inode->i_generation,
236                        lli->lli_flags);
237
238         OBDO_ALLOC(oa);
239         if (!oa) {
240                 CERROR("can't allocate memory for Size-on-MDS update.\n");
241                 RETURN(-ENOMEM);
242         }
243
244         old_flags = op_data->op_flags;
245         op_data->op_flags = MF_SOM_CHANGE;
246
247         /* If inode is already in another epoch, skip getattr from OSTs. */
248         if (lli->lli_ioepoch == op_data->op_ioepoch) {
249                 rc = ll_inode_getattr(inode, oa, op_data->op_ioepoch,
250                                       old_flags & MF_GETATTR_LOCK);
251                 if (rc) {
252                         oa->o_valid = 0;
253                         if (rc == -ENOENT)
254                                 CDEBUG(D_INODE, "objid "LPX64" is destroyed\n",
255                                        lli->lli_smd->lsm_object_id);
256                         else
257                                 CERROR("inode_getattr failed (%d): unable to "
258                                        "send a Size-on-MDS attribute update "
259                                        "for inode %lu/%u\n", rc, inode->i_ino,
260                                        inode->i_generation);
261                 } else {
262                         CDEBUG(D_INODE, "Size-on-MDS update on "DFID"\n",
263                                PFID(&lli->lli_fid));
264                 }
265                 /* Install attributes into op_data. */
266                 md_from_obdo(op_data, oa, oa->o_valid);
267         }
268
269         rc = md_setattr(ll_i2sbi(inode)->ll_md_exp, op_data,
270                         NULL, 0, NULL, 0, &request, NULL);
271         ptlrpc_req_finished(request);
272
273         OBDO_FREE(oa);
274         RETURN(rc);
275 }
276
277 /**
278  * Closes the ioepoch and packs all the attributes into @op_data for
279  * DONE_WRITING rpc.
280  */
281 static void ll_prepare_done_writing(struct inode *inode,
282                                     struct md_op_data *op_data,
283                                     struct obd_client_handle **och)
284 {
285         ll_ioepoch_close(inode, op_data, och, LLIF_DONE_WRITING);
286         /* If there is no @och, we do not do D_W yet. */
287         if (*och == NULL)
288                 return;
289
290         ll_pack_inode2opdata(inode, op_data, &(*och)->och_fh);
291         ll_prep_md_op_data(op_data, inode, NULL, NULL,
292                            0, 0, LUSTRE_OPC_ANY, NULL);
293 }
294
295 /** Send a DONE_WRITING rpc. */
296 static void ll_done_writing(struct inode *inode)
297 {
298         struct obd_client_handle *och = NULL;
299         struct md_op_data *op_data;
300         int rc;
301         ENTRY;
302
303         LASSERT(exp_connect_som(ll_i2mdexp(inode)));
304
305         OBD_ALLOC_PTR(op_data);
306         if (op_data == NULL) {
307                 CERROR("can't allocate op_data\n");
308                 EXIT;
309                 return;
310         }
311
312         ll_prepare_done_writing(inode, op_data, &och);
313         /* If there is no @och, we do not do D_W yet. */
314         if (och == NULL)
315                 GOTO(out, 0);
316
317         rc = md_done_writing(ll_i2sbi(inode)->ll_md_exp, op_data, NULL);
318         if (rc == -EAGAIN) {
319                 /* MDS has instructed us to obtain Size-on-MDS attribute from
320                  * OSTs and send setattr to back to MDS. */
321                 rc = ll_som_update(inode, op_data);
322         } else if (rc) {
323                 CERROR("inode %lu mdc done_writing failed: rc = %d\n",
324                        inode->i_ino, rc);
325         }
326 out:
327         ll_finish_md_op_data(op_data);
328         if (och) {
329                 md_clear_open_replay_data(ll_i2sbi(inode)->ll_md_exp, och);
330                 OBD_FREE_PTR(och);
331         }
332         EXIT;
333 }
334
335 static struct ll_inode_info *ll_close_next_lli(struct ll_close_queue *lcq)
336 {
337         struct ll_inode_info *lli = NULL;
338
339         cfs_spin_lock(&lcq->lcq_lock);
340
341         if (!cfs_list_empty(&lcq->lcq_head)) {
342                 lli = cfs_list_entry(lcq->lcq_head.next, struct ll_inode_info,
343                                      lli_close_list);
344                 cfs_list_del_init(&lli->lli_close_list);
345         } else if (cfs_atomic_read(&lcq->lcq_stop))
346                 lli = ERR_PTR(-EALREADY);
347
348         cfs_spin_unlock(&lcq->lcq_lock);
349         return lli;
350 }
351
352 static int ll_close_thread(void *arg)
353 {
354         struct ll_close_queue *lcq = arg;
355         ENTRY;
356
357         {
358                 char name[CFS_CURPROC_COMM_MAX];
359                 snprintf(name, sizeof(name) - 1, "ll_close");
360                 cfs_daemonize(name);
361         }
362
363         cfs_complete(&lcq->lcq_comp);
364
365         while (1) {
366                 struct l_wait_info lwi = { 0 };
367                 struct ll_inode_info *lli;
368                 struct inode *inode;
369
370                 l_wait_event_exclusive(lcq->lcq_waitq,
371                                        (lli = ll_close_next_lli(lcq)) != NULL,
372                                        &lwi);
373                 if (IS_ERR(lli))
374                         break;
375
376                 inode = ll_info2i(lli);
377                 CDEBUG(D_INFO, "done_writting for inode %lu/%u\n",
378                        inode->i_ino, inode->i_generation);
379                 ll_done_writing(inode);
380                 iput(inode);
381         }
382
383         CDEBUG(D_INFO, "ll_close exiting\n");
384         cfs_complete(&lcq->lcq_comp);
385         RETURN(0);
386 }
387
388 int ll_close_thread_start(struct ll_close_queue **lcq_ret)
389 {
390         struct ll_close_queue *lcq;
391         pid_t pid;
392
393         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CLOSE_THREAD))
394                 return -EINTR;
395
396         OBD_ALLOC(lcq, sizeof(*lcq));
397         if (lcq == NULL)
398                 return -ENOMEM;
399
400         cfs_spin_lock_init(&lcq->lcq_lock);
401         CFS_INIT_LIST_HEAD(&lcq->lcq_head);
402         cfs_waitq_init(&lcq->lcq_waitq);
403         cfs_init_completion(&lcq->lcq_comp);
404
405         pid = cfs_create_thread(ll_close_thread, lcq, 0);
406         if (pid < 0) {
407                 OBD_FREE(lcq, sizeof(*lcq));
408                 return pid;
409         }
410
411         cfs_wait_for_completion(&lcq->lcq_comp);
412         *lcq_ret = lcq;
413         return 0;
414 }
415
416 void ll_close_thread_shutdown(struct ll_close_queue *lcq)
417 {
418         cfs_init_completion(&lcq->lcq_comp);
419         cfs_atomic_inc(&lcq->lcq_stop);
420         cfs_waitq_signal(&lcq->lcq_waitq);
421         cfs_wait_for_completion(&lcq->lcq_comp);
422         OBD_FREE(lcq, sizeof(*lcq));
423 }