4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2012, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
31 * lustre/ofd/ofd_lvb.c
33 * This file contains methods for OBD Filter Device (OFD)
34 * Lock Value Block (LVB) operations.
36 * LVB is special opaque (to LDLM) data that is associated with an LDLM lock
37 * and transferred from client to server and back. OFD LVBs are used to
38 * maintain current object size/times.
40 * Author: Andreas Dilger <andreas.dilger@intel.com>
41 * Author: Mikhail Pershin <mike.pershin@intel.com>
42 * Author: Alexey Zhuravlev <alexey.zhuravlev@intel.com>
45 #define DEBUG_SUBSYSTEM S_FILTER
47 #include <lustre_swab.h>
48 #include "ofd_internal.h"
51 * Implementation of ldlm_valblock_ops::lvbo_free for OFD.
53 * This function frees allocated LVB data if it is associated with the given
56 * \param[in] res LDLM resource
58 * \retval 0 on successful setup
59 * \retval negative value on error
/* Release the LVB buffer attached to the LDLM resource \a res. */
61 static int ofd_lvbo_free(struct ldlm_resource *res)
/* OBD_FREE() is given the buffer length recorded alongside the pointer. */
64 OBD_FREE(res->lr_lvb_data, res->lr_lvb_len);
/*
 * Helper used by ofd_lvbo_init(): inspects the recovery timing fields of the
 * OBD device behind \a ofd to decide whether a resync is allowed.
 */
69 static bool ofd_resync_allowed(struct ofd_device *ofd)
71 struct obd_device *obd = ofd_obd(ofd);
/* NOTE(review): a zero obd_recovery_start presumably means recovery has not started - confirm */
73 if (obd->obd_recovery_start == 0)
/* Left operand: recovery start plus the hard recovery time limit (presumably the hard deadline - confirm) */
76 if (obd->obd_recovery_start + obd->obd_recovery_time_hard <
84 * Implementation of ldlm_valblock_ops::lvbo_init for OFD.
86 * This function allocates and initializes new LVB data for the given
87 * LDLM resource if it is not allocated yet. New LVB is filled with attributes
88 * of the object associated with that resource. Function does nothing if LVB
89 * for the given LDLM resource is allocated already.
91 * Called with res->lr_lvb_mutex held.
93 * \param[in] res LDLM resource
95 * \retval 0 on successful setup
96 * \retval negative value on error
/*
 * Set up the LVB for \a res from the attributes of the backing OFD object.
 * The new LVB is attached to the resource before the object lookup, and it
 * is deliberately kept attached if that lookup fails.
 */
98 static int ofd_lvbo_init(struct ldlm_resource *res)
101 struct ofd_device *ofd;
102 struct ofd_object *fo;
103 struct ofd_thread_info *info;
/* The caller must already hold the resource's LVB mutex. */
109 LASSERT(mutex_is_locked(&res->lr_lvb_mutex));
/* An LVB is already attached to this resource. */
111 if (res->lr_lvb_data != NULL)
/* The OFD device is reached through the namespace's ns_lvbp pointer. */
114 ofd = ldlm_res_to_ns(res)->ns_lvbp;
115 LASSERT(ofd != NULL);
/* Check point keyed on OBD_FAIL_LDLM_OST_LVB (presumably fault injection - confirm). */
117 if (CFS_FAIL_CHECK(OBD_FAIL_LDLM_OST_LVB))
/* NOTE(review): presumably the LVB allocation failure path - confirm */
125 GOTO(out, rc = -ENOMEM);
127 info = ofd_info(env);
/* Publish the LVB on the resource together with its length. */
128 res->lr_lvb_data = lvb;
129 res->lr_lvb_len = sizeof(*lvb);
/* Build the object FID from the resource name and this target's OSD index. */
131 ost_fid_from_resid(&info->fti_fid, &res->lr_name,
132 ofd->ofd_lut.lut_lsd.lsd_osd_index);
133 fo = ofd_object_find(env, ofd, &info->fti_fid);
/* Object lookup failed: propagate the error encoded in the returned pointer. */
135 GOTO(out_lvb, rc = PTR_ERR(fo));
137 rc = ofd_attr_get(env, fo, &info->fti_attr);
139 struct ofd_seq *oseq;
142 /* Object could be recreated during the first
143 * CLEANUP_ORPHAN request. */
/* IDIF sequences are looked up under FID_SEQ_OST_MDT0; others use their own sequence. */
145 seq = fid_seq(&info->fti_fid);
146 oseq = ofd_seq_load(env, ofd, fid_seq_is_idif(seq) ?
147 FID_SEQ_OST_MDT0 : seq);
148 if (!IS_ERR_OR_NULL(oseq)) {
/* Special case: last id not yet synced while a resync is still allowed. */
149 if (!oseq->os_last_id_synced &&
150 ofd_resync_allowed(ofd))
/* Drop the sequence reference taken by ofd_seq_load(). */
152 ofd_seq_put(env, oseq);
/* Seed the LVB with the size, block count and timestamps just read. */
158 lvb->lvb_size = info->fti_attr.la_size;
159 lvb->lvb_blocks = info->fti_attr.la_blocks;
160 lvb->lvb_mtime = info->fti_attr.la_mtime;
161 lvb->lvb_atime = info->fti_attr.la_atime;
162 lvb->lvb_ctime = info->fti_attr.la_ctime;
/* Remember the atime as the on-disk atime if none has been recorded yet. */
164 if (fo->ofo_atime_ondisk == 0)
165 fo->ofo_atime_ondisk = info->fti_attr.la_atime;
168 "res: "DFID" initial LVB size: %llu, mtime: %#llx, atime: %#llx, ctime: %#llx, blocks: %#llx\n",
169 PFID(&info->fti_fid), lvb->lvb_size, lvb->lvb_mtime,
170 lvb->lvb_atime, lvb->lvb_ctime, lvb->lvb_blocks);
/* Clear the valid mask of the attribute buffer held in the thread info. */
172 info->fti_attr.la_valid = 0;
176 ofd_object_put(env, fo);
/* NOTE(review): presumably records rc in the LVB via lvb_blocks - confirm against OST_LVB_SET_ERR */
179 OST_LVB_SET_ERR(lvb->lvb_blocks, rc);
181 /* Don't free lvb data on lookup error */
186 * Implementation of ldlm_valblock_ops::lvbo_update for OFD.
188 * When a client generates a glimpse enqueue, it wants to get the current
189 * file size and updated attributes for a stat() type operation, but these
190 * attributes may be writeback cached on another client. The client with
191 * the DLM extent lock at the highest offset is asked for its current
192 * attributes via a glimpse callback on its extent lock, on the assumption
193 * that it has the highest file size and the newest timestamps. The timestamps
194 * are guaranteed to be correct if there is only a single writer on the file,
195 * but may be slightly inaccurate if there are multiple concurrent writers on
196 * the same object. In order to avoid race conditions between the glimpse AST
197 * and the client cancelling the lock, ofd_lvbo_update() also updates
198 * the attributes from the local object. If the last client hasn't done any
199 * writes yet, or has already written its data and cancelled its lock before
200 * it processed the glimpse, then the local inode will have more up-to-date
203 * This is called in two ways:
204 * \a req != NULL : called by the DLM itself after a glimpse callback
205 * \a req == NULL : called by the OFD after a disk write
207 * \param[in] lock LDLM lock
208 * \param[in] req PTLRPC request
209 * \param[in] increase_only don't allow LVB values to decrease
211 * \retval 0 on successful setup
212 * \retval negative value on error
/*
 * Refresh the LVB stored on \a res in two passes: first from the LVB carried
 * in the reply of \a req, then from the attributes of the on-disk object.
 * In both passes a field is overwritten when the new value is larger than the
 * stored one, or unconditionally when \a increase_only is zero.
 */
214 static int ofd_lvbo_update(struct ldlm_resource *res, struct ldlm_lock *lock,
215 struct ptlrpc_request *req, int increase_only)
217 struct ofd_thread_info *info;
218 struct ofd_device *ofd;
219 struct ofd_object *fo;
221 const struct lu_env *env;
228 info = ofd_info(env);
229 LASSERT(res != NULL);
/* The OFD device is reached through the namespace's ns_lvbp pointer. */
231 ofd = ldlm_res_to_ns(res)->ns_lvbp;
232 LASSERT(ofd != NULL);
/* The FID is extracted up front so the messages below can print it. */
234 fid_extract_from_res_name(&info->fti_fid, &res->lr_name);
236 lvb = res->lr_lvb_data;
/* Reported when the resource carries no LVB to update. */
238 CERROR("%s: no LVB data for "DFID"\n",
239 ofd_name(ofd), PFID(&info->fti_fid));
243 /* Update the LVB from the network message */
245 struct ost_lvb *rpc_lvb;
/* The LVB type comes from the import when the request has one, else from the export. */
248 if (req->rq_import != NULL)
249 lvb_type = imp_connect_lvb_type(req->rq_import);
251 lvb_type = exp_connect_lvb_type(req->rq_export);
254 struct ost_lvb_v1 *lvb_v1;
/* v1 layout: unpack it, copy it into the thread-info LVB and zero the nanosecond fields. */
256 lvb_v1 = req_capsule_server_swab_get(&req->rq_pill,
257 &RMF_DLM_LVB, lustre_swab_ost_lvb_v1);
261 rpc_lvb = &info->fti_lvb;
262 memcpy(rpc_lvb, lvb_v1, sizeof *lvb_v1);
263 rpc_lvb->lvb_mtime_ns = 0;
264 rpc_lvb->lvb_atime_ns = 0;
265 rpc_lvb->lvb_ctime_ns = 0;
/* Full layout: use the unpacked reply buffer directly. */
267 rpc_lvb = req_capsule_server_swab_get(&req->rq_pill,
269 lustre_swab_ost_lvb);
/* Merge each field of the received LVB into the stored LVB. */
275 if (rpc_lvb->lvb_size > lvb->lvb_size || !increase_only) {
276 CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb size: "
277 "%llu -> %llu\n", PFID(&info->fti_fid),
278 lvb->lvb_size, rpc_lvb->lvb_size);
279 lvb->lvb_size = rpc_lvb->lvb_size;
281 if (rpc_lvb->lvb_mtime > lvb->lvb_mtime || !increase_only) {
282 CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb mtime: "
283 "%llu -> %llu\n", PFID(&info->fti_fid),
284 lvb->lvb_mtime, rpc_lvb->lvb_mtime);
285 lvb->lvb_mtime = rpc_lvb->lvb_mtime;
287 if (rpc_lvb->lvb_atime > lvb->lvb_atime || !increase_only) {
288 CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb atime: "
289 "%llu -> %llu\n", PFID(&info->fti_fid),
290 lvb->lvb_atime, rpc_lvb->lvb_atime);
291 lvb->lvb_atime = rpc_lvb->lvb_atime;
293 if (rpc_lvb->lvb_ctime > lvb->lvb_ctime || !increase_only) {
294 CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb ctime: "
295 "%llu -> %llu\n", PFID(&info->fti_fid),
296 lvb->lvb_ctime, rpc_lvb->lvb_ctime);
297 lvb->lvb_ctime = rpc_lvb->lvb_ctime;
299 if (rpc_lvb->lvb_blocks > lvb->lvb_blocks || !increase_only) {
300 CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb blocks: "
301 "%llu -> %llu\n", PFID(&info->fti_fid),
302 lvb->lvb_blocks, rpc_lvb->lvb_blocks);
303 lvb->lvb_blocks = rpc_lvb->lvb_blocks;
309 /* Update the LVB from the disk inode */
/* Build the object FID from the resource name and this target's OSD index. */
310 ost_fid_from_resid(&info->fti_fid, &res->lr_name,
311 ofd->ofd_lut.lut_lsd.lsd_osd_index);
312 fo = ofd_object_find(env, ofd, &info->fti_fid);
/* Object lookup failed: propagate the error encoded in the returned pointer. */
314 GOTO(out, rc = PTR_ERR(fo));
316 rc = ofd_attr_get(env, fo, &info->fti_attr);
/* Same merge rule as above, now against the attributes read from disk. */
321 if (info->fti_attr.la_size > lvb->lvb_size || !increase_only) {
322 CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb size from disk: "
323 "%llu -> %llu\n", PFID(&info->fti_fid),
324 lvb->lvb_size, info->fti_attr.la_size);
325 lvb->lvb_size = info->fti_attr.la_size;
328 if (info->fti_attr.la_mtime >lvb->lvb_mtime || !increase_only) {
329 CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb mtime from disk: "
330 "%llu -> %llu\n", PFID(&info->fti_fid),
331 lvb->lvb_mtime, info->fti_attr.la_mtime);
332 lvb->lvb_mtime = info->fti_attr.la_mtime;
334 if (info->fti_attr.la_atime >lvb->lvb_atime || !increase_only) {
335 CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb atime from disk: "
336 "%llu -> %llu\n", PFID(&info->fti_fid),
337 lvb->lvb_atime, info->fti_attr.la_atime);
338 lvb->lvb_atime = info->fti_attr.la_atime;
340 if (info->fti_attr.la_ctime >lvb->lvb_ctime || !increase_only) {
341 CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb ctime from disk: "
342 "%llu -> %llu\n", PFID(&info->fti_fid),
343 lvb->lvb_ctime, info->fti_attr.la_ctime);
344 lvb->lvb_ctime = info->fti_attr.la_ctime;
/* Only this message casts la_blocks to unsigned long long for the %llu format. */
346 if (info->fti_attr.la_blocks > lvb->lvb_blocks || !increase_only) {
347 CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb blocks from disk: "
348 "%llu -> %llu\n", PFID(&info->fti_fid), lvb->lvb_blocks,
349 (unsigned long long)info->fti_attr.la_blocks);
350 lvb->lvb_blocks = info->fti_attr.la_blocks;
/* Clear the valid mask of the attribute buffer held in the thread info. */
354 info->fti_attr.la_valid = 0;
356 ofd_object_put(env, fo);
362 * Implementation of ldlm_valblock_ops::lvbo_size for OFD.
364 * This function returns size of LVB data so appropriate RPC size will be
365 * reserved. This is used for compatibility needs between server and client
366 * of different Lustre versions.
368 * \param[in] lock LDLM lock
370 * \retval size of LVB data
/*
 * Return the size of the LVB to exchange for \a lock: sizeof(struct ost_lvb)
 * when the lock has an export whose connect LVB type is non-zero, otherwise
 * sizeof(struct ost_lvb_v1).
 */
372 static int ofd_lvbo_size(struct ldlm_lock *lock)
374 if (lock->l_export != NULL && exp_connect_lvb_type(lock->l_export))
375 return sizeof(struct ost_lvb);
/* No export, or a zero LVB type: fall back to the v1 layout size. */
377 return sizeof(struct ost_lvb_v1);
381 * Implementation of ldlm_valblock_ops::lvbo_fill for OFD.
383 * This function is called to fill the given RPC buffer \a buf with LVB data
385 * \param[in] env execution environment
386 * \param[in] lock LDLM lock
387 * \param[in] buf RPC buffer to fill
388 * \param[in] buflen buffer length
390 * \retval size of LVB data written into \a buf buffer
/* Copy the LVB stored on the lock's resource into the caller's buffer \a buf. */
392 static int ofd_lvbo_fill(struct ldlm_lock *lock, void *buf, int *buflen)
394 struct ldlm_resource *res = lock->l_resource;
397 /* An earlier lvbo_init did not allocate the LVB. */
398 if (unlikely(res->lr_lvb_len == 0))
/* Only the size expected for this lock is copied; the stored LVB must be at least that large. */
401 lvb_len = ofd_lvbo_size(lock);
402 LASSERT(lvb_len <= res->lr_lvb_len);
/* NOTE(review): caller's buffer is smaller than the LVB - confirm how this case is reported */
404 if (lvb_len > *buflen)
408 memcpy(buf, res->lr_lvb_data, lvb_len);
/* LVB operations table that binds the OFD handlers defined in this file. */
414 struct ldlm_valblock_ops ofd_lvbo = {
415 .lvbo_init = ofd_lvbo_init,
416 .lvbo_update = ofd_lvbo_update,
417 .lvbo_free = ofd_lvbo_free,
418 .lvbo_size = ofd_lvbo_size,
419 .lvbo_fill = ofd_lvbo_fill