Whamcloud - gitweb
9da030055d5d82bdf3fa3cf1f6c498cc47bcface
[fs/lustre-release.git] / lustre / ofd / ofd_lvb.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2014 Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/ofd/ofd_lvb.c
33  *
34  * This file contains methods for OBD Filter Device (OFD)
35  * Lock Value Block (LVB) operations.
36  *
37  * LVB is special opaque (to LDLM) data that is associated with an LDLM lock
38  * and transferred from client to server and back. OFD LVBs are used to
39  * maintain current object size/times.
40  *
41  * Author: Andreas Dilger <andreas.dilger@intel.com>
42  * Author: Mikhail Pershin <mike.pershin@intel.com>
43  * Author: Alexey Zhuravlev <alexey.zhuravlev@intel.com>
44  */
45
46 #define DEBUG_SUBSYSTEM S_FILTER
47
48 #include "ofd_internal.h"
49
50 /**
51  * Implementation of ldlm_valblock_ops::lvbo_free for OFD.
52  *
53  * This function frees allocated LVB data if it associated with the given
54  * LDLM resource.
55  *
56  * \param[in] res       LDLM resource
57  *
58  * \retval              0 on successful setup
59  * \retval              negative value on error
60  */
61 static int ofd_lvbo_free(struct ldlm_resource *res)
62 {
63         if (res->lr_lvb_data)
64                 OBD_FREE(res->lr_lvb_data, res->lr_lvb_len);
65
66         return 0;
67 }
68
69 /**
70  * Implementation of ldlm_valblock_ops::lvbo_init for OFD.
71  *
72  * This function allocates and initializes new LVB data for the given
73  * LDLM resource if it is not allocated yet. New LVB is filled with attributes
74  * of the object associated with that resource. Function does nothing if LVB
75  * for the given LDLM resource is allocated already.
76  *
77  * Called with res->lr_lvb_sem held.
78  *
79  * \param[in] res       LDLM resource
80  *
81  * \retval              0 on successful setup
82  * \retval              negative value on error
83  */
84 static int ofd_lvbo_init(struct ldlm_resource *res)
85 {
86         struct ost_lvb          *lvb;
87         struct ofd_device       *ofd;
88         struct ofd_object       *fo;
89         struct ofd_thread_info  *info;
90         struct lu_env            env;
91         int                      rc = 0;
92
93         ENTRY;
94
95         LASSERT(res);
96         LASSERT(mutex_is_locked(&res->lr_lvb_mutex));
97
98         if (res->lr_lvb_data != NULL)
99                 RETURN(0);
100
101         ofd = ldlm_res_to_ns(res)->ns_lvbp;
102         LASSERT(ofd != NULL);
103
104         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_OST_LVB))
105                 RETURN(-ENOMEM);
106
107         rc = lu_env_init(&env, LCT_DT_THREAD);
108         if (rc)
109                 RETURN(rc);
110
111         OBD_ALLOC_PTR(lvb);
112         if (lvb == NULL)
113                 GOTO(out_env, rc = -ENOMEM);
114
115         res->lr_lvb_data = lvb;
116         res->lr_lvb_len = sizeof(*lvb);
117
118         info = ofd_info_init(&env, NULL);
119         ost_fid_from_resid(&info->fti_fid, &res->lr_name,
120                            ofd->ofd_lut.lut_lsd.lsd_osd_index);
121         fo = ofd_object_find(&env, ofd, &info->fti_fid);
122         if (IS_ERR(fo))
123                 GOTO(out_lvb, rc = PTR_ERR(fo));
124
125         rc = ofd_attr_get(&env, fo, &info->fti_attr);
126         if (rc)
127                 GOTO(out_obj, rc);
128
129         lvb->lvb_size = info->fti_attr.la_size;
130         lvb->lvb_blocks = info->fti_attr.la_blocks;
131         lvb->lvb_mtime = info->fti_attr.la_mtime;
132         lvb->lvb_atime = info->fti_attr.la_atime;
133         lvb->lvb_ctime = info->fti_attr.la_ctime;
134
135         CDEBUG(D_DLMTRACE, "res: "DFID" initial lvb size: "LPU64", "
136                "mtime: "LPX64", blocks: "LPX64"\n",
137                PFID(&info->fti_fid), lvb->lvb_size,
138                lvb->lvb_mtime, lvb->lvb_blocks);
139
140         EXIT;
141 out_obj:
142         ofd_object_put(&env, fo);
143 out_lvb:
144         if (rc != 0)
145                 OST_LVB_SET_ERR(lvb->lvb_blocks, rc);
146 out_env:
147         lu_env_fini(&env);
148         /* Don't free lvb data on lookup error */
149         return rc;
150 }
151
152 /**
153  * Implementation of ldlm_valblock_ops::lvbo_update for OFD.
154  *
155  * When a client generates a glimpse enqueue, it wants to get the current
156  * file size and updated attributes for a stat() type operation, but these
157  * attributes may be writeback cached on another client. The client with
158  * the DLM extent lock at the highest offset is asked for its current
159  * attributes via a glimpse callback on its extent lock, on the assumption
160  * that it has the highest file size and the newest timestamps. The timestamps
161  * are guaranteed to be correct if there is only a single writer on the file,
162  * but may be slightly inaccurate if there are multiple concurrent writers on
163  * the same object. In order to avoid race conditions between the glimpse AST
164  * and the client cancelling the lock, ofd_lvbo_update() also updates
165  * the attributes from the local object. If the last client hasn't done any
166  * writes yet, or has already written its data and cancelled its lock before
167  * it processed the glimpse, then the local inode will have more uptodate
168  * information.
169  *
170  * This is called in two ways:
171  *  \a req != NULL : called by the DLM itself after a glimpse callback
172  *  \a req == NULL : called by the OFD after a disk write
173  *
174  * \param[in] res               LDLM resource
175  * \param[in] req               PTLRPC request
176  * \param[in] increase_only     don't allow LVB values to decrease
177  *
178  * \retval              0 on successful setup
179  * \retval              negative value on error
180  */
181 static int ofd_lvbo_update(struct ldlm_resource *res,
182                            struct ptlrpc_request *req, int increase_only)
183 {
184         struct ofd_device       *ofd;
185         struct ofd_object       *fo;
186         struct ofd_thread_info  *info;
187         struct ost_lvb          *lvb;
188         struct lu_env            env;
189         int                      rc = 0;
190
191         ENTRY;
192
193         LASSERT(res != NULL);
194
195         ofd = ldlm_res_to_ns(res)->ns_lvbp;
196         LASSERT(ofd != NULL);
197
198         rc = lu_env_init(&env, LCT_DT_THREAD);
199         if (rc)
200                 RETURN(rc);
201
202         info = ofd_info_init(&env, NULL);
203         fid_extract_from_res_name(&info->fti_fid, &res->lr_name);
204
205         lvb = res->lr_lvb_data;
206         if (lvb == NULL) {
207                 CERROR("%s: no LVB data for "DFID"\n",
208                        ofd_name(ofd), PFID(&info->fti_fid));
209                 GOTO(out_env, rc = 0);
210         }
211
212         /* Update the LVB from the network message */
213         if (req != NULL) {
214                 struct ost_lvb *rpc_lvb;
215                 bool lvb_type;
216
217                 if (req->rq_import != NULL)
218                         lvb_type = imp_connect_lvb_type(req->rq_import);
219                 else
220                         lvb_type = exp_connect_lvb_type(req->rq_export);
221
222                 if (!lvb_type) {
223                         struct ost_lvb_v1 *lvb_v1;
224
225                         lvb_v1 = req_capsule_server_swab_get(&req->rq_pill,
226                                         &RMF_DLM_LVB, lustre_swab_ost_lvb_v1);
227                         if (lvb_v1 == NULL)
228                                 goto disk_update;
229
230                         rpc_lvb = &info->fti_lvb;
231                         memcpy(rpc_lvb, lvb_v1, sizeof *lvb_v1);
232                         rpc_lvb->lvb_mtime_ns = 0;
233                         rpc_lvb->lvb_atime_ns = 0;
234                         rpc_lvb->lvb_ctime_ns = 0;
235                 } else {
236                         rpc_lvb = req_capsule_server_swab_get(&req->rq_pill,
237                                                               &RMF_DLM_LVB,
238                                                         lustre_swab_ost_lvb);
239                         if (rpc_lvb == NULL)
240                                 goto disk_update;
241                 }
242
243                 lock_res(res);
244                 if (rpc_lvb->lvb_size > lvb->lvb_size || !increase_only) {
245                         CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb size: "
246                                LPU64" -> "LPU64"\n", PFID(&info->fti_fid),
247                                lvb->lvb_size, rpc_lvb->lvb_size);
248                         lvb->lvb_size = rpc_lvb->lvb_size;
249                 }
250                 if (rpc_lvb->lvb_mtime > lvb->lvb_mtime || !increase_only) {
251                         CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb mtime: "
252                                LPU64" -> "LPU64"\n", PFID(&info->fti_fid),
253                                lvb->lvb_mtime, rpc_lvb->lvb_mtime);
254                         lvb->lvb_mtime = rpc_lvb->lvb_mtime;
255                 }
256                 if (rpc_lvb->lvb_atime > lvb->lvb_atime || !increase_only) {
257                         CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb atime: "
258                                LPU64" -> "LPU64"\n", PFID(&info->fti_fid),
259                                lvb->lvb_atime, rpc_lvb->lvb_atime);
260                         lvb->lvb_atime = rpc_lvb->lvb_atime;
261                 }
262                 if (rpc_lvb->lvb_ctime > lvb->lvb_ctime || !increase_only) {
263                         CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb ctime: "
264                                LPU64" -> "LPU64"\n", PFID(&info->fti_fid),
265                                lvb->lvb_ctime, rpc_lvb->lvb_ctime);
266                         lvb->lvb_ctime = rpc_lvb->lvb_ctime;
267                 }
268                 if (rpc_lvb->lvb_blocks > lvb->lvb_blocks || !increase_only) {
269                         CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb blocks: "
270                                LPU64" -> "LPU64"\n", PFID(&info->fti_fid),
271                                lvb->lvb_blocks, rpc_lvb->lvb_blocks);
272                         lvb->lvb_blocks = rpc_lvb->lvb_blocks;
273                 }
274                 unlock_res(res);
275         }
276
277 disk_update:
278         /* Update the LVB from the disk inode */
279         ost_fid_from_resid(&info->fti_fid, &res->lr_name,
280                            ofd->ofd_lut.lut_lsd.lsd_osd_index);
281         fo = ofd_object_find(&env, ofd, &info->fti_fid);
282         if (IS_ERR(fo))
283                 GOTO(out_env, rc = PTR_ERR(fo));
284
285         rc = ofd_attr_get(&env, fo, &info->fti_attr);
286         if (rc)
287                 GOTO(out_obj, rc);
288
289         lock_res(res);
290         if (info->fti_attr.la_size > lvb->lvb_size || !increase_only) {
291                 CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb size from disk: "
292                        LPU64" -> %llu\n", PFID(&info->fti_fid),
293                        lvb->lvb_size, info->fti_attr.la_size);
294                 lvb->lvb_size = info->fti_attr.la_size;
295         }
296
297         if (info->fti_attr.la_mtime >lvb->lvb_mtime || !increase_only) {
298                 CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb mtime from disk: "
299                        LPU64" -> "LPU64"\n", PFID(&info->fti_fid),
300                        lvb->lvb_mtime, info->fti_attr.la_mtime);
301                 lvb->lvb_mtime = info->fti_attr.la_mtime;
302         }
303         if (info->fti_attr.la_atime >lvb->lvb_atime || !increase_only) {
304                 CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb atime from disk: "
305                        LPU64" -> "LPU64"\n", PFID(&info->fti_fid),
306                        lvb->lvb_atime, info->fti_attr.la_atime);
307                 lvb->lvb_atime = info->fti_attr.la_atime;
308         }
309         if (info->fti_attr.la_ctime >lvb->lvb_ctime || !increase_only) {
310                 CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb ctime from disk: "
311                        LPU64" -> "LPU64"\n", PFID(&info->fti_fid),
312                        lvb->lvb_ctime, info->fti_attr.la_ctime);
313                 lvb->lvb_ctime = info->fti_attr.la_ctime;
314         }
315         if (info->fti_attr.la_blocks > lvb->lvb_blocks || !increase_only) {
316                 CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb blocks from disk: "
317                        LPU64" -> %llu\n", PFID(&info->fti_fid), lvb->lvb_blocks,
318                        (unsigned long long)info->fti_attr.la_blocks);
319                 lvb->lvb_blocks = info->fti_attr.la_blocks;
320         }
321         unlock_res(res);
322
323 out_obj:
324         ofd_object_put(&env, fo);
325 out_env:
326         lu_env_fini(&env);
327         return rc;
328 }
329
330 /**
331  * Implementation of ldlm_valblock_ops::lvbo_size for OFD.
332  *
333  * This function returns size of LVB data so appropriate RPC size will be
334  * reserved. This is used for compatibility needs between server and client
335  * of different Lustre versions.
336  *
337  * \param[in] lock      LDLM lock
338  *
339  * \retval              size of LVB data
340  */
341 static int ofd_lvbo_size(struct ldlm_lock *lock)
342 {
343         if (lock->l_export != NULL && exp_connect_lvb_type(lock->l_export))
344                 return sizeof(struct ost_lvb);
345         else
346                 return sizeof(struct ost_lvb_v1);
347 }
348
349 /**
350  * Implementation of ldlm_valblock_ops::lvbo_fill for OFD.
351  *
352  * This function is called to fill the given RPC buffer \a buf with LVB data
353  *
354  * \param[in] lock      LDLM lock
355  * \param[in] buf       RPC buffer to fill
356  * \param[in] buflen    buffer length
357  *
358  * \retval              size of LVB data written into \a buf buffer
359  */
360 static int ofd_lvbo_fill(struct ldlm_lock *lock, void *buf, int buflen)
361 {
362         struct ldlm_resource *res = lock->l_resource;
363         int lvb_len;
364
365         /* Former lvbo_init not allocate the "LVB". */
366         if (unlikely(res->lr_lvb_len == 0))
367                 return 0;
368
369         lvb_len = ofd_lvbo_size(lock);
370         LASSERT(lvb_len <= res->lr_lvb_len);
371
372         if (lvb_len > buflen)
373                 lvb_len = buflen;
374
375         lock_res(res);
376         memcpy(buf, res->lr_lvb_data, lvb_len);
377         unlock_res(res);
378
379         return lvb_len;
380 }
381
382 struct ldlm_valblock_ops ofd_lvbo = {
383         .lvbo_init      = ofd_lvbo_init,
384         .lvbo_update    = ofd_lvbo_update,
385         .lvbo_free      = ofd_lvbo_free,
386         .lvbo_size      = ofd_lvbo_size,
387         .lvbo_fill      = ofd_lvbo_fill
388 };