Whamcloud - gitweb
LU-14642 flr: allow layout version update from client/MDS
[fs/lustre-release.git] / lustre / ofd / ofd_objects.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/ofd/ofd_objects.c
32  *
33  * This file contains OSD API methods related to OBD Filter Device (OFD)
34  * object operations.
35  *
36  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
37  * Author: Mikhail Pershin <mike.pershin@intel.com>
38  */
39
40 #define DEBUG_SUBSYSTEM S_FILTER
41
42 #include <dt_object.h>
43 #include <lustre_lfsck.h>
44
45 #include "ofd_internal.h"
46
47 /**
48  * Get object version from disk and check it.
49  *
50  * This function checks object version from disk with
51  * ofd_thread_info::fti_pre_version filled from incoming RPC. This is part of
52  * VBR (Version-Based Recovery) and ensures that object has the same version
53  * upon replay as it has during original modification.
54  *
55  * \param[in]  info     execution thread OFD private data
56  * \param[in]  fo       OFD object
57  *
58  * \retval              0 if version matches
59  * \retval              -EOVERFLOW on version mismatch
60  */
61 static int ofd_version_get_check(struct ofd_thread_info *info,
62                                  struct ofd_object *fo)
63 {
64         dt_obj_version_t curr_version;
65
66         if (info->fti_exp == NULL)
67                 RETURN(0);
68
69         curr_version = dt_version_get(info->fti_env, ofd_object_child(fo));
70         if ((__s64)curr_version == -EOPNOTSUPP)
71                 RETURN(0);
72         /* VBR: version is checked always because costs nothing */
73         if (info->fti_pre_version != 0 &&
74             info->fti_pre_version != curr_version) {
75                 CDEBUG(D_INODE, "Version mismatch %#llx != %#llx\n",
76                        info->fti_pre_version, curr_version);
77                 spin_lock(&info->fti_exp->exp_lock);
78                 info->fti_exp->exp_vbr_failed = 1;
79                 spin_unlock(&info->fti_exp->exp_lock);
80                 RETURN (-EOVERFLOW);
81         }
82         info->fti_pre_version = curr_version;
83         RETURN(0);
84 }
85
86 /**
87  * Get OFD object by FID.
88  *
89  * This function finds OFD slice of compound object with the given FID.
90  *
91  * \param[in] env       execution environment
92  * \param[in] ofd       OFD device
93  * \param[in] fid       FID of the object
94  *
95  * \retval              pointer to the found ofd_object
96  * \retval              ERR_PTR(errno) in case of error
97  */
98 struct ofd_object *ofd_object_find(const struct lu_env *env,
99                                    struct ofd_device *ofd,
100                                    const struct lu_fid *fid)
101 {
102         struct ofd_object *fo;
103         struct lu_object  *o;
104
105         ENTRY;
106
107         o = lu_object_find(env, &ofd->ofd_dt_dev.dd_lu_dev, fid, NULL);
108         if (likely(!IS_ERR(o)))
109                 fo = ofd_obj(o);
110         else
111                 fo = ERR_CAST(o); /* return error */
112
113         RETURN(fo);
114 }
115
116 /**
117  * Get FID of parent MDT object.
118  *
119  * This function reads extended attribute XATTR_NAME_FID of OFD object which
120  * contains the MDT parent object FID and saves it in ofd_object::ofo_ff.
121  *
122  * The filter_fid::ff_parent::f_ver field currently holds
123  * the OST-object index in the parent MDT-object's layout EA,
124  * not the actual FID::f_ver of the parent. We therefore access
125  * it via the macro f_stripe_idx.
126  *
127  * \param[in] env       execution environment
128  * \param[in] fo        OFD object
129  *
130  * \retval              0 if successful
131  * \retval              -ENODATA if there is no such xattr
132  * \retval              negative value on error
133  */
134 int ofd_object_ff_load(const struct lu_env *env, struct ofd_object *fo)
135 {
136         struct ofd_thread_info *info = ofd_info(env);
137         struct filter_fid *ff = &fo->ofo_ff;
138         struct lu_buf *buf = &info->fti_buf;
139         int rc = 0;
140
141         if (fid_is_sane(&ff->ff_parent))
142                 return 0;
143
144         buf->lb_buf = ff;
145         buf->lb_len = sizeof(*ff);
146         rc = dt_xattr_get(env, ofd_object_child(fo), buf, XATTR_NAME_FID);
147         if (rc < 0)
148                 return rc;
149
150         if (unlikely(rc < sizeof(struct lu_fid))) {
151                 fid_zero(&ff->ff_parent);
152                 return -EINVAL;
153         }
154
155         filter_fid_le_to_cpu(ff, ff, rc);
156
157         return 0;
158 }
159
160 struct ofd_precreate_cb {
161         struct dt_txn_commit_cb  opc_cb;
162         struct ofd_seq          *opc_oseq;
163         int                      opc_objects;
164 };
165
166 static void ofd_cb_precreate(struct lu_env *env, struct thandle *th,
167                              struct dt_txn_commit_cb *cb, int err)
168 {
169         struct ofd_precreate_cb *opc;
170         struct ofd_seq *oseq;
171
172         opc = container_of(cb, struct ofd_precreate_cb, opc_cb);
173         oseq = opc->opc_oseq;
174
175         CDEBUG(D_OTHER, "Sub %d from %d for "DFID", th_sync %d\n",
176                opc->opc_objects, atomic_read(&oseq->os_precreate_in_progress),
177                PFID(&oseq->os_oi.oi_fid), th->th_sync);
178         atomic_sub(opc->opc_objects, &oseq->os_precreate_in_progress);
179         ofd_seq_put(env, opc->opc_oseq);
180         OBD_FREE_PTR(opc);
181 }
182
183 static int ofd_precreate_cb_add(const struct lu_env *env, struct thandle *th,
184                                 struct ofd_seq *oseq, int objects)
185 {
186         struct ofd_precreate_cb *opc;
187         struct dt_txn_commit_cb *dcb;
188         int precreate, rc;
189
190         OBD_ALLOC_PTR(opc);
191         if (!opc)
192                 return -ENOMEM;
193
194         precreate = atomic_read(&oseq->os_precreate_in_progress);
195         atomic_inc(&oseq->os_refc);
196         opc->opc_oseq = oseq;
197         opc->opc_objects = objects;
198         CDEBUG(D_OTHER, "Add %d to %d for "DFID", th_sync %d\n",
199                opc->opc_objects, precreate,
200                PFID(&oseq->os_oi.oi_fid), th->th_sync);
201
202         if ((precreate + objects) >= (5 * OST_MAX_PRECREATE))
203                 th->th_sync = 1;
204
205         dcb = &opc->opc_cb;
206         dcb->dcb_func = ofd_cb_precreate;
207         INIT_LIST_HEAD(&dcb->dcb_linkage);
208         strlcpy(dcb->dcb_name, "ofd_cb_precreate", sizeof(dcb->dcb_name));
209
210         rc = dt_trans_cb_add(th, dcb);
211         if (rc) {
212                 ofd_seq_put(env, oseq);
213                 OBD_FREE_PTR(opc);
214                 return rc;
215         }
216
217         atomic_add(objects, &oseq->os_precreate_in_progress);
218
219         return 0;
220 }
221
222 /**
223  * Precreate the given number \a nr of objects in the given sequence \a oseq.
224  *
225  * This function precreates new OST objects in the given sequence.
226  * The precreation starts from \a id and creates \a nr objects sequentially.
227  *
228  * Notes:
229  * This function may create fewer objects than requested.
230  *
231  * We mark object SUID+SGID to flag it for accepting UID+GID from client on
232  * first write. Currently the permission bits on the OST are never used,
233  * so this is OK.
234  *
235  * Initialize a/c/m time so any client timestamp will always be newer and
236  * update the inode. The ctime = 0 case is also handled specially in
237  * osd_inode_setattr(). See LU-221, LU-1042 for details.
238  *
239  * \param[in] env       execution environment
240  * \param[in] ofd       OFD device
241  * \param[in] id        object ID to start precreation from
242  * \param[in] oseq      object sequence
243  * \param[in] nr        number of objects to precreate
244  * \param[in] sync      synchronous precreation flag
245  *
246  * \retval              0 if successful
247  * \retval              negative value on error
248  */
249 int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd,
250                           u64 id, struct ofd_seq *oseq, int nr, int sync)
251 {
252         struct ofd_thread_info  *info = ofd_info(env);
253         struct ofd_object       *fo = NULL;
254         struct dt_object        *next;
255         struct thandle          *th;
256         struct ofd_object       **batch;
257         struct lu_fid           *fid = &info->fti_fid;
258         u64                     tmp;
259         int                     rc;
260         int                     rc2;
261         int                     i;
262         int                     objects = 0;
263         int                     nr_saved = nr;
264
265         ENTRY;
266
267         /* Don't create objects beyond the valid range for this SEQ */
268         if (unlikely(fid_seq_is_mdt0(ostid_seq(&oseq->os_oi)) &&
269                      (id + nr) > IDIF_MAX_OID)) {
270                 CERROR("%s:"DOSTID" hit the IDIF_MAX_OID (1<<48)!\n",
271                        ofd_name(ofd), id, ostid_seq(&oseq->os_oi));
272                 RETURN(rc = -ENOSPC);
273         } else if (unlikely(!fid_seq_is_mdt0(ostid_seq(&oseq->os_oi)) &&
274                             (id + nr) > OBIF_MAX_OID)) {
275                 CERROR("%s:"DOSTID" hit the OBIF_MAX_OID (1<<32)!\n",
276                        ofd_name(ofd), id, ostid_seq(&oseq->os_oi));
277                 RETURN(rc = -ENOSPC);
278         }
279
280         OBD_ALLOC_PTR_ARRAY(batch, nr_saved);
281         if (batch == NULL)
282                 RETURN(-ENOMEM);
283
284         info->fti_attr.la_valid = LA_TYPE | LA_MODE;
285         info->fti_attr.la_mode = S_IFREG | S_ISUID | S_ISGID | S_ISVTX | 0666;
286         info->fti_dof.dof_type = dt_mode_to_dft(S_IFREG);
287
288         info->fti_attr.la_valid |= LA_ATIME | LA_MTIME | LA_CTIME;
289         info->fti_attr.la_atime = 0;
290         info->fti_attr.la_mtime = 0;
291         info->fti_attr.la_ctime = 0;
292
293         LASSERT(id != 0);
294
295         /* prepare objects */
296         *fid = *lu_object_fid(&oseq->os_lastid_obj->do_lu);
297         for (i = 0; i < nr; i++) {
298                 rc = fid_set_id(fid, id + i);
299                 if (rc != 0) {
300                         if (i == 0)
301                                 GOTO(out, rc);
302
303                         nr = i;
304                         break;
305                 }
306
307                 fo = ofd_object_find(env, ofd, fid);
308                 if (IS_ERR(fo)) {
309                         if (i == 0)
310                                 GOTO(out, rc = PTR_ERR(fo));
311
312                         nr = i;
313                         break;
314                 }
315
316                 batch[i] = fo;
317         }
318         info->fti_buf.lb_buf = &tmp;
319         info->fti_buf.lb_len = sizeof(tmp);
320         info->fti_off = 0;
321
322         th = ofd_trans_create(env, ofd);
323         if (IS_ERR(th))
324                 GOTO(out, rc = PTR_ERR(th));
325
326         th->th_sync |= sync;
327
328         rc = dt_declare_record_write(env, oseq->os_lastid_obj, &info->fti_buf,
329                                      info->fti_off, th);
330         if (rc)
331                 GOTO(trans_stop, rc);
332
333         for (i = 0; i < nr; i++) {
334                 fo = batch[i];
335                 LASSERT(fo);
336
337                 if (unlikely(ofd_object_exists(fo))) {
338                         /* object may exist being re-created by write replay */
339                         CDEBUG(D_INODE, "object %#llx/%#llx exists: "
340                                DFID"\n", ostid_seq(&oseq->os_oi), id,
341                                PFID(lu_object_fid(&fo->ofo_obj.do_lu)));
342                         continue;
343                 }
344
345                 next = ofd_object_child(fo);
346                 LASSERT(next != NULL);
347
348                 rc = dt_declare_create(env, next, &info->fti_attr, NULL,
349                                        &info->fti_dof, th);
350                 if (rc < 0) {
351                         if (i == 0)
352                                 GOTO(trans_stop, rc);
353
354                         nr = i;
355                         break;
356                 }
357         }
358
359         rc = dt_trans_start(env, ofd->ofd_osd, th);
360         if (rc)
361                 GOTO(trans_stop, rc);
362
363         CDEBUG(D_OTHER, "%s: create new object "DFID" nr %d\n",
364                ofd_name(ofd), PFID(fid), nr);
365
366          /* When the LFSCK scanning the whole device to verify the LAST_ID file
367           * consistency, it will load the last_id into RAM firstly, and compare
368           * the last_id with each OST-object's ID. If the later one is larger,
369           * then it will regard the LAST_ID file crashed. But during the LFSCK
370           * scanning, the OFD may continue to create new OST-objects. Those new
371           * created OST-objects will have larger IDs than the LFSCK known ones.
372           * So from the LFSCK view, it needs to re-load the last_id from disk
373           * file, and if the latest last_id is still smaller than the object's
374           * ID, then the LAST_ID file is real crashed.
375           *
376           * To make above mechanism to work, before OFD pre-create OST-objects,
377           * it needs to update the LAST_ID file firstly, otherwise, the LFSCK
378           * may cannot get latest last_id although new OST-object created. */
379         if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_SKIP_LASTID)) {
380                 tmp = cpu_to_le64(id + nr - 1);
381                 dt_write_lock(env, oseq->os_lastid_obj, DT_LASTID);
382                 rc = dt_record_write(env, oseq->os_lastid_obj,
383                                      &info->fti_buf, &info->fti_off, th);
384                 dt_write_unlock(env, oseq->os_lastid_obj);
385                 if (rc != 0)
386                         GOTO(trans_stop, rc);
387         }
388
389         for (i = 0; i < nr; i++) {
390                 fo = batch[i];
391                 LASSERT(fo);
392
393                 ofd_write_lock(env, fo);
394
395                 /* Only the new created objects need to be recorded. */
396                 if (ofd->ofd_osd->dd_record_fid_accessed) {
397                         struct lfsck_req_local *lrl = &ofd_info(env)->fti_lrl;
398
399                         lfsck_pack_rfa(lrl, lu_object_fid(&fo->ofo_obj.do_lu),
400                                        LEL_FID_ACCESSED, LFSCK_TYPE_LAYOUT);
401                         lfsck_in_notify_local(env, ofd->ofd_osd, lrl, NULL);
402                 }
403
404                 if (likely(!ofd_object_exists(fo) &&
405                            !OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING))) {
406                         next = ofd_object_child(fo);
407                         LASSERT(next != NULL);
408
409                         rc = dt_create(env, next, &info->fti_attr, NULL,
410                                        &info->fti_dof, th);
411                         ofd_write_unlock(env, fo);
412                         if (rc < 0) {
413                                 if (i == 0)
414                                         GOTO(trans_stop, rc);
415
416                                 rc = 0;
417                                 break;
418                         }
419                         LASSERT(ofd_object_exists(fo));
420                 } else {
421                         ofd_write_unlock(env, fo);
422                 }
423
424                 ofd_seq_last_oid_set(oseq, id + i);
425         }
426
427         objects = i;
428         /* NOT all the wanted objects have been created,
429          * set the LAST_ID as the real created. */
430         if (unlikely(objects < nr)) {
431                 int rc1;
432
433                 info->fti_off = 0;
434                 tmp = cpu_to_le64(ofd_seq_last_oid(oseq));
435                 dt_write_lock(env, oseq->os_lastid_obj, DT_LASTID);
436                 rc1 = dt_record_write(env, oseq->os_lastid_obj,
437                                       &info->fti_buf, &info->fti_off, th);
438                 dt_write_unlock(env, oseq->os_lastid_obj);
439                 if (rc1 != 0)
440                         CERROR("%s: fail to reset the LAST_ID for seq (%#llx"
441                                ") from %llu to %llu\n", ofd_name(ofd),
442                                ostid_seq(&oseq->os_oi), id + nr - 1,
443                                ofd_seq_last_oid(oseq));
444         }
445
446         if (objects)
447                 ofd_precreate_cb_add(env, th, oseq, objects);
448 trans_stop:
449         rc2 = ofd_trans_stop(env, ofd, th, rc);
450         if (rc2)
451                 CERROR("%s: failed to stop transaction: rc = %d\n",
452                        ofd_name(ofd), rc2);
453         if (!rc)
454                 rc = rc2;
455 out:
456         for (i = 0; i < nr_saved; i++) {
457                 fo = batch[i];
458                 if (!fo)
459                         continue;
460                 ofd_object_put(env, fo);
461         }
462         OBD_FREE_PTR_ARRAY(batch, nr_saved);
463
464         CDEBUG((objects == 0 && rc == 0) ? D_ERROR : D_OTHER,
465                "created %d/%d objects: %d\n", objects, nr_saved, rc);
466
467         LASSERT(ergo(objects == 0, rc < 0));
468         RETURN(objects > 0 ? objects : rc);
469 }
470
471 /**
472  * Fix the OFD object ownership.
473  *
474  * If the object still has SUID+SGID bits set, meaning that it was precreated
475  * by the MDT before it was assigned to any file, (see ofd_precreate_objects())
476  * then we will accept the UID/GID/PROJID if sent by the client for initializing
477  * the ownership of this object.  We only allow this to happen once (so clear
478  * these bits) and later only allow setattr.
479  *
480  * \param[in] env        execution environment
481  * \param[in] fo         OFD object
482  * \param[in] la         object attributes
483  * \param[in] is_setattr was this function called from setattr or not
484  *
485  * \retval              0 if successful
486  * \retval              negative value on error
487  */
488 int ofd_attr_handle_id(const struct lu_env *env, struct ofd_object *fo,
489                          struct lu_attr *la, int is_setattr)
490 {
491         struct ofd_thread_info  *info = ofd_info(env);
492         struct lu_attr          *ln = &info->fti_attr2;
493         __u32                    mask = 0;
494         int                      rc;
495
496         ENTRY;
497
498         if (!(la->la_valid & LA_UID) && !(la->la_valid & LA_GID) &&
499             !(la->la_valid & LA_PROJID))
500                 RETURN(0);
501
502         rc = dt_attr_get(env, ofd_object_child(fo), ln);
503         if (rc != 0)
504                 RETURN(rc);
505
506         LASSERT(ln->la_valid & LA_MODE);
507
508         /*
509          * Only allow setattr to change UID/GID/PROJID, if
510          * SUID+SGID is not set which means this is not
511          * initialization of this objects.
512          */
513         if (!is_setattr) {
514                 if (!(ln->la_mode & S_ISUID))
515                         la->la_valid &= ~LA_UID;
516                 if (!(ln->la_mode & S_ISGID))
517                         la->la_valid &= ~LA_GID;
518                 if (!(ln->la_mode & S_ISVTX))
519                         la->la_valid &= ~LA_PROJID;
520         }
521
522         /* Initialize ownership of this object, clear SUID+SGID bits*/
523         if ((la->la_valid & LA_UID) && (ln->la_mode & S_ISUID))
524                 mask |= S_ISUID;
525         if ((la->la_valid & LA_GID) && (ln->la_mode & S_ISGID))
526                 mask |= S_ISGID;
527         if ((la->la_valid & LA_PROJID) && (ln->la_mode & S_ISVTX))
528                 mask |= S_ISVTX;
529         if (mask != 0) {
530                 if (!(la->la_valid & LA_MODE) || !is_setattr) {
531                         la->la_mode = ln->la_mode;
532                         la->la_valid |= LA_MODE;
533                 }
534                 la->la_mode &= ~mask;
535         }
536
537         RETURN(0);
538 }
539
540 /**
541  * Check if it needs to update filter_fid by the value of @oa.
542  *
543  * \param[in] env       env
544  * \param[in] fo        ofd object
545  * \param[in] oa        obdo from client or MDT
546  * \param[out] ff       if filter_fid needs updating, this field is used to
547  *                      return the new buffer
548  *
549  * \retval < 0          error occurred
550  * \retval 0            doesn't need to update filter_fid
551  * \retval FL_XATTR_{CREATE,REPLACE}    flag for xattr update
552  */
553 int ofd_object_ff_update(const struct lu_env *env, struct ofd_object *fo,
554                          const struct obdo *oa, struct filter_fid *ff)
555 {
556         int rc = 0;
557         ENTRY;
558
559         if (!(oa->o_valid &
560               (OBD_MD_FLFID | OBD_MD_FLOSTLAYOUT | OBD_MD_LAYOUT_VERSION)))
561                 RETURN(0);
562
563         rc = ofd_object_ff_load(env, fo);
564         if (rc < 0 && rc != -ENODATA)
565                 RETURN(rc);
566
567         LASSERT(ff != &fo->ofo_ff);
568         if (rc == -ENODATA) {
569                 rc = LU_XATTR_CREATE;
570                 memset(ff, 0, sizeof(*ff));
571         } else {
572                 rc = LU_XATTR_REPLACE;
573                 memcpy(ff, &fo->ofo_ff, sizeof(*ff));
574         }
575
576         if (oa->o_valid & OBD_MD_FLFID) {
577                 /* packing fid and converting it to LE for storing into EA.
578                  * Here ->o_stripe_idx should be filled by LOV and rest of
579                  * fields - by client. */
580                 ff->ff_parent.f_seq = oa->o_parent_seq;
581                 ff->ff_parent.f_oid = oa->o_parent_oid;
582                 /* XXX: we are ignoring o_parent_ver here, since this should
583                  *      be the same for all objects in this fileset. */
584                 ff->ff_parent.f_ver = oa->o_stripe_idx;
585         }
586         if (oa->o_valid & OBD_MD_FLOSTLAYOUT)
587                 ff->ff_layout = oa->o_layout;
588
589         if (oa->o_valid & OBD_MD_LAYOUT_VERSION) {
590                 CDEBUG(D_INODE, DFID": OST("DFID") layout version %u -> %u\n",
591                        PFID(&fo->ofo_ff.ff_parent),
592                        PFID(lu_object_fid(&fo->ofo_obj.do_lu)),
593                        ff->ff_layout_version, oa->o_layout_version);
594
595                 /**
596                  * resync write from client on non-primary objects and
597                  * resync start from MDS on primary objects will contain
598                  * LU_LAYOUT_RESYNC flag in the @oa.
599                  *
600                  * The layout version checking for write/punch from client
601                  * happens in ofd_verify_layout_version() before coming to
602                  * here, so that resync with smaller layout version client
603                  * will be rejected there, the biggest resync version will
604                  * be recorded in the OFD objects.
605                  */
606                 if (ff->ff_layout_version & LU_LAYOUT_RESYNC) {
607                         /* this opens a new era of writing */
608                         ff->ff_layout_version = 0;
609                         ff->ff_range = 0;
610                 }
611
612                 /* it's not allowed to change it to a smaller value */
613                 if (ofd_layout_version_less(oa->o_layout_version,
614                                             ff->ff_layout_version))
615                         RETURN(-EINVAL);
616
617                 if (ff->ff_layout_version == 0 ||
618                     oa->o_layout_version & LU_LAYOUT_RESYNC) {
619                         /* if LU_LAYOUT_RESYNC is set, it closes the era of
620                          * writing. Only mirror I/O can write this object. */
621                         ff->ff_layout_version = oa->o_layout_version;
622                         ff->ff_range = 0;
623                 } else if (oa->o_layout_version > ff->ff_layout_version) {
624                         ff->ff_range = max_t(__u32, ff->ff_range,
625                                              oa->o_layout_version -
626                                              ff->ff_layout_version);
627                 }
628         }
629
630         if (memcmp(ff, &fo->ofo_ff, sizeof(*ff)))
631                 filter_fid_cpu_to_le(ff, ff, sizeof(*ff));
632         else /* no change */
633                 rc = 0;
634
635         RETURN(rc);
636 }
637
638 /**
639  * Set OFD object attributes.
640  *
641  * This function sets OFD object attributes taken from incoming request.
642  * It sets not only regular attributes but also XATTR_NAME_FID extended
643  * attribute if needed. The "fid" xattr allows the object's MDT parent inode
644  * to be found and verified by LFSCK and other tools in case of inconsistency.
645  *
646  * \param[in] env       execution environment
647  * \param[in] fo        OFD object
648  * \param[in] la        object attributes
649  * \param[in] oa        obdo carries fid, ost_layout, layout version
650  *
651  * \retval              0 if successful
652  * \retval              negative value on error
653  */
654 int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo,
655                  struct lu_attr *la, struct obdo *oa)
656 {
657         struct ofd_thread_info *info = ofd_info(env);
658         struct ofd_device *ofd = ofd_obj2dev(fo);
659         struct filter_fid *ff = &info->fti_mds_fid;
660         struct thandle *th;
661         int fl, rc, rc2;
662
663         ENTRY;
664
665         if (!ofd_object_exists(fo))
666                 GOTO(out, rc = -ENOENT);
667
668         /* VBR: version recovery check */
669         rc = ofd_version_get_check(info, fo);
670         if (rc)
671                 GOTO(out, rc);
672
673         rc = ofd_attr_handle_id(env, fo, la, 1 /* is_setattr */);
674         if (rc != 0)
675                 GOTO(out, rc);
676
677         th = ofd_trans_create(env, ofd);
678         if (IS_ERR(th))
679                 GOTO(out, rc = PTR_ERR(th));
680
681         rc = dt_declare_attr_set(env, ofd_object_child(fo), la, th);
682         if (rc)
683                 GOTO(stop, rc);
684
685         info->fti_buf.lb_buf = ff;
686         info->fti_buf.lb_len = sizeof(*ff);
687         rc = dt_declare_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
688                                   XATTR_NAME_FID, 0, th);
689         if (rc)
690                 GOTO(stop, rc);
691
692         rc = ofd_trans_start(env, ofd, la->la_valid & LA_SIZE ? fo : NULL, th);
693         if (rc)
694                 GOTO(stop, rc);
695
696         ofd_write_lock(env, fo);
697         if (!ofd_object_exists(fo))
698                 GOTO(unlock, rc = -ENOENT);
699
700         /* serialize vs ofd_commitrw_write() */
701         if (la->la_valid & (LA_ATIME | LA_MTIME | LA_CTIME))
702                 tgt_fmd_update(info->fti_exp, &fo->ofo_header.loh_fid,
703                                info->fti_xid);
704
705         rc = dt_attr_set(env, ofd_object_child(fo), la, th);
706         if (rc)
707                 GOTO(unlock, rc);
708
709         fl = ofd_object_ff_update(env, fo, oa, ff);
710         if (fl < 0)
711                 GOTO(unlock, rc = fl);
712
713         if (fl) {
714                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1))
715                         ff->ff_parent.f_oid = cpu_to_le32(1UL << 31);
716                 else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2))
717                         le32_add_cpu(&ff->ff_parent.f_oid, -1);
718                 else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID))
719                         GOTO(unlock, rc);
720
721                 info->fti_buf.lb_buf = ff;
722                 info->fti_buf.lb_len = sizeof(*ff);
723                 rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
724                                   XATTR_NAME_FID, fl, th);
725                 if (!rc)
726                         filter_fid_le_to_cpu(&fo->ofo_ff, ff, sizeof(*ff));
727         }
728
729         GOTO(unlock, rc);
730
731 unlock:
732         ofd_write_unlock(env, fo);
733 stop:
734         rc2 = ofd_trans_stop(env, ofd, th, rc);
735         if (rc2)
736                 CERROR("%s: failed to stop transaction: rc = %d\n",
737                        ofd_name(ofd), rc2);
738         if (!rc)
739                 rc = rc2;
740 out:
741         return rc;
742 }
743
744 /**
745  * Fallocate(Preallocate) space for OFD object.
746  *
747  * This function allocates space for the object from the \a start
748  * offset to the \a end offset.
749  *
750  * \param[in] env       execution environment
751  * \param[in] fo        OFD object
752  * \param[in] start     start offset to allocate from
753  * \param[in] end       end of allocate
754  * \param[in] mode      fallocate mode
755  * \param[in] la        object attributes
756  * \param[in] ff        filter_fid structure
757  *
758  * \retval              0 if successful
759  * \retval              negative value on error
760  */
761 int ofd_object_fallocate(const struct lu_env *env, struct ofd_object *fo,
762                          __u64 start, __u64 end, int mode, struct lu_attr *la,
763                          struct obdo *oa)
764 {
765         struct ofd_thread_info *info = ofd_info(env);
766         struct ofd_device *ofd = ofd_obj2dev(fo);
767         struct dt_object *dob = ofd_object_child(fo);
768         struct thandle *th;
769         struct filter_fid *ff = &info->fti_mds_fid;
770         bool ff_needed = false;
771         int rc;
772
773         ENTRY;
774
775         if (!ofd_object_exists(fo))
776                 RETURN(-ENOENT);
777
778         /* VBR: version recovery check */
779         rc = ofd_version_get_check(info, fo);
780         if (rc != 0)
781                 RETURN(rc);
782
783         if (ff != NULL) {
784                 rc = ofd_object_ff_load(env, fo);
785                 if (rc == -ENODATA)
786                         ff_needed = true;
787                 else if (rc < 0)
788                         RETURN(rc);
789
790                 if (ff_needed) {
791                         if (oa->o_valid & OBD_MD_FLFID) {
792                                 ff->ff_parent.f_seq = oa->o_parent_seq;
793                                 ff->ff_parent.f_oid = oa->o_parent_oid;
794                                 ff->ff_parent.f_ver = oa->o_stripe_idx;
795                         }
796                         if (oa->o_valid & OBD_MD_FLOSTLAYOUT)
797                                 ff->ff_layout = oa->o_layout;
798                         if (oa->o_valid & OBD_MD_LAYOUT_VERSION)
799                                 ff->ff_layout_version = oa->o_layout_version;
800                         filter_fid_cpu_to_le(ff, ff, sizeof(*ff));
801                 }
802         }
803
804         th = ofd_trans_create(env, ofd);
805         if (IS_ERR(th))
806                 RETURN(PTR_ERR(th));
807
808         rc = dt_declare_attr_set(env, dob, la, th);
809         if (rc)
810                 GOTO(stop, rc);
811
812         rc = dt_declare_fallocate(env, dob, start, end, mode, th);
813         if (rc)
814                 GOTO(stop, rc);
815
816         if (ff_needed) {
817                 info->fti_buf.lb_buf = ff;
818                 info->fti_buf.lb_len = sizeof(*ff);
819                 rc = dt_declare_xattr_set(env, ofd_object_child(fo),
820                                           &info->fti_buf, XATTR_NAME_FID, 0,
821                                           th);
822                 if (rc)
823                         GOTO(stop, rc);
824         }
825
826         rc = ofd_trans_start(env, ofd, fo, th);
827         if (rc)
828                 GOTO(stop, rc);
829
830         ofd_read_lock(env, fo);
831         if (!ofd_object_exists(fo))
832                 GOTO(unlock, rc = -ENOENT);
833
834         if (la->la_valid & (LA_ATIME | LA_MTIME | LA_CTIME))
835                 tgt_fmd_update(info->fti_exp, &fo->ofo_header.loh_fid,
836                                info->fti_xid);
837
838         rc = dt_falloc(env, dob, start, end, mode, th);
839         if (rc)
840                 GOTO(unlock, rc);
841
842         rc = dt_attr_set(env, dob, la, th);
843         if (rc)
844                 GOTO(unlock, rc);
845
846         if (ff_needed) {
847                 rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
848                                   XATTR_NAME_FID, 0, th);
849                 if (!rc)
850                         filter_fid_le_to_cpu(&fo->ofo_ff, ff, sizeof(*ff));
851         }
852 unlock:
853         ofd_read_unlock(env, fo);
854 stop:
855         ofd_trans_stop(env, ofd, th, rc);
856         RETURN(rc);
857 }
858
859 /**
860  * Truncate/punch OFD object.
861  *
862  * This function frees all of the allocated object's space from the \a start
863  * offset to the \a end offset. For truncate() operations the \a end offset
864  * is OBD_OBJECT_EOF. The functionality to punch holes in an object via
865  * fallocate(FALLOC_FL_PUNCH_HOLE) is not yet implemented (see LU-3606).
866  *
867  * \param[in] env       execution environment
868  * \param[in] fo        OFD object
869  * \param[in] start     start offset to punch from
870  * \param[in] end       end of punch
871  * \param[in] la        object attributes
872  * \param[in] oa        obdo struct from incoming request
873  *
874  * \retval              0 if successful
875  * \retval              negative value on error
876  */
877 int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo,
878                      __u64 start, __u64 end, struct lu_attr *la,
879                      struct obdo *oa)
880 {
881         struct ofd_thread_info *info = ofd_info(env);
882         struct ofd_device *ofd = ofd_obj2dev(fo);
883         struct dt_object *dob = ofd_object_child(fo);
884         struct filter_fid *ff = &info->fti_mds_fid;
885         struct thandle *th;
886         int fl, rc, rc2;
887
888         ENTRY;
889
890         /* we support truncate, not punch yet */
891         LASSERT(end == OBD_OBJECT_EOF);
892
893         if (!ofd_object_exists(fo))
894                 GOTO(out, rc = -ENOENT);
895
896         if (ofd->ofd_lfsck_verify_pfid && oa->o_valid & OBD_MD_FLFID) {
897                 rc = ofd_verify_ff(env, fo, oa);
898                 if (rc != 0)
899                         GOTO(out, rc);
900         }
901
902         /* VBR: version recovery check */
903         rc = ofd_version_get_check(info, fo);
904         if (rc)
905                 GOTO(out, rc);
906
907         rc = ofd_attr_handle_id(env, fo, la, 0 /* !is_setattr */);
908         if (rc != 0)
909                 GOTO(out, rc);
910
911         th = ofd_trans_create(env, ofd);
912         if (IS_ERR(th))
913                 GOTO(out, rc = PTR_ERR(th));
914
915         rc = dt_declare_attr_set(env, dob, la, th);
916         if (rc)
917                 GOTO(stop, rc);
918
919         rc = dt_declare_punch(env, dob, start, OBD_OBJECT_EOF, th);
920         if (rc)
921                 GOTO(stop, rc);
922
923         info->fti_buf.lb_buf = ff;
924         info->fti_buf.lb_len = sizeof(*ff);
925         rc = dt_declare_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
926                                   XATTR_NAME_FID, 0, th);
927         if (rc)
928                 GOTO(stop, rc);
929
930         rc = ofd_trans_start(env, ofd, fo, th);
931         if (rc)
932                 GOTO(stop, rc);
933
934         ofd_write_lock(env, fo);
935
936         if (la->la_valid & (LA_ATIME | LA_MTIME | LA_CTIME))
937                 tgt_fmd_update(info->fti_exp, &fo->ofo_header.loh_fid,
938                                info->fti_xid);
939
940         if (!ofd_object_exists(fo))
941                 GOTO(unlock, rc = -ENOENT);
942
943         /* need to verify layout version */
944         if (oa->o_valid & OBD_MD_LAYOUT_VERSION) {
945                 rc = ofd_verify_layout_version(env, fo, oa);
946                 if (rc)
947                         GOTO(unlock, rc);
948         }
949
950         if (oa->o_valid & OBD_MD_FLFLAGS && oa->o_flags & LUSTRE_ENCRYPT_FL) {
951                 /* punch must be aware we are dealing with an encrypted file */
952                 struct lu_attr la = {
953                         .la_valid = LA_FLAGS,
954                         .la_flags = LUSTRE_ENCRYPT_FL,
955                 };
956
957                 rc = dt_attr_set(env, dob, &la, th);
958                 if (rc)
959                         GOTO(unlock, rc);
960         }
961         rc = dt_punch(env, dob, start, OBD_OBJECT_EOF, th);
962         if (rc)
963                 GOTO(unlock, rc);
964
965         fl = ofd_object_ff_update(env, fo, oa, ff);
966         if (fl < 0)
967                 GOTO(unlock, rc = fl);
968
969         rc = dt_attr_set(env, dob, la, th);
970         if (rc)
971                 GOTO(unlock, rc);
972
973         if (fl) {
974                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1))
975                         ff->ff_parent.f_oid = cpu_to_le32(1UL << 31);
976                 else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2))
977                         le32_add_cpu(&ff->ff_parent.f_oid, -1);
978                 else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID))
979                         GOTO(unlock, rc);
980
981                 info->fti_buf.lb_buf = ff;
982                 info->fti_buf.lb_len = sizeof(*ff);
983                 rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
984                                   XATTR_NAME_FID, fl, th);
985                 if (!rc)
986                         filter_fid_le_to_cpu(&fo->ofo_ff, ff, sizeof(*ff));
987         }
988
989         GOTO(unlock, rc);
990
991 unlock:
992         ofd_write_unlock(env, fo);
993 stop:
994         rc2 = ofd_trans_stop(env, ofd, th, rc);
995         if (rc2 != 0)
996                 CERROR("%s: failed to stop transaction: rc = %d\n",
997                        ofd_name(ofd), rc2);
998         if (!rc)
999                 rc = rc2;
1000 out:
1001         return rc;
1002 }
1003
1004 /**
1005  * Destroy OFD object.
1006  *
1007  * This function destroys OFD object. If object wasn't used at all (orphan)
1008  * then local transaction is used, which means the transaction data is not
1009  * returned back in reply.
1010  *
1011  * \param[in] env       execution environment
1012  * \param[in] fo        OFD object
1013  * \param[in] orphan    flag to indicate that object is orphaned
1014  *
1015  * \retval              0 if successful
1016  * \retval              negative value on error
1017  */
1018 int ofd_destroy(const struct lu_env *env, struct ofd_object *fo,
1019                        int orphan)
1020 {
1021         struct ofd_device       *ofd = ofd_obj2dev(fo);
1022         struct thandle          *th;
1023         int                     rc = 0;
1024         int                     rc2;
1025
1026         ENTRY;
1027
1028         if (!ofd_object_exists(fo))
1029                 GOTO(out, rc = -ENOENT);
1030
1031         th = ofd_trans_create(env, ofd);
1032         if (IS_ERR(th))
1033                 GOTO(out, rc = PTR_ERR(th));
1034
1035         rc = dt_declare_ref_del(env, ofd_object_child(fo), th);
1036         if (rc < 0)
1037                 GOTO(stop, rc);
1038
1039         rc = dt_declare_destroy(env, ofd_object_child(fo), th);
1040         if (rc < 0)
1041                 GOTO(stop, rc);
1042
1043         if (orphan)
1044                 rc = dt_trans_start_local(env, ofd->ofd_osd, th);
1045         else
1046                 rc = ofd_trans_start(env, ofd, NULL, th);
1047         if (rc)
1048                 GOTO(stop, rc);
1049
1050         ofd_write_lock(env, fo);
1051         if (!ofd_object_exists(fo))
1052                 GOTO(unlock, rc = -ENOENT);
1053
1054         tgt_fmd_drop(ofd_info(env)->fti_exp, &fo->ofo_header.loh_fid);
1055
1056         dt_ref_del(env, ofd_object_child(fo), th);
1057         dt_destroy(env, ofd_object_child(fo), th);
1058 unlock:
1059         ofd_write_unlock(env, fo);
1060 stop:
1061         rc2 = ofd_trans_stop(env, ofd, th, rc);
1062         if (rc2)
1063                 CERROR("%s failed to stop transaction: %d\n",
1064                        ofd_name(ofd), rc2);
1065         if (!rc)
1066                 rc = rc2;
1067 out:
1068         RETURN(rc);
1069 }
1070
1071 /**
1072  * Get OFD object attributes.
1073  *
1074  * This function gets OFD object regular attributes. It is used to serve
1075  * incoming request as well as for local OFD purposes.
1076  *
1077  * \param[in] env       execution environment
1078  * \param[in] fo        OFD object
1079  * \param[in] la        object attributes
1080  *
1081  * \retval              0 if successful
1082  * \retval              negative value on error
1083  */
1084 int ofd_attr_get(const struct lu_env *env, struct ofd_object *fo,
1085                  struct lu_attr *la)
1086 {
1087         int rc = 0;
1088
1089         ENTRY;
1090
1091         if (ofd_object_exists(fo)) {
1092                 rc = dt_attr_get(env, ofd_object_child(fo), la);
1093         } else {
1094                 rc = -ENOENT;
1095         }
1096         RETURN(rc);
1097 }