Whamcloud - gitweb
0397f0beeb98d976169fbd8818b31ba3d283e18b
[fs/lustre-release.git] / lustre / ofd / ofd_lvb.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/ofd/ofd_lvb.c
33  *
34  * This file contains methods for OBD Filter Device (OFD)
35  * Lock Value Block (LVB) operations.
36  *
37  * LVB is special opaque (to LDLM) data that is associated with an LDLM lock
38  * and transferred from client to server and back. OFD LVBs are used to
39  * maintain current object size/times.
40  *
41  * Author: Andreas Dilger <andreas.dilger@intel.com>
42  * Author: Mikhail Pershin <mike.pershin@intel.com>
43  * Author: Alexey Zhuravlev <alexey.zhuravlev@intel.com>
44  */
45
46 #define DEBUG_SUBSYSTEM S_FILTER
47
48 #include <lustre_swab.h>
49 #include "ofd_internal.h"
50
51 /**
52  * Implementation of ldlm_valblock_ops::lvbo_free for OFD.
53  *
54  * This function frees allocated LVB data if it associated with the given
55  * LDLM resource.
56  *
57  * \param[in] res       LDLM resource
58  *
59  * \retval              0 on successful setup
60  * \retval              negative value on error
61  */
62 static int ofd_lvbo_free(struct ldlm_resource *res)
63 {
64         if (res->lr_lvb_data)
65                 OBD_FREE(res->lr_lvb_data, res->lr_lvb_len);
66
67         return 0;
68 }
69
70 /**
71  * Implementation of ldlm_valblock_ops::lvbo_init for OFD.
72  *
73  * This function allocates and initializes new LVB data for the given
74  * LDLM resource if it is not allocated yet. New LVB is filled with attributes
75  * of the object associated with that resource. Function does nothing if LVB
76  * for the given LDLM resource is allocated already.
77  *
78  * Called with res->lr_lvb_sem held.
79  *
80  * \param[in] lock      LDLM lock on resource
81  *
82  * \retval              0 on successful setup
83  * \retval              negative value on error
84  */
85 static int ofd_lvbo_init(struct ldlm_resource *res)
86 {
87         struct ost_lvb          *lvb;
88         struct ofd_device       *ofd;
89         struct ofd_object       *fo;
90         struct ofd_thread_info  *info;
91         struct lu_env *env;
92         int rc = 0;
93         ENTRY;
94
95         LASSERT(res);
96         LASSERT(mutex_is_locked(&res->lr_lvb_mutex));
97
98         if (res->lr_lvb_data != NULL)
99                 RETURN(0);
100
101         ofd = ldlm_res_to_ns(res)->ns_lvbp;
102         LASSERT(ofd != NULL);
103
104         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_OST_LVB))
105                 RETURN(-ENOMEM);
106
107         env = lu_env_find();
108         LASSERT(env);
109
110         OBD_ALLOC_PTR(lvb);
111         if (lvb == NULL)
112                 GOTO(out, rc = -ENOMEM);
113
114         info = ofd_info(env);
115         res->lr_lvb_data = lvb;
116         res->lr_lvb_len = sizeof(*lvb);
117
118         ost_fid_from_resid(&info->fti_fid, &res->lr_name,
119                            ofd->ofd_lut.lut_lsd.lsd_osd_index);
120         fo = ofd_object_find(env, ofd, &info->fti_fid);
121         if (IS_ERR(fo))
122                 GOTO(out_lvb, rc = PTR_ERR(fo));
123
124         rc = ofd_attr_get(env, fo, &info->fti_attr);
125         if (rc) {
126                 struct ofd_seq          *oseq;
127                 __u64                    seq;
128
129                 /* Object could be recreated during the first
130                  * CLEANUP_ORPHAN request. */
131                 if (rc == -ENOENT) {
132                         seq = fid_seq(&info->fti_fid);
133                         oseq = ofd_seq_load(env, ofd, fid_seq_is_idif(seq) ?
134                                             FID_SEQ_OST_MDT0 : seq);
135                         if (!IS_ERR_OR_NULL(oseq)) {
136                                 if (!oseq->os_last_id_synced)
137                                         rc = -EAGAIN;
138                                 ofd_seq_put(env, oseq);
139                         }
140                 }
141                 GOTO(out_obj, rc);
142         }
143
144         lvb->lvb_size = info->fti_attr.la_size;
145         lvb->lvb_blocks = info->fti_attr.la_blocks;
146         lvb->lvb_mtime = info->fti_attr.la_mtime;
147         lvb->lvb_atime = info->fti_attr.la_atime;
148         lvb->lvb_ctime = info->fti_attr.la_ctime;
149
150         if (fo->ofo_atime_ondisk == 0)
151                 fo->ofo_atime_ondisk = info->fti_attr.la_atime;
152
153         CDEBUG(D_DLMTRACE,
154                "res: "DFID" initial LVB size: %llu, mtime: %#llx, atime: %#llx, ctime: %#llx, blocks: %#llx\n",
155                PFID(&info->fti_fid), lvb->lvb_size, lvb->lvb_mtime,
156                lvb->lvb_atime, lvb->lvb_ctime, lvb->lvb_blocks);
157
158         info->fti_attr.la_valid = 0;
159
160         EXIT;
161 out_obj:
162         ofd_object_put(env, fo);
163 out_lvb:
164         if (rc != 0)
165                 OST_LVB_SET_ERR(lvb->lvb_blocks, rc);
166 out:
167         /* Don't free lvb data on lookup error */
168         return rc;
169 }
170
171 /**
172  * Implementation of ldlm_valblock_ops::lvbo_update for OFD.
173  *
174  * When a client generates a glimpse enqueue, it wants to get the current
175  * file size and updated attributes for a stat() type operation, but these
176  * attributes may be writeback cached on another client. The client with
177  * the DLM extent lock at the highest offset is asked for its current
178  * attributes via a glimpse callback on its extent lock, on the assumption
179  * that it has the highest file size and the newest timestamps. The timestamps
180  * are guaranteed to be correct if there is only a single writer on the file,
181  * but may be slightly inaccurate if there are multiple concurrent writers on
182  * the same object. In order to avoid race conditions between the glimpse AST
183  * and the client cancelling the lock, ofd_lvbo_update() also updates
184  * the attributes from the local object. If the last client hasn't done any
185  * writes yet, or has already written its data and cancelled its lock before
186  * it processed the glimpse, then the local inode will have more uptodate
187  * information.
188  *
189  * This is called in two ways:
190  *  \a req != NULL : called by the DLM itself after a glimpse callback
191  *  \a req == NULL : called by the OFD after a disk write
192  *
193  * \param[in] lock              LDLM lock
194  * \param[in] req               PTLRPC request
195  * \param[in] increase_only     don't allow LVB values to decrease
196  *
197  * \retval              0 on successful setup
198  * \retval              negative value on error
199  */
200 static int ofd_lvbo_update(struct ldlm_resource *res, struct ldlm_lock *lock,
201                            struct ptlrpc_request *req, int increase_only)
202 {
203         struct ofd_thread_info *info;
204         struct ofd_device *ofd;
205         struct ofd_object *fo;
206         struct ost_lvb  *lvb;
207         const struct lu_env *env;
208         int rc = 0;
209
210         ENTRY;
211
212         env = lu_env_find();
213         LASSERT(env);
214         info = ofd_info(env);
215         LASSERT(res != NULL);
216
217         ofd = ldlm_res_to_ns(res)->ns_lvbp;
218         LASSERT(ofd != NULL);
219
220         fid_extract_from_res_name(&info->fti_fid, &res->lr_name);
221
222         lvb = res->lr_lvb_data;
223         if (lvb == NULL) {
224                 CERROR("%s: no LVB data for "DFID"\n",
225                        ofd_name(ofd), PFID(&info->fti_fid));
226                 GOTO(out, rc = 0);
227         }
228
229         /* Update the LVB from the network message */
230         if (req != NULL) {
231                 struct ost_lvb *rpc_lvb;
232                 bool lvb_type;
233
234                 if (req->rq_import != NULL)
235                         lvb_type = imp_connect_lvb_type(req->rq_import);
236                 else
237                         lvb_type = exp_connect_lvb_type(req->rq_export);
238
239                 if (!lvb_type) {
240                         struct ost_lvb_v1 *lvb_v1;
241
242                         lvb_v1 = req_capsule_server_swab_get(&req->rq_pill,
243                                         &RMF_DLM_LVB, lustre_swab_ost_lvb_v1);
244                         if (lvb_v1 == NULL)
245                                 goto disk_update;
246
247                         rpc_lvb = &info->fti_lvb;
248                         memcpy(rpc_lvb, lvb_v1, sizeof *lvb_v1);
249                         rpc_lvb->lvb_mtime_ns = 0;
250                         rpc_lvb->lvb_atime_ns = 0;
251                         rpc_lvb->lvb_ctime_ns = 0;
252                 } else {
253                         rpc_lvb = req_capsule_server_swab_get(&req->rq_pill,
254                                                               &RMF_DLM_LVB,
255                                                         lustre_swab_ost_lvb);
256                         if (rpc_lvb == NULL)
257                                 goto disk_update;
258                 }
259
260                 lock_res(res);
261                 if (rpc_lvb->lvb_size > lvb->lvb_size || !increase_only) {
262                         CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb size: "
263                                "%llu -> %llu\n", PFID(&info->fti_fid),
264                                lvb->lvb_size, rpc_lvb->lvb_size);
265                         lvb->lvb_size = rpc_lvb->lvb_size;
266                 }
267                 if (rpc_lvb->lvb_mtime > lvb->lvb_mtime || !increase_only) {
268                         CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb mtime: "
269                                "%llu -> %llu\n", PFID(&info->fti_fid),
270                                lvb->lvb_mtime, rpc_lvb->lvb_mtime);
271                         lvb->lvb_mtime = rpc_lvb->lvb_mtime;
272                 }
273                 if (rpc_lvb->lvb_atime > lvb->lvb_atime || !increase_only) {
274                         CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb atime: "
275                                "%llu -> %llu\n", PFID(&info->fti_fid),
276                                lvb->lvb_atime, rpc_lvb->lvb_atime);
277                         lvb->lvb_atime = rpc_lvb->lvb_atime;
278                 }
279                 if (rpc_lvb->lvb_ctime > lvb->lvb_ctime || !increase_only) {
280                         CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb ctime: "
281                                "%llu -> %llu\n", PFID(&info->fti_fid),
282                                lvb->lvb_ctime, rpc_lvb->lvb_ctime);
283                         lvb->lvb_ctime = rpc_lvb->lvb_ctime;
284                 }
285                 if (rpc_lvb->lvb_blocks > lvb->lvb_blocks || !increase_only) {
286                         CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb blocks: "
287                                "%llu -> %llu\n", PFID(&info->fti_fid),
288                                lvb->lvb_blocks, rpc_lvb->lvb_blocks);
289                         lvb->lvb_blocks = rpc_lvb->lvb_blocks;
290                 }
291                 unlock_res(res);
292         }
293
294 disk_update:
295         /* Update the LVB from the disk inode */
296         ost_fid_from_resid(&info->fti_fid, &res->lr_name,
297                            ofd->ofd_lut.lut_lsd.lsd_osd_index);
298         fo = ofd_object_find(env, ofd, &info->fti_fid);
299         if (IS_ERR(fo))
300                 GOTO(out, rc = PTR_ERR(fo));
301
302         rc = ofd_attr_get(env, fo, &info->fti_attr);
303         if (rc)
304                 GOTO(out_obj, rc);
305
306         lock_res(res);
307         if (info->fti_attr.la_size > lvb->lvb_size || !increase_only) {
308                 CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb size from disk: "
309                        "%llu -> %llu\n", PFID(&info->fti_fid),
310                        lvb->lvb_size, info->fti_attr.la_size);
311                 lvb->lvb_size = info->fti_attr.la_size;
312         }
313
314         if (info->fti_attr.la_mtime >lvb->lvb_mtime || !increase_only) {
315                 CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb mtime from disk: "
316                        "%llu -> %llu\n", PFID(&info->fti_fid),
317                        lvb->lvb_mtime, info->fti_attr.la_mtime);
318                 lvb->lvb_mtime = info->fti_attr.la_mtime;
319         }
320         if (info->fti_attr.la_atime >lvb->lvb_atime || !increase_only) {
321                 CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb atime from disk: "
322                        "%llu -> %llu\n", PFID(&info->fti_fid),
323                        lvb->lvb_atime, info->fti_attr.la_atime);
324                 lvb->lvb_atime = info->fti_attr.la_atime;
325         }
326         if (info->fti_attr.la_ctime >lvb->lvb_ctime || !increase_only) {
327                 CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb ctime from disk: "
328                        "%llu -> %llu\n", PFID(&info->fti_fid),
329                        lvb->lvb_ctime, info->fti_attr.la_ctime);
330                 lvb->lvb_ctime = info->fti_attr.la_ctime;
331         }
332         if (info->fti_attr.la_blocks > lvb->lvb_blocks || !increase_only) {
333                 CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb blocks from disk: "
334                        "%llu -> %llu\n", PFID(&info->fti_fid), lvb->lvb_blocks,
335                        (unsigned long long)info->fti_attr.la_blocks);
336                 lvb->lvb_blocks = info->fti_attr.la_blocks;
337         }
338         unlock_res(res);
339
340         info->fti_attr.la_valid = 0;
341 out_obj:
342         ofd_object_put(env, fo);
343 out:
344         return rc;
345 }
346
347 /**
348  * Implementation of ldlm_valblock_ops::lvbo_size for OFD.
349  *
350  * This function returns size of LVB data so appropriate RPC size will be
351  * reserved. This is used for compatibility needs between server and client
352  * of different Lustre versions.
353  *
354  * \param[in] lock      LDLM lock
355  *
356  * \retval              size of LVB data
357  */
358 static int ofd_lvbo_size(struct ldlm_lock *lock)
359 {
360         if (lock->l_export != NULL && exp_connect_lvb_type(lock->l_export))
361                 return sizeof(struct ost_lvb);
362         else
363                 return sizeof(struct ost_lvb_v1);
364 }
365
366 /**
367  * Implementation of ldlm_valblock_ops::lvbo_fill for OFD.
368  *
369  * This function is called to fill the given RPC buffer \a buf with LVB data
370  *
371  * \param[in] env       execution environment
372  * \param[in] lock      LDLM lock
373  * \param[in] buf       RPC buffer to fill
374  * \param[in] buflen    buffer length
375  *
376  * \retval              size of LVB data written into \a buf buffer
377  */
378 static int ofd_lvbo_fill(struct ldlm_lock *lock, void *buf, int *buflen)
379 {
380         struct ldlm_resource *res = lock->l_resource;
381         int lvb_len;
382
383         /* Former lvbo_init not allocate the "LVB". */
384         if (unlikely(res->lr_lvb_len == 0))
385                 return 0;
386
387         lvb_len = ofd_lvbo_size(lock);
388         LASSERT(lvb_len <= res->lr_lvb_len);
389
390         if (lvb_len > *buflen)
391                 lvb_len = *buflen;
392
393         lock_res(res);
394         memcpy(buf, res->lr_lvb_data, lvb_len);
395         unlock_res(res);
396
397         return lvb_len;
398 }
399
400 struct ldlm_valblock_ops ofd_lvbo = {
401         .lvbo_init      = ofd_lvbo_init,
402         .lvbo_update    = ofd_lvbo_update,
403         .lvbo_free      = ofd_lvbo_free,
404         .lvbo_size      = ofd_lvbo_size,
405         .lvbo_fill      = ofd_lvbo_fill
406 };