Whamcloud - gitweb
920c22ae827cbe5f536e6b767ca50e3d8d60fd67
[fs/lustre-release.git] / lustre / ofd / ofd_lvb.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/ofd/ofd_lvb.c
32  *
33  * This file contains methods for OBD Filter Device (OFD)
34  * Lock Value Block (LVB) operations.
35  *
36  * LVB is special opaque (to LDLM) data that is associated with an LDLM lock
37  * and transferred from client to server and back. OFD LVBs are used to
38  * maintain current object size/times.
39  *
40  * Author: Andreas Dilger <andreas.dilger@intel.com>
41  * Author: Mikhail Pershin <mike.pershin@intel.com>
42  * Author: Alexey Zhuravlev <alexey.zhuravlev@intel.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_FILTER
46
47 #include <lustre_swab.h>
48 #include "ofd_internal.h"
49
50 /**
51  * Implementation of ldlm_valblock_ops::lvbo_free for OFD.
52  *
53  * This function frees allocated LVB data if it associated with the given
54  * LDLM resource.
55  *
56  * \param[in] res       LDLM resource
57  *
58  * \retval              0 on successful setup
59  * \retval              negative value on error
60  */
61 static int ofd_lvbo_free(struct ldlm_resource *res)
62 {
63         if (res->lr_lvb_data)
64                 OBD_FREE(res->lr_lvb_data, res->lr_lvb_len);
65
66         return 0;
67 }
68
69 /**
70  * Implementation of ldlm_valblock_ops::lvbo_init for OFD.
71  *
72  * This function allocates and initializes new LVB data for the given
73  * LDLM resource if it is not allocated yet. New LVB is filled with attributes
74  * of the object associated with that resource. Function does nothing if LVB
75  * for the given LDLM resource is allocated already.
76  *
77  * Called with res->lr_lvb_sem held.
78  *
79  * \param[in] lock      LDLM lock on resource
80  *
81  * \retval              0 on successful setup
82  * \retval              negative value on error
83  */
84 static int ofd_lvbo_init(struct ldlm_resource *res)
85 {
86         struct ost_lvb          *lvb;
87         struct ofd_device       *ofd;
88         struct ofd_object       *fo;
89         struct ofd_thread_info  *info;
90         struct lu_env *env;
91         int rc = 0;
92         ENTRY;
93
94         LASSERT(res);
95         LASSERT(mutex_is_locked(&res->lr_lvb_mutex));
96
97         if (res->lr_lvb_data != NULL)
98                 RETURN(0);
99
100         ofd = ldlm_res_to_ns(res)->ns_lvbp;
101         LASSERT(ofd != NULL);
102
103         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_OST_LVB))
104                 RETURN(-ENOMEM);
105
106         env = lu_env_find();
107         LASSERT(env);
108
109         OBD_ALLOC_PTR(lvb);
110         if (lvb == NULL)
111                 GOTO(out, rc = -ENOMEM);
112
113         info = ofd_info(env);
114         res->lr_lvb_data = lvb;
115         res->lr_lvb_len = sizeof(*lvb);
116
117         ost_fid_from_resid(&info->fti_fid, &res->lr_name,
118                            ofd->ofd_lut.lut_lsd.lsd_osd_index);
119         fo = ofd_object_find(env, ofd, &info->fti_fid);
120         if (IS_ERR(fo))
121                 GOTO(out_lvb, rc = PTR_ERR(fo));
122
123         rc = ofd_attr_get(env, fo, &info->fti_attr);
124         if (rc) {
125                 struct ofd_seq          *oseq;
126                 __u64                    seq;
127
128                 /* Object could be recreated during the first
129                  * CLEANUP_ORPHAN request. */
130                 if (rc == -ENOENT) {
131                         seq = fid_seq(&info->fti_fid);
132                         oseq = ofd_seq_load(env, ofd, fid_seq_is_idif(seq) ?
133                                             FID_SEQ_OST_MDT0 : seq);
134                         if (!IS_ERR_OR_NULL(oseq)) {
135                                 if (!oseq->os_last_id_synced)
136                                         rc = -EAGAIN;
137                                 ofd_seq_put(env, oseq);
138                         }
139                 }
140                 GOTO(out_obj, rc);
141         }
142
143         lvb->lvb_size = info->fti_attr.la_size;
144         lvb->lvb_blocks = info->fti_attr.la_blocks;
145         lvb->lvb_mtime = info->fti_attr.la_mtime;
146         lvb->lvb_atime = info->fti_attr.la_atime;
147         lvb->lvb_ctime = info->fti_attr.la_ctime;
148
149         if (fo->ofo_atime_ondisk == 0)
150                 fo->ofo_atime_ondisk = info->fti_attr.la_atime;
151
152         CDEBUG(D_DLMTRACE,
153                "res: "DFID" initial LVB size: %llu, mtime: %#llx, atime: %#llx, ctime: %#llx, blocks: %#llx\n",
154                PFID(&info->fti_fid), lvb->lvb_size, lvb->lvb_mtime,
155                lvb->lvb_atime, lvb->lvb_ctime, lvb->lvb_blocks);
156
157         info->fti_attr.la_valid = 0;
158
159         EXIT;
160 out_obj:
161         ofd_object_put(env, fo);
162 out_lvb:
163         if (rc != 0)
164                 OST_LVB_SET_ERR(lvb->lvb_blocks, rc);
165 out:
166         /* Don't free lvb data on lookup error */
167         return rc;
168 }
169
170 /**
171  * Implementation of ldlm_valblock_ops::lvbo_update for OFD.
172  *
173  * When a client generates a glimpse enqueue, it wants to get the current
174  * file size and updated attributes for a stat() type operation, but these
175  * attributes may be writeback cached on another client. The client with
176  * the DLM extent lock at the highest offset is asked for its current
177  * attributes via a glimpse callback on its extent lock, on the assumption
178  * that it has the highest file size and the newest timestamps. The timestamps
179  * are guaranteed to be correct if there is only a single writer on the file,
180  * but may be slightly inaccurate if there are multiple concurrent writers on
181  * the same object. In order to avoid race conditions between the glimpse AST
182  * and the client cancelling the lock, ofd_lvbo_update() also updates
183  * the attributes from the local object. If the last client hasn't done any
184  * writes yet, or has already written its data and cancelled its lock before
185  * it processed the glimpse, then the local inode will have more uptodate
186  * information.
187  *
188  * This is called in two ways:
189  *  \a req != NULL : called by the DLM itself after a glimpse callback
190  *  \a req == NULL : called by the OFD after a disk write
191  *
192  * \param[in] lock              LDLM lock
193  * \param[in] req               PTLRPC request
194  * \param[in] increase_only     don't allow LVB values to decrease
195  *
196  * \retval              0 on successful setup
197  * \retval              negative value on error
198  */
199 static int ofd_lvbo_update(struct ldlm_resource *res, struct ldlm_lock *lock,
200                            struct ptlrpc_request *req, int increase_only)
201 {
202         struct ofd_thread_info *info;
203         struct ofd_device *ofd;
204         struct ofd_object *fo;
205         struct ost_lvb  *lvb;
206         const struct lu_env *env;
207         int rc = 0;
208
209         ENTRY;
210
211         env = lu_env_find();
212         LASSERT(env);
213         info = ofd_info(env);
214         LASSERT(res != NULL);
215
216         ofd = ldlm_res_to_ns(res)->ns_lvbp;
217         LASSERT(ofd != NULL);
218
219         fid_extract_from_res_name(&info->fti_fid, &res->lr_name);
220
221         lvb = res->lr_lvb_data;
222         if (lvb == NULL) {
223                 CERROR("%s: no LVB data for "DFID"\n",
224                        ofd_name(ofd), PFID(&info->fti_fid));
225                 GOTO(out, rc = 0);
226         }
227
228         /* Update the LVB from the network message */
229         if (req != NULL) {
230                 struct ost_lvb *rpc_lvb;
231                 bool lvb_type;
232
233                 if (req->rq_import != NULL)
234                         lvb_type = imp_connect_lvb_type(req->rq_import);
235                 else
236                         lvb_type = exp_connect_lvb_type(req->rq_export);
237
238                 if (!lvb_type) {
239                         struct ost_lvb_v1 *lvb_v1;
240
241                         lvb_v1 = req_capsule_server_swab_get(&req->rq_pill,
242                                         &RMF_DLM_LVB, lustre_swab_ost_lvb_v1);
243                         if (lvb_v1 == NULL)
244                                 goto disk_update;
245
246                         rpc_lvb = &info->fti_lvb;
247                         memcpy(rpc_lvb, lvb_v1, sizeof *lvb_v1);
248                         rpc_lvb->lvb_mtime_ns = 0;
249                         rpc_lvb->lvb_atime_ns = 0;
250                         rpc_lvb->lvb_ctime_ns = 0;
251                 } else {
252                         rpc_lvb = req_capsule_server_swab_get(&req->rq_pill,
253                                                               &RMF_DLM_LVB,
254                                                         lustre_swab_ost_lvb);
255                         if (rpc_lvb == NULL)
256                                 goto disk_update;
257                 }
258
259                 lock_res(res);
260                 if (rpc_lvb->lvb_size > lvb->lvb_size || !increase_only) {
261                         CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb size: "
262                                "%llu -> %llu\n", PFID(&info->fti_fid),
263                                lvb->lvb_size, rpc_lvb->lvb_size);
264                         lvb->lvb_size = rpc_lvb->lvb_size;
265                 }
266                 if (rpc_lvb->lvb_mtime > lvb->lvb_mtime || !increase_only) {
267                         CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb mtime: "
268                                "%llu -> %llu\n", PFID(&info->fti_fid),
269                                lvb->lvb_mtime, rpc_lvb->lvb_mtime);
270                         lvb->lvb_mtime = rpc_lvb->lvb_mtime;
271                 }
272                 if (rpc_lvb->lvb_atime > lvb->lvb_atime || !increase_only) {
273                         CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb atime: "
274                                "%llu -> %llu\n", PFID(&info->fti_fid),
275                                lvb->lvb_atime, rpc_lvb->lvb_atime);
276                         lvb->lvb_atime = rpc_lvb->lvb_atime;
277                 }
278                 if (rpc_lvb->lvb_ctime > lvb->lvb_ctime || !increase_only) {
279                         CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb ctime: "
280                                "%llu -> %llu\n", PFID(&info->fti_fid),
281                                lvb->lvb_ctime, rpc_lvb->lvb_ctime);
282                         lvb->lvb_ctime = rpc_lvb->lvb_ctime;
283                 }
284                 if (rpc_lvb->lvb_blocks > lvb->lvb_blocks || !increase_only) {
285                         CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb blocks: "
286                                "%llu -> %llu\n", PFID(&info->fti_fid),
287                                lvb->lvb_blocks, rpc_lvb->lvb_blocks);
288                         lvb->lvb_blocks = rpc_lvb->lvb_blocks;
289                 }
290                 unlock_res(res);
291         }
292
293 disk_update:
294         /* Update the LVB from the disk inode */
295         ost_fid_from_resid(&info->fti_fid, &res->lr_name,
296                            ofd->ofd_lut.lut_lsd.lsd_osd_index);
297         fo = ofd_object_find(env, ofd, &info->fti_fid);
298         if (IS_ERR(fo))
299                 GOTO(out, rc = PTR_ERR(fo));
300
301         rc = ofd_attr_get(env, fo, &info->fti_attr);
302         if (rc)
303                 GOTO(out_obj, rc);
304
305         lock_res(res);
306         if (info->fti_attr.la_size > lvb->lvb_size || !increase_only) {
307                 CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb size from disk: "
308                        "%llu -> %llu\n", PFID(&info->fti_fid),
309                        lvb->lvb_size, info->fti_attr.la_size);
310                 lvb->lvb_size = info->fti_attr.la_size;
311         }
312
313         if (info->fti_attr.la_mtime >lvb->lvb_mtime || !increase_only) {
314                 CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb mtime from disk: "
315                        "%llu -> %llu\n", PFID(&info->fti_fid),
316                        lvb->lvb_mtime, info->fti_attr.la_mtime);
317                 lvb->lvb_mtime = info->fti_attr.la_mtime;
318         }
319         if (info->fti_attr.la_atime >lvb->lvb_atime || !increase_only) {
320                 CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb atime from disk: "
321                        "%llu -> %llu\n", PFID(&info->fti_fid),
322                        lvb->lvb_atime, info->fti_attr.la_atime);
323                 lvb->lvb_atime = info->fti_attr.la_atime;
324         }
325         if (info->fti_attr.la_ctime >lvb->lvb_ctime || !increase_only) {
326                 CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb ctime from disk: "
327                        "%llu -> %llu\n", PFID(&info->fti_fid),
328                        lvb->lvb_ctime, info->fti_attr.la_ctime);
329                 lvb->lvb_ctime = info->fti_attr.la_ctime;
330         }
331         if (info->fti_attr.la_blocks > lvb->lvb_blocks || !increase_only) {
332                 CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb blocks from disk: "
333                        "%llu -> %llu\n", PFID(&info->fti_fid), lvb->lvb_blocks,
334                        (unsigned long long)info->fti_attr.la_blocks);
335                 lvb->lvb_blocks = info->fti_attr.la_blocks;
336         }
337         unlock_res(res);
338
339         info->fti_attr.la_valid = 0;
340 out_obj:
341         ofd_object_put(env, fo);
342 out:
343         return rc;
344 }
345
346 /**
347  * Implementation of ldlm_valblock_ops::lvbo_size for OFD.
348  *
349  * This function returns size of LVB data so appropriate RPC size will be
350  * reserved. This is used for compatibility needs between server and client
351  * of different Lustre versions.
352  *
353  * \param[in] lock      LDLM lock
354  *
355  * \retval              size of LVB data
356  */
357 static int ofd_lvbo_size(struct ldlm_lock *lock)
358 {
359         if (lock->l_export != NULL && exp_connect_lvb_type(lock->l_export))
360                 return sizeof(struct ost_lvb);
361         else
362                 return sizeof(struct ost_lvb_v1);
363 }
364
365 /**
366  * Implementation of ldlm_valblock_ops::lvbo_fill for OFD.
367  *
368  * This function is called to fill the given RPC buffer \a buf with LVB data
369  *
370  * \param[in] env       execution environment
371  * \param[in] lock      LDLM lock
372  * \param[in] buf       RPC buffer to fill
373  * \param[in] buflen    buffer length
374  *
375  * \retval              size of LVB data written into \a buf buffer
376  */
377 static int ofd_lvbo_fill(struct ldlm_lock *lock, void *buf, int *buflen)
378 {
379         struct ldlm_resource *res = lock->l_resource;
380         int lvb_len;
381
382         /* Former lvbo_init not allocate the "LVB". */
383         if (unlikely(res->lr_lvb_len == 0))
384                 return 0;
385
386         lvb_len = ofd_lvbo_size(lock);
387         LASSERT(lvb_len <= res->lr_lvb_len);
388
389         if (lvb_len > *buflen)
390                 lvb_len = *buflen;
391
392         lock_res(res);
393         memcpy(buf, res->lr_lvb_data, lvb_len);
394         unlock_res(res);
395
396         return lvb_len;
397 }
398
399 struct ldlm_valblock_ops ofd_lvbo = {
400         .lvbo_init      = ofd_lvbo_init,
401         .lvbo_update    = ofd_lvbo_update,
402         .lvbo_free      = ofd_lvbo_free,
403         .lvbo_size      = ofd_lvbo_size,
404         .lvbo_fill      = ofd_lvbo_fill
405 };