Whamcloud - gitweb
LU-15195 ofd: missing OST object
[fs/lustre-release.git] / lustre / ofd / ofd_lvb.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/ofd/ofd_lvb.c
32  *
33  * This file contains methods for OBD Filter Device (OFD)
34  * Lock Value Block (LVB) operations.
35  *
36  * LVB is special opaque (to LDLM) data that is associated with an LDLM lock
37  * and transferred from client to server and back. OFD LVBs are used to
38  * maintain current object size/times.
39  *
40  * Author: Andreas Dilger <andreas.dilger@intel.com>
41  * Author: Mikhail Pershin <mike.pershin@intel.com>
42  * Author: Alexey Zhuravlev <alexey.zhuravlev@intel.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_FILTER
46
47 #include <lustre_swab.h>
48 #include "ofd_internal.h"
49
50 /**
51  * Implementation of ldlm_valblock_ops::lvbo_free for OFD.
52  *
53  * This function frees allocated LVB data if it associated with the given
54  * LDLM resource.
55  *
56  * \param[in] res       LDLM resource
57  *
58  * \retval              0 on successful setup
59  * \retval              negative value on error
60  */
61 static int ofd_lvbo_free(struct ldlm_resource *res)
62 {
63         if (res->lr_lvb_data)
64                 OBD_FREE(res->lr_lvb_data, res->lr_lvb_len);
65
66         return 0;
67 }
68
69 static bool ofd_resync_allowed(struct ofd_device *ofd)
70 {
71         struct obd_device *obd = ofd_obd(ofd);
72
73         if (obd->obd_recovery_start == 0)
74                 return false;
75
76         if (obd->obd_recovery_start + obd->obd_recovery_time_hard <
77             ktime_get_seconds())
78                 return false;
79
80         return true;
81 }
82
83 /**
84  * Implementation of ldlm_valblock_ops::lvbo_init for OFD.
85  *
86  * This function allocates and initializes new LVB data for the given
87  * LDLM resource if it is not allocated yet. New LVB is filled with attributes
88  * of the object associated with that resource. Function does nothing if LVB
89  * for the given LDLM resource is allocated already.
90  *
91  * Called with res->lr_lvb_sem held.
92  *
93  * \param[in] lock      LDLM lock on resource
94  *
95  * \retval              0 on successful setup
96  * \retval              negative value on error
97  */
98 static int ofd_lvbo_init(struct ldlm_resource *res)
99 {
100         struct ost_lvb          *lvb;
101         struct ofd_device       *ofd;
102         struct ofd_object       *fo;
103         struct ofd_thread_info  *info;
104         struct lu_env *env;
105         int rc = 0;
106         ENTRY;
107
108         LASSERT(res);
109         LASSERT(mutex_is_locked(&res->lr_lvb_mutex));
110
111         if (res->lr_lvb_data != NULL)
112                 RETURN(0);
113
114         ofd = ldlm_res_to_ns(res)->ns_lvbp;
115         LASSERT(ofd != NULL);
116
117         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_OST_LVB))
118                 RETURN(-ENOMEM);
119
120         env = lu_env_find();
121         LASSERT(env);
122
123         OBD_ALLOC_PTR(lvb);
124         if (lvb == NULL)
125                 GOTO(out, rc = -ENOMEM);
126
127         info = ofd_info(env);
128         res->lr_lvb_data = lvb;
129         res->lr_lvb_len = sizeof(*lvb);
130
131         ost_fid_from_resid(&info->fti_fid, &res->lr_name,
132                            ofd->ofd_lut.lut_lsd.lsd_osd_index);
133         fo = ofd_object_find(env, ofd, &info->fti_fid);
134         if (IS_ERR(fo))
135                 GOTO(out_lvb, rc = PTR_ERR(fo));
136
137         rc = ofd_attr_get(env, fo, &info->fti_attr);
138         if (rc) {
139                 struct ofd_seq          *oseq;
140                 __u64                    seq;
141
142                 /* Object could be recreated during the first
143                  * CLEANUP_ORPHAN request. */
144                 if (rc == -ENOENT) {
145                         seq = fid_seq(&info->fti_fid);
146                         oseq = ofd_seq_load(env, ofd, fid_seq_is_idif(seq) ?
147                                             FID_SEQ_OST_MDT0 : seq);
148                         if (!IS_ERR_OR_NULL(oseq)) {
149                                 if (!oseq->os_last_id_synced &&
150                                     ofd_resync_allowed(ofd))
151                                         rc = -EINPROGRESS;
152                                 ofd_seq_put(env, oseq);
153                         }
154                 }
155                 GOTO(out_obj, rc);
156         }
157
158         lvb->lvb_size = info->fti_attr.la_size;
159         lvb->lvb_blocks = info->fti_attr.la_blocks;
160         lvb->lvb_mtime = info->fti_attr.la_mtime;
161         lvb->lvb_atime = info->fti_attr.la_atime;
162         lvb->lvb_ctime = info->fti_attr.la_ctime;
163
164         if (fo->ofo_atime_ondisk == 0)
165                 fo->ofo_atime_ondisk = info->fti_attr.la_atime;
166
167         CDEBUG(D_DLMTRACE,
168                "res: "DFID" initial LVB size: %llu, mtime: %#llx, atime: %#llx, ctime: %#llx, blocks: %#llx\n",
169                PFID(&info->fti_fid), lvb->lvb_size, lvb->lvb_mtime,
170                lvb->lvb_atime, lvb->lvb_ctime, lvb->lvb_blocks);
171
172         info->fti_attr.la_valid = 0;
173
174         EXIT;
175 out_obj:
176         ofd_object_put(env, fo);
177 out_lvb:
178         if (rc != 0)
179                 OST_LVB_SET_ERR(lvb->lvb_blocks, rc);
180 out:
181         /* Don't free lvb data on lookup error */
182         return rc;
183 }
184
185 /**
186  * Implementation of ldlm_valblock_ops::lvbo_update for OFD.
187  *
188  * When a client generates a glimpse enqueue, it wants to get the current
189  * file size and updated attributes for a stat() type operation, but these
190  * attributes may be writeback cached on another client. The client with
191  * the DLM extent lock at the highest offset is asked for its current
192  * attributes via a glimpse callback on its extent lock, on the assumption
193  * that it has the highest file size and the newest timestamps. The timestamps
194  * are guaranteed to be correct if there is only a single writer on the file,
195  * but may be slightly inaccurate if there are multiple concurrent writers on
196  * the same object. In order to avoid race conditions between the glimpse AST
197  * and the client cancelling the lock, ofd_lvbo_update() also updates
198  * the attributes from the local object. If the last client hasn't done any
199  * writes yet, or has already written its data and cancelled its lock before
200  * it processed the glimpse, then the local inode will have more uptodate
201  * information.
202  *
203  * This is called in two ways:
204  *  \a req != NULL : called by the DLM itself after a glimpse callback
205  *  \a req == NULL : called by the OFD after a disk write
206  *
207  * \param[in] lock              LDLM lock
208  * \param[in] req               PTLRPC request
209  * \param[in] increase_only     don't allow LVB values to decrease
210  *
211  * \retval              0 on successful setup
212  * \retval              negative value on error
213  */
214 static int ofd_lvbo_update(struct ldlm_resource *res, struct ldlm_lock *lock,
215                            struct ptlrpc_request *req, int increase_only)
216 {
217         struct ofd_thread_info *info;
218         struct ofd_device *ofd;
219         struct ofd_object *fo;
220         struct ost_lvb  *lvb;
221         const struct lu_env *env;
222         int rc = 0;
223
224         ENTRY;
225
226         env = lu_env_find();
227         LASSERT(env);
228         info = ofd_info(env);
229         LASSERT(res != NULL);
230
231         ofd = ldlm_res_to_ns(res)->ns_lvbp;
232         LASSERT(ofd != NULL);
233
234         fid_extract_from_res_name(&info->fti_fid, &res->lr_name);
235
236         lvb = res->lr_lvb_data;
237         if (lvb == NULL) {
238                 CERROR("%s: no LVB data for "DFID"\n",
239                        ofd_name(ofd), PFID(&info->fti_fid));
240                 GOTO(out, rc = 0);
241         }
242
243         /* Update the LVB from the network message */
244         if (req != NULL) {
245                 struct ost_lvb *rpc_lvb;
246                 bool lvb_type;
247
248                 if (req->rq_import != NULL)
249                         lvb_type = imp_connect_lvb_type(req->rq_import);
250                 else
251                         lvb_type = exp_connect_lvb_type(req->rq_export);
252
253                 if (!lvb_type) {
254                         struct ost_lvb_v1 *lvb_v1;
255
256                         lvb_v1 = req_capsule_server_swab_get(&req->rq_pill,
257                                         &RMF_DLM_LVB, lustre_swab_ost_lvb_v1);
258                         if (lvb_v1 == NULL)
259                                 goto disk_update;
260
261                         rpc_lvb = &info->fti_lvb;
262                         memcpy(rpc_lvb, lvb_v1, sizeof *lvb_v1);
263                         rpc_lvb->lvb_mtime_ns = 0;
264                         rpc_lvb->lvb_atime_ns = 0;
265                         rpc_lvb->lvb_ctime_ns = 0;
266                 } else {
267                         rpc_lvb = req_capsule_server_swab_get(&req->rq_pill,
268                                                               &RMF_DLM_LVB,
269                                                         lustre_swab_ost_lvb);
270                         if (rpc_lvb == NULL)
271                                 goto disk_update;
272                 }
273
274                 lock_res(res);
275                 if (rpc_lvb->lvb_size > lvb->lvb_size || !increase_only) {
276                         CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb size: "
277                                "%llu -> %llu\n", PFID(&info->fti_fid),
278                                lvb->lvb_size, rpc_lvb->lvb_size);
279                         lvb->lvb_size = rpc_lvb->lvb_size;
280                 }
281                 if (rpc_lvb->lvb_mtime > lvb->lvb_mtime || !increase_only) {
282                         CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb mtime: "
283                                "%llu -> %llu\n", PFID(&info->fti_fid),
284                                lvb->lvb_mtime, rpc_lvb->lvb_mtime);
285                         lvb->lvb_mtime = rpc_lvb->lvb_mtime;
286                 }
287                 if (rpc_lvb->lvb_atime > lvb->lvb_atime || !increase_only) {
288                         CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb atime: "
289                                "%llu -> %llu\n", PFID(&info->fti_fid),
290                                lvb->lvb_atime, rpc_lvb->lvb_atime);
291                         lvb->lvb_atime = rpc_lvb->lvb_atime;
292                 }
293                 if (rpc_lvb->lvb_ctime > lvb->lvb_ctime || !increase_only) {
294                         CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb ctime: "
295                                "%llu -> %llu\n", PFID(&info->fti_fid),
296                                lvb->lvb_ctime, rpc_lvb->lvb_ctime);
297                         lvb->lvb_ctime = rpc_lvb->lvb_ctime;
298                 }
299                 if (rpc_lvb->lvb_blocks > lvb->lvb_blocks || !increase_only) {
300                         CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb blocks: "
301                                "%llu -> %llu\n", PFID(&info->fti_fid),
302                                lvb->lvb_blocks, rpc_lvb->lvb_blocks);
303                         lvb->lvb_blocks = rpc_lvb->lvb_blocks;
304                 }
305                 unlock_res(res);
306         }
307
308 disk_update:
309         /* Update the LVB from the disk inode */
310         ost_fid_from_resid(&info->fti_fid, &res->lr_name,
311                            ofd->ofd_lut.lut_lsd.lsd_osd_index);
312         fo = ofd_object_find(env, ofd, &info->fti_fid);
313         if (IS_ERR(fo))
314                 GOTO(out, rc = PTR_ERR(fo));
315
316         rc = ofd_attr_get(env, fo, &info->fti_attr);
317         if (rc)
318                 GOTO(out_obj, rc);
319
320         lock_res(res);
321         if (info->fti_attr.la_size > lvb->lvb_size || !increase_only) {
322                 CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb size from disk: "
323                        "%llu -> %llu\n", PFID(&info->fti_fid),
324                        lvb->lvb_size, info->fti_attr.la_size);
325                 lvb->lvb_size = info->fti_attr.la_size;
326         }
327
328         if (info->fti_attr.la_mtime >lvb->lvb_mtime || !increase_only) {
329                 CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb mtime from disk: "
330                        "%llu -> %llu\n", PFID(&info->fti_fid),
331                        lvb->lvb_mtime, info->fti_attr.la_mtime);
332                 lvb->lvb_mtime = info->fti_attr.la_mtime;
333         }
334         if (info->fti_attr.la_atime >lvb->lvb_atime || !increase_only) {
335                 CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb atime from disk: "
336                        "%llu -> %llu\n", PFID(&info->fti_fid),
337                        lvb->lvb_atime, info->fti_attr.la_atime);
338                 lvb->lvb_atime = info->fti_attr.la_atime;
339         }
340         if (info->fti_attr.la_ctime >lvb->lvb_ctime || !increase_only) {
341                 CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb ctime from disk: "
342                        "%llu -> %llu\n", PFID(&info->fti_fid),
343                        lvb->lvb_ctime, info->fti_attr.la_ctime);
344                 lvb->lvb_ctime = info->fti_attr.la_ctime;
345         }
346         if (info->fti_attr.la_blocks > lvb->lvb_blocks || !increase_only) {
347                 CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb blocks from disk: "
348                        "%llu -> %llu\n", PFID(&info->fti_fid), lvb->lvb_blocks,
349                        (unsigned long long)info->fti_attr.la_blocks);
350                 lvb->lvb_blocks = info->fti_attr.la_blocks;
351         }
352         unlock_res(res);
353
354         info->fti_attr.la_valid = 0;
355 out_obj:
356         ofd_object_put(env, fo);
357 out:
358         return rc;
359 }
360
361 /**
362  * Implementation of ldlm_valblock_ops::lvbo_size for OFD.
363  *
364  * This function returns size of LVB data so appropriate RPC size will be
365  * reserved. This is used for compatibility needs between server and client
366  * of different Lustre versions.
367  *
368  * \param[in] lock      LDLM lock
369  *
370  * \retval              size of LVB data
371  */
372 static int ofd_lvbo_size(struct ldlm_lock *lock)
373 {
374         if (lock->l_export != NULL && exp_connect_lvb_type(lock->l_export))
375                 return sizeof(struct ost_lvb);
376         else
377                 return sizeof(struct ost_lvb_v1);
378 }
379
380 /**
381  * Implementation of ldlm_valblock_ops::lvbo_fill for OFD.
382  *
383  * This function is called to fill the given RPC buffer \a buf with LVB data
384  *
385  * \param[in] env       execution environment
386  * \param[in] lock      LDLM lock
387  * \param[in] buf       RPC buffer to fill
388  * \param[in] buflen    buffer length
389  *
390  * \retval              size of LVB data written into \a buf buffer
391  */
392 static int ofd_lvbo_fill(struct ldlm_lock *lock, void *buf, int *buflen)
393 {
394         struct ldlm_resource *res = lock->l_resource;
395         int lvb_len;
396
397         /* Former lvbo_init not allocate the "LVB". */
398         if (unlikely(res->lr_lvb_len == 0))
399                 return 0;
400
401         lvb_len = ofd_lvbo_size(lock);
402         LASSERT(lvb_len <= res->lr_lvb_len);
403
404         if (lvb_len > *buflen)
405                 lvb_len = *buflen;
406
407         lock_res(res);
408         memcpy(buf, res->lr_lvb_data, lvb_len);
409         unlock_res(res);
410
411         return lvb_len;
412 }
413
414 struct ldlm_valblock_ops ofd_lvbo = {
415         .lvbo_init      = ofd_lvbo_init,
416         .lvbo_update    = ofd_lvbo_update,
417         .lvbo_free      = ofd_lvbo_free,
418         .lvbo_size      = ofd_lvbo_size,
419         .lvbo_fill      = ofd_lvbo_fill
420 };