Whamcloud - gitweb
LU-11760 ofd: limit num of objects to create in 1 transaction
[fs/lustre-release.git] / lustre / ofd / ofd_objects.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/ofd/ofd_objects.c
33  *
34  * This file contains OSD API methods related to OBD Filter Device (OFD)
35  * object operations.
36  *
37  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
38  * Author: Mikhail Pershin <mike.pershin@intel.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_FILTER
42
43 #include <dt_object.h>
44 #include <lustre_lfsck.h>
45
46 #include "ofd_internal.h"
47
48 /**
49  * Get object version from disk and check it.
50  *
51  * This function checks object version from disk with
52  * ofd_thread_info::fti_pre_version filled from incoming RPC. This is part of
53  * VBR (Version-Based Recovery) and ensures that object has the same version
54  * upon replay as it has during original modification.
55  *
56  * \param[in]  info     execution thread OFD private data
57  * \param[in]  fo       OFD object
58  *
59  * \retval              0 if version matches
60  * \retval              -EOVERFLOW on version mismatch
61  */
62 static int ofd_version_get_check(struct ofd_thread_info *info,
63                                  struct ofd_object *fo)
64 {
65         dt_obj_version_t curr_version;
66
67         LASSERT(ofd_object_exists(fo));
68
69         if (info->fti_exp == NULL)
70                 RETURN(0);
71
72         curr_version = dt_version_get(info->fti_env, ofd_object_child(fo));
73         if ((__s64)curr_version == -EOPNOTSUPP)
74                 RETURN(0);
75         /* VBR: version is checked always because costs nothing */
76         if (info->fti_pre_version != 0 &&
77             info->fti_pre_version != curr_version) {
78                 CDEBUG(D_INODE, "Version mismatch %#llx != %#llx\n",
79                        info->fti_pre_version, curr_version);
80                 spin_lock(&info->fti_exp->exp_lock);
81                 info->fti_exp->exp_vbr_failed = 1;
82                 spin_unlock(&info->fti_exp->exp_lock);
83                 RETURN (-EOVERFLOW);
84         }
85         info->fti_pre_version = curr_version;
86         RETURN(0);
87 }
88
89 /**
90  * Get OFD object by FID.
91  *
92  * This function finds OFD slice of compound object with the given FID.
93  *
94  * \param[in] env       execution environment
95  * \param[in] ofd       OFD device
96  * \param[in] fid       FID of the object
97  *
98  * \retval              pointer to the found ofd_object
99  * \retval              ERR_PTR(errno) in case of error
100  */
101 struct ofd_object *ofd_object_find(const struct lu_env *env,
102                                    struct ofd_device *ofd,
103                                    const struct lu_fid *fid)
104 {
105         struct ofd_object *fo;
106         struct lu_object  *o;
107
108         ENTRY;
109
110         o = lu_object_find(env, &ofd->ofd_dt_dev.dd_lu_dev, fid, NULL);
111         if (likely(!IS_ERR(o)))
112                 fo = ofd_obj(o);
113         else
114                 fo = ERR_CAST(o); /* return error */
115
116         RETURN(fo);
117 }
118
119 /**
120  * Get FID of parent MDT object.
121  *
122  * This function reads extended attribute XATTR_NAME_FID of OFD object which
123  * contains the MDT parent object FID and saves it in ofd_object::ofo_ff.
124  *
125  * The filter_fid::ff_parent::f_ver field currently holds
126  * the OST-object index in the parent MDT-object's layout EA,
127  * not the actual FID::f_ver of the parent. We therefore access
128  * it via the macro f_stripe_idx.
129  *
130  * \param[in] env       execution environment
131  * \param[in] fo        OFD object
132  *
133  * \retval              0 if successful
134  * \retval              -ENODATA if there is no such xattr
135  * \retval              negative value on error
136  */
137 int ofd_object_ff_load(const struct lu_env *env, struct ofd_object *fo)
138 {
139         struct ofd_thread_info *info = ofd_info(env);
140         struct filter_fid *ff = &fo->ofo_ff;
141         struct lu_buf *buf = &info->fti_buf;
142         int rc = 0;
143
144         if (fid_is_sane(&ff->ff_parent))
145                 return 0;
146
147         buf->lb_buf = ff;
148         buf->lb_len = sizeof(*ff);
149         rc = dt_xattr_get(env, ofd_object_child(fo), buf, XATTR_NAME_FID);
150         if (rc < 0)
151                 return rc;
152
153         if (unlikely(rc < sizeof(struct lu_fid))) {
154                 fid_zero(&ff->ff_parent);
155                 return -EINVAL;
156         }
157
158         filter_fid_le_to_cpu(ff, ff, rc);
159
160         return 0;
161 }
162
163 struct ofd_precreate_cb {
164         struct dt_txn_commit_cb  opc_cb;
165         struct ofd_seq          *opc_oseq;
166         int                      opc_objects;
167 };
168
169 static void ofd_cb_precreate(struct lu_env *env, struct thandle *th,
170                              struct dt_txn_commit_cb *cb, int err)
171 {
172         struct ofd_precreate_cb *opc;
173         struct ofd_seq *oseq;
174
175         opc = container_of(cb, struct ofd_precreate_cb, opc_cb);
176         oseq = opc->opc_oseq;
177
178         CDEBUG(D_OTHER, "Sub %d from %d for "DFID", th_sync %d\n",
179                opc->opc_objects, atomic_read(&oseq->os_precreate_in_progress),
180                PFID(&oseq->os_oi.oi_fid), th->th_sync);
181         atomic_sub(opc->opc_objects, &oseq->os_precreate_in_progress);
182         ofd_seq_put(env, opc->opc_oseq);
183         OBD_FREE_PTR(opc);
184 }
185
186 static int ofd_precreate_cb_add(const struct lu_env *env, struct thandle *th,
187                                 struct ofd_seq *oseq, int objects)
188 {
189         struct ofd_precreate_cb *opc;
190         struct dt_txn_commit_cb *dcb;
191         int precreate, rc;
192
193         OBD_ALLOC_PTR(opc);
194         if (!opc)
195                 return -ENOMEM;
196
197         precreate = atomic_read(&oseq->os_precreate_in_progress);
198         atomic_inc(&oseq->os_refc);
199         opc->opc_oseq = oseq;
200         opc->opc_objects = objects;
201         CDEBUG(D_OTHER, "Add %d to %d for "DFID", th_sync %d\n",
202                opc->opc_objects, precreate,
203                PFID(&oseq->os_oi.oi_fid), th->th_sync);
204
205         if ((precreate + objects) >= (5 * OST_MAX_PRECREATE))
206                 th->th_sync = 1;
207
208         dcb = &opc->opc_cb;
209         dcb->dcb_func = ofd_cb_precreate;
210         INIT_LIST_HEAD(&dcb->dcb_linkage);
211         strlcpy(dcb->dcb_name, "ofd_cb_precreate", sizeof(dcb->dcb_name));
212
213         rc = dt_trans_cb_add(th, dcb);
214         if (rc) {
215                 ofd_seq_put(env, oseq);
216                 OBD_FREE_PTR(opc);
217                 return rc;
218         }
219
220         atomic_add(objects, &oseq->os_precreate_in_progress);
221
222         return 0;
223 }
224
225 /**
226  * Precreate the given number \a nr of objects in the given sequence \a oseq.
227  *
228  * This function precreates new OST objects in the given sequence.
229  * The precreation starts from \a id and creates \a nr objects sequentially.
230  *
231  * Notes:
232  * This function may create fewer objects than requested.
233  *
234  * We mark object SUID+SGID to flag it for accepting UID+GID from client on
235  * first write. Currently the permission bits on the OST are never used,
236  * so this is OK.
237  *
238  * Initialize a/c/m time so any client timestamp will always be newer and
239  * update the inode. The ctime = 0 case is also handled specially in
240  * osd_inode_setattr(). See LU-221, LU-1042 for details.
241  *
242  * \param[in] env       execution environment
243  * \param[in] ofd       OFD device
244  * \param[in] id        object ID to start precreation from
245  * \param[in] oseq      object sequence
246  * \param[in] nr        number of objects to precreate
247  * \param[in] sync      synchronous precreation flag
248  *
249  * \retval              0 if successful
250  * \retval              negative value on error
251  */
252 int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd,
253                           u64 id, struct ofd_seq *oseq, int nr, int sync)
254 {
255         struct ofd_thread_info  *info = ofd_info(env);
256         struct ofd_object       *fo = NULL;
257         struct dt_object        *next;
258         struct thandle          *th;
259         struct ofd_object       **batch;
260         struct lu_fid           *fid = &info->fti_fid;
261         u64                     tmp;
262         int                     rc;
263         int                     rc2;
264         int                     i;
265         int                     objects = 0;
266         int                     nr_saved = nr;
267
268         ENTRY;
269
270         /* Don't create objects beyond the valid range for this SEQ */
271         if (unlikely(fid_seq_is_mdt0(ostid_seq(&oseq->os_oi)) &&
272                      (id + nr) > IDIF_MAX_OID)) {
273                 CERROR("%s:"DOSTID" hit the IDIF_MAX_OID (1<<48)!\n",
274                        ofd_name(ofd), id, ostid_seq(&oseq->os_oi));
275                 RETURN(rc = -ENOSPC);
276         } else if (unlikely(!fid_seq_is_mdt0(ostid_seq(&oseq->os_oi)) &&
277                             (id + nr) > OBIF_MAX_OID)) {
278                 CERROR("%s:"DOSTID" hit the OBIF_MAX_OID (1<<32)!\n",
279                        ofd_name(ofd), id, ostid_seq(&oseq->os_oi));
280                 RETURN(rc = -ENOSPC);
281         }
282
283         OBD_ALLOC(batch, nr_saved * sizeof(struct ofd_object *));
284         if (batch == NULL)
285                 RETURN(-ENOMEM);
286
287         info->fti_attr.la_valid = LA_TYPE | LA_MODE;
288         info->fti_attr.la_mode = S_IFREG | S_ISUID | S_ISGID | S_ISVTX | 0666;
289         info->fti_dof.dof_type = dt_mode_to_dft(S_IFREG);
290
291         info->fti_attr.la_valid |= LA_ATIME | LA_MTIME | LA_CTIME;
292         info->fti_attr.la_atime = 0;
293         info->fti_attr.la_mtime = 0;
294         info->fti_attr.la_ctime = 0;
295
296         LASSERT(id != 0);
297
298         /* prepare objects */
299         *fid = *lu_object_fid(&oseq->os_lastid_obj->do_lu);
300         for (i = 0; i < nr; i++) {
301                 rc = fid_set_id(fid, id + i);
302                 if (rc != 0) {
303                         if (i == 0)
304                                 GOTO(out, rc);
305
306                         nr = i;
307                         break;
308                 }
309
310                 fo = ofd_object_find(env, ofd, fid);
311                 if (IS_ERR(fo)) {
312                         if (i == 0)
313                                 GOTO(out, rc = PTR_ERR(fo));
314
315                         nr = i;
316                         break;
317                 }
318
319                 ofd_write_lock(env, fo);
320                 batch[i] = fo;
321         }
322         info->fti_buf.lb_buf = &tmp;
323         info->fti_buf.lb_len = sizeof(tmp);
324         info->fti_off = 0;
325
326         th = ofd_trans_create(env, ofd);
327         if (IS_ERR(th))
328                 GOTO(out, rc = PTR_ERR(th));
329
330         th->th_sync |= sync;
331
332         rc = dt_declare_record_write(env, oseq->os_lastid_obj, &info->fti_buf,
333                                      info->fti_off, th);
334         if (rc)
335                 GOTO(trans_stop, rc);
336
337         for (i = 0; i < nr; i++) {
338                 fo = batch[i];
339                 LASSERT(fo);
340
341                 if (unlikely(ofd_object_exists(fo))) {
342                         /* object may exist being re-created by write replay */
343                         CDEBUG(D_INODE, "object %#llx/%#llx exists: "
344                                DFID"\n", ostid_seq(&oseq->os_oi), id,
345                                PFID(lu_object_fid(&fo->ofo_obj.do_lu)));
346                         continue;
347                 }
348
349                 next = ofd_object_child(fo);
350                 LASSERT(next != NULL);
351
352                 rc = dt_declare_create(env, next, &info->fti_attr, NULL,
353                                        &info->fti_dof, th);
354                 if (rc < 0) {
355                         if (i == 0)
356                                 GOTO(trans_stop, rc);
357
358                         nr = i;
359                         break;
360                 }
361         }
362
363         rc = dt_trans_start_local(env, ofd->ofd_osd, th);
364         if (rc)
365                 GOTO(trans_stop, rc);
366
367         CDEBUG(D_OTHER, "%s: create new object "DFID" nr %d\n",
368                ofd_name(ofd), PFID(fid), nr);
369
370          /* When the LFSCK scanning the whole device to verify the LAST_ID file
371           * consistency, it will load the last_id into RAM firstly, and compare
372           * the last_id with each OST-object's ID. If the later one is larger,
373           * then it will regard the LAST_ID file crashed. But during the LFSCK
374           * scanning, the OFD may continue to create new OST-objects. Those new
375           * created OST-objects will have larger IDs than the LFSCK known ones.
376           * So from the LFSCK view, it needs to re-load the last_id from disk
377           * file, and if the latest last_id is still smaller than the object's
378           * ID, then the LAST_ID file is real crashed.
379           *
380           * To make above mechanism to work, before OFD pre-create OST-objects,
381           * it needs to update the LAST_ID file firstly, otherwise, the LFSCK
382           * may cannot get latest last_id although new OST-object created. */
383         if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_SKIP_LASTID)) {
384                 tmp = cpu_to_le64(id + nr - 1);
385                 dt_write_lock(env, oseq->os_lastid_obj, DT_LASTID);
386                 rc = dt_record_write(env, oseq->os_lastid_obj,
387                                      &info->fti_buf, &info->fti_off, th);
388                 dt_write_unlock(env, oseq->os_lastid_obj);
389                 if (rc != 0)
390                         GOTO(trans_stop, rc);
391         }
392
393         for (i = 0; i < nr; i++) {
394                 fo = batch[i];
395                 LASSERT(fo);
396
397                 /* Only the new created objects need to be recorded. */
398                 if (ofd->ofd_osd->dd_record_fid_accessed) {
399                         struct lfsck_req_local *lrl = &ofd_info(env)->fti_lrl;
400
401                         lfsck_pack_rfa(lrl, lu_object_fid(&fo->ofo_obj.do_lu),
402                                        LEL_FID_ACCESSED, LFSCK_TYPE_LAYOUT);
403                         lfsck_in_notify_local(env, ofd->ofd_osd, lrl, NULL);
404                 }
405
406                 if (likely(!ofd_object_exists(fo) &&
407                            !OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING))) {
408                         next = ofd_object_child(fo);
409                         LASSERT(next != NULL);
410
411                         rc = dt_create(env, next, &info->fti_attr, NULL,
412                                        &info->fti_dof, th);
413                         if (rc < 0) {
414                                 if (i == 0)
415                                         GOTO(trans_stop, rc);
416
417                                 rc = 0;
418                                 break;
419                         }
420                         LASSERT(ofd_object_exists(fo));
421                 }
422                 ofd_seq_last_oid_set(oseq, id + i);
423         }
424
425         objects = i;
426         /* NOT all the wanted objects have been created,
427          * set the LAST_ID as the real created. */
428         if (unlikely(objects < nr)) {
429                 int rc1;
430
431                 info->fti_off = 0;
432                 tmp = cpu_to_le64(ofd_seq_last_oid(oseq));
433                 dt_write_lock(env, oseq->os_lastid_obj, DT_LASTID);
434                 rc1 = dt_record_write(env, oseq->os_lastid_obj,
435                                       &info->fti_buf, &info->fti_off, th);
436                 dt_write_unlock(env, oseq->os_lastid_obj);
437                 if (rc1 != 0)
438                         CERROR("%s: fail to reset the LAST_ID for seq (%#llx"
439                                ") from %llu to %llu\n", ofd_name(ofd),
440                                ostid_seq(&oseq->os_oi), id + nr - 1,
441                                ofd_seq_last_oid(oseq));
442         }
443
444         if (objects)
445                 ofd_precreate_cb_add(env, th, oseq, objects);
446 trans_stop:
447         rc2 = ofd_trans_stop(env, ofd, th, rc);
448         if (rc2)
449                 CERROR("%s: failed to stop transaction: rc = %d\n",
450                        ofd_name(ofd), rc2);
451         if (!rc)
452                 rc = rc2;
453 out:
454         for (i = 0; i < nr_saved; i++) {
455                 fo = batch[i];
456                 if (fo) {
457                         ofd_write_unlock(env, fo);
458                         ofd_object_put(env, fo);
459                 }
460         }
461         OBD_FREE(batch, nr_saved * sizeof(struct ofd_object *));
462
463         CDEBUG((objects == 0 && rc == 0) ? D_ERROR : D_OTHER,
464                "created %d/%d objects: %d\n", objects, nr_saved, rc);
465
466         LASSERT(ergo(objects == 0, rc < 0));
467         RETURN(objects > 0 ? objects : rc);
468 }
469
470 /**
471  * Fix the OFD object ownership.
472  *
473  * If the object still has SUID+SGID bits set, meaning that it was precreated
474  * by the MDT before it was assigned to any file, (see ofd_precreate_objects())
475  * then we will accept the UID/GID/PROJID if sent by the client for initializing
476  * the ownership of this object.  We only allow this to happen once (so clear
477  * these bits) and later only allow setattr.
478  *
479  * \param[in] env        execution environment
480  * \param[in] fo         OFD object
481  * \param[in] la         object attributes
482  * \param[in] is_setattr was this function called from setattr or not
483  *
484  * \retval              0 if successful
485  * \retval              negative value on error
486  */
487 int ofd_attr_handle_id(const struct lu_env *env, struct ofd_object *fo,
488                          struct lu_attr *la, int is_setattr)
489 {
490         struct ofd_thread_info  *info = ofd_info(env);
491         struct lu_attr          *ln = &info->fti_attr2;
492         __u32                    mask = 0;
493         int                      rc;
494
495         ENTRY;
496
497         if (!(la->la_valid & LA_UID) && !(la->la_valid & LA_GID) &&
498             !(la->la_valid & LA_PROJID))
499                 RETURN(0);
500
501         rc = dt_attr_get(env, ofd_object_child(fo), ln);
502         if (rc != 0)
503                 RETURN(rc);
504
505         LASSERT(ln->la_valid & LA_MODE);
506
507         /*
508          * Only allow setattr to change UID/GID/PROJID, if
509          * SUID+SGID is not set which means this is not
510          * initialization of this objects.
511          */
512         if (!is_setattr) {
513                 if (!(ln->la_mode & S_ISUID))
514                         la->la_valid &= ~LA_UID;
515                 if (!(ln->la_mode & S_ISGID))
516                         la->la_valid &= ~LA_GID;
517                 if (!(ln->la_mode & S_ISVTX))
518                         la->la_valid &= ~LA_PROJID;
519         }
520
521         /* Initialize ownership of this object, clear SUID+SGID bits*/
522         if ((la->la_valid & LA_UID) && (ln->la_mode & S_ISUID))
523                 mask |= S_ISUID;
524         if ((la->la_valid & LA_GID) && (ln->la_mode & S_ISGID))
525                 mask |= S_ISGID;
526         if ((la->la_valid & LA_PROJID) && (ln->la_mode & S_ISVTX))
527                 mask |= S_ISVTX;
528         if (mask != 0) {
529                 if (!(la->la_valid & LA_MODE) || !is_setattr) {
530                         la->la_mode = ln->la_mode;
531                         la->la_valid |= LA_MODE;
532                 }
533                 la->la_mode &= ~mask;
534         }
535
536         RETURN(0);
537 }
538
539 /**
540  * Check if it needs to update filter_fid by the value of @oa.
541  *
542  * \param[in] env       env
543  * \param[in] fo        ofd object
544  * \param[in] oa        obdo from client or MDT
545  * \param[out] ff       if filter_fid needs updating, this field is used to
546  *                      return the new buffer
547  *
548  * \retval < 0          error occurred
549  * \retval 0            doesn't need to update filter_fid
550  * \retval FL_XATTR_{CREATE,REPLACE}    flag for xattr update
551  */
552 int ofd_object_ff_update(const struct lu_env *env, struct ofd_object *fo,
553                          const struct obdo *oa, struct filter_fid *ff)
554 {
555         int rc = 0;
556         ENTRY;
557
558         if (!(oa->o_valid &
559               (OBD_MD_FLFID | OBD_MD_FLOSTLAYOUT | OBD_MD_LAYOUT_VERSION)))
560                 RETURN(0);
561
562         rc = ofd_object_ff_load(env, fo);
563         if (rc < 0 && rc != -ENODATA)
564                 RETURN(rc);
565
566         LASSERT(ff != &fo->ofo_ff);
567         if (rc == -ENODATA) {
568                 rc = LU_XATTR_CREATE;
569                 memset(ff, 0, sizeof(*ff));
570         } else {
571                 rc = LU_XATTR_REPLACE;
572                 memcpy(ff, &fo->ofo_ff, sizeof(*ff));
573         }
574
575         if (oa->o_valid & OBD_MD_FLFID) {
576                 /* packing fid and converting it to LE for storing into EA.
577                  * Here ->o_stripe_idx should be filled by LOV and rest of
578                  * fields - by client. */
579                 ff->ff_parent.f_seq = oa->o_parent_seq;
580                 ff->ff_parent.f_oid = oa->o_parent_oid;
581                 /* XXX: we are ignoring o_parent_ver here, since this should
582                  *      be the same for all objects in this fileset. */
583                 ff->ff_parent.f_ver = oa->o_stripe_idx;
584         }
585         if (oa->o_valid & OBD_MD_FLOSTLAYOUT)
586                 ff->ff_layout = oa->o_layout;
587
588         if (oa->o_valid & OBD_MD_LAYOUT_VERSION) {
589                 CDEBUG(D_INODE, DFID": OST("DFID") layout version %u -> %u\n",
590                        PFID(&fo->ofo_ff.ff_parent),
591                        PFID(lu_object_fid(&fo->ofo_obj.do_lu)),
592                        ff->ff_layout_version, oa->o_layout_version);
593
594                 /* only the MDS has the authority to update layout version */
595                 if (!(exp_connect_flags(ofd_info(env)->fti_exp) &
596                       OBD_CONNECT_MDS)) {
597                         CERROR(DFID": update layout version from client\n",
598                                PFID(&fo->ofo_ff.ff_parent));
599
600                         RETURN(-EPERM);
601                 }
602
603                 if (ff->ff_layout_version & LU_LAYOUT_RESYNC) {
604                         /* this opens a new era of writing */
605                         ff->ff_layout_version = 0;
606                         ff->ff_range = 0;
607                 }
608
609                 /* it's not allowed to change it to a smaller value */
610                 if (oa->o_layout_version < ff->ff_layout_version)
611                         RETURN(-EINVAL);
612
613                 if (ff->ff_layout_version == 0 ||
614                     oa->o_layout_version & LU_LAYOUT_RESYNC) {
615                         /* if LU_LAYOUT_RESYNC is set, it closes the era of
616                          * writing. Only mirror I/O can write this object. */
617                         ff->ff_layout_version = oa->o_layout_version;
618                         ff->ff_range = 0;
619                 } else if (oa->o_layout_version > ff->ff_layout_version) {
620                         ff->ff_range = MAX(ff->ff_range,
621                                   oa->o_layout_version - ff->ff_layout_version);
622                 }
623         }
624
625         if (memcmp(ff, &fo->ofo_ff, sizeof(*ff)))
626                 filter_fid_cpu_to_le(ff, ff, sizeof(*ff));
627         else /* no change */
628                 rc = 0;
629
630         RETURN(rc);
631 }
632
633 /**
634  * Set OFD object attributes.
635  *
636  * This function sets OFD object attributes taken from incoming request.
637  * It sets not only regular attributes but also XATTR_NAME_FID extended
638  * attribute if needed. The "fid" xattr allows the object's MDT parent inode
639  * to be found and verified by LFSCK and other tools in case of inconsistency.
640  *
641  * \param[in] env       execution environment
642  * \param[in] fo        OFD object
643  * \param[in] la        object attributes
644  * \param[in] oa        obdo carries fid, ost_layout, layout version
645  *
646  * \retval              0 if successful
647  * \retval              negative value on error
648  */
649 int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo,
650                  struct lu_attr *la, struct obdo *oa)
651 {
652         struct ofd_thread_info *info = ofd_info(env);
653         struct ofd_device *ofd = ofd_obj2dev(fo);
654         struct filter_fid *ff = &info->fti_mds_fid;
655         struct thandle *th;
656         int fl, rc, rc2;
657
658         ENTRY;
659
660         ofd_write_lock(env, fo);
661         if (!ofd_object_exists(fo))
662                 GOTO(unlock, rc = -ENOENT);
663
664         if (la->la_valid & (LA_ATIME | LA_MTIME | LA_CTIME))
665                 tgt_fmd_update(info->fti_exp, &fo->ofo_header.loh_fid,
666                                info->fti_xid);
667
668         /* VBR: version recovery check */
669         rc = ofd_version_get_check(info, fo);
670         if (rc)
671                 GOTO(unlock, rc);
672
673         rc = ofd_attr_handle_id(env, fo, la, 1 /* is_setattr */);
674         if (rc != 0)
675                 GOTO(unlock, rc);
676
677         fl = ofd_object_ff_update(env, fo, oa, ff);
678         if (fl < 0)
679                 GOTO(unlock, rc = fl);
680
681         th = ofd_trans_create(env, ofd);
682         if (IS_ERR(th))
683                 GOTO(unlock, rc = PTR_ERR(th));
684
685         rc = dt_declare_attr_set(env, ofd_object_child(fo), la, th);
686         if (rc)
687                 GOTO(stop, rc);
688
689         if (fl) {
690                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1))
691                         ff->ff_parent.f_oid = cpu_to_le32(1UL << 31);
692                 else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2))
693                         le32_add_cpu(&ff->ff_parent.f_oid, -1);
694
695                 info->fti_buf.lb_buf = ff;
696                 info->fti_buf.lb_len = sizeof(*ff);
697                 rc = dt_declare_xattr_set(env, ofd_object_child(fo),
698                                           &info->fti_buf, XATTR_NAME_FID, fl,
699                                           th);
700                 if (rc)
701                         GOTO(stop, rc);
702         }
703
704         rc = ofd_trans_start(env, ofd, la->la_valid & LA_SIZE ? fo : NULL, th);
705         if (rc)
706                 GOTO(stop, rc);
707
708         rc = dt_attr_set(env, ofd_object_child(fo), la, th);
709         if (rc)
710                 GOTO(stop, rc);
711
712         if (fl) {
713                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID))
714                         GOTO(stop, rc);
715
716                 info->fti_buf.lb_buf = ff;
717                 info->fti_buf.lb_len = sizeof(*ff);
718                 rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
719                                   XATTR_NAME_FID, fl, th);
720                 if (!rc)
721                         filter_fid_le_to_cpu(&fo->ofo_ff, ff, sizeof(*ff));
722         }
723
724         GOTO(stop, rc);
725
726 stop:
727         rc2 = ofd_trans_stop(env, ofd, th, rc);
728         if (rc2)
729                 CERROR("%s: failed to stop transaction: rc = %d\n",
730                        ofd_name(ofd), rc2);
731         if (!rc)
732                 rc = rc2;
733
734 unlock:
735         ofd_write_unlock(env, fo);
736
737         return rc;
738 }
739
740 /**
741  * Truncate/punch OFD object.
742  *
743  * This function frees all of the allocated object's space from the \a start
744  * offset to the \a end offset. For truncate() operations the \a end offset
745  * is OBD_OBJECT_EOF. The functionality to punch holes in an object via
746  * fallocate(FALLOC_FL_PUNCH_HOLE) is not yet implemented (see LU-3606).
747  *
748  * \param[in] env       execution environment
749  * \param[in] fo        OFD object
750  * \param[in] start     start offset to punch from
751  * \param[in] end       end of punch
752  * \param[in] la        object attributes
753  * \param[in] oa        obdo struct from incoming request
754  *
755  * \retval              0 if successful
756  * \retval              negative value on error
757  */
758 int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo,
759                      __u64 start, __u64 end, struct lu_attr *la,
760                      struct obdo *oa)
761 {
762         struct ofd_thread_info *info = ofd_info(env);
763         struct ofd_device *ofd = ofd_obj2dev(fo);
764         struct dt_object *dob = ofd_object_child(fo);
765         struct filter_fid *ff = &info->fti_mds_fid;
766         struct thandle *th;
767         int fl, rc, rc2;
768
769         ENTRY;
770
771         /* we support truncate, not punch yet */
772         LASSERT(end == OBD_OBJECT_EOF);
773
774         ofd_write_lock(env, fo);
775         if (la->la_valid & (LA_ATIME | LA_MTIME | LA_CTIME))
776                 tgt_fmd_update(info->fti_exp, &fo->ofo_header.loh_fid,
777                                info->fti_xid);
778
779         if (!ofd_object_exists(fo))
780                 GOTO(unlock, rc = -ENOENT);
781
782         if (ofd->ofd_lfsck_verify_pfid && oa->o_valid & OBD_MD_FLFID) {
783                 rc = ofd_verify_ff(env, fo, oa);
784                 if (rc != 0)
785                         GOTO(unlock, rc);
786         }
787
788         /* need to verify layout version */
789         if (oa->o_valid & OBD_MD_LAYOUT_VERSION) {
790                 rc = ofd_verify_layout_version(env, fo, oa);
791                 if (rc)
792                         GOTO(unlock, rc);
793
794                 oa->o_valid &= ~OBD_MD_LAYOUT_VERSION;
795         }
796
797         /* VBR: version recovery check */
798         rc = ofd_version_get_check(info, fo);
799         if (rc)
800                 GOTO(unlock, rc);
801
802         rc = ofd_attr_handle_id(env, fo, la, 0 /* !is_setattr */);
803         if (rc != 0)
804                 GOTO(unlock, rc);
805
806         fl = ofd_object_ff_update(env, fo, oa, ff);
807         if (fl < 0)
808                 GOTO(unlock, rc = fl);
809
810         th = ofd_trans_create(env, ofd);
811         if (IS_ERR(th))
812                 GOTO(unlock, rc = PTR_ERR(th));
813
814         rc = dt_declare_attr_set(env, dob, la, th);
815         if (rc)
816                 GOTO(stop, rc);
817
818         rc = dt_declare_punch(env, dob, start, OBD_OBJECT_EOF, th);
819         if (rc)
820                 GOTO(stop, rc);
821
822         if (fl) {
823                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1))
824                         ff->ff_parent.f_oid = cpu_to_le32(1UL << 31);
825                 else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2))
826                         le32_add_cpu(&ff->ff_parent.f_oid, -1);
827
828                 info->fti_buf.lb_buf = ff;
829                 info->fti_buf.lb_len = sizeof(*ff);
830                 rc = dt_declare_xattr_set(env, ofd_object_child(fo),
831                                           &info->fti_buf, XATTR_NAME_FID, fl,
832                                           th);
833                 if (rc)
834                         GOTO(stop, rc);
835         }
836
837         rc = ofd_trans_start(env, ofd, fo, th);
838         if (rc)
839                 GOTO(stop, rc);
840
841         rc = dt_punch(env, dob, start, OBD_OBJECT_EOF, th);
842         if (rc)
843                 GOTO(stop, rc);
844
845         rc = dt_attr_set(env, dob, la, th);
846         if (rc)
847                 GOTO(stop, rc);
848
849         if (fl) {
850                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID))
851                         GOTO(stop, rc);
852
853                 rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
854                                   XATTR_NAME_FID, fl, th);
855                 if (!rc)
856                         filter_fid_le_to_cpu(&fo->ofo_ff, ff, sizeof(*ff));
857         }
858
859         GOTO(stop, rc);
860
861 stop:
862         rc2 = ofd_trans_stop(env, ofd, th, rc);
863         if (rc2 != 0)
864                 CERROR("%s: failed to stop transaction: rc = %d\n",
865                        ofd_name(ofd), rc2);
866         if (!rc)
867                 rc = rc2;
868 unlock:
869         ofd_write_unlock(env, fo);
870
871         return rc;
872 }
873
874 /**
875  * Destroy OFD object.
876  *
877  * This function destroys OFD object. If object wasn't used at all (orphan)
878  * then local transaction is used, which means the transaction data is not
879  * returned back in reply.
880  *
881  * \param[in] env       execution environment
882  * \param[in] fo        OFD object
883  * \param[in] orphan    flag to indicate that object is orphaned
884  *
885  * \retval              0 if successful
886  * \retval              negative value on error
887  */
888 int ofd_destroy(const struct lu_env *env, struct ofd_object *fo,
889                        int orphan)
890 {
891         struct ofd_device       *ofd = ofd_obj2dev(fo);
892         struct thandle          *th;
893         int                     rc = 0;
894         int                     rc2;
895
896         ENTRY;
897
898         ofd_write_lock(env, fo);
899         if (!ofd_object_exists(fo))
900                 GOTO(unlock, rc = -ENOENT);
901
902         th = ofd_trans_create(env, ofd);
903         if (IS_ERR(th))
904                 GOTO(unlock, rc = PTR_ERR(th));
905
906         rc = dt_declare_ref_del(env, ofd_object_child(fo), th);
907         if (rc < 0)
908                 GOTO(stop, rc);
909
910         rc = dt_declare_destroy(env, ofd_object_child(fo), th);
911         if (rc < 0)
912                 GOTO(stop, rc);
913
914         if (orphan)
915                 rc = dt_trans_start_local(env, ofd->ofd_osd, th);
916         else
917                 rc = ofd_trans_start(env, ofd, NULL, th);
918         if (rc)
919                 GOTO(stop, rc);
920
921         tgt_fmd_drop(ofd_info(env)->fti_exp, &fo->ofo_header.loh_fid);
922
923         dt_ref_del(env, ofd_object_child(fo), th);
924         dt_destroy(env, ofd_object_child(fo), th);
925 stop:
926         rc2 = ofd_trans_stop(env, ofd, th, rc);
927         if (rc2)
928                 CERROR("%s failed to stop transaction: %d\n",
929                        ofd_name(ofd), rc2);
930         if (!rc)
931                 rc = rc2;
932 unlock:
933         ofd_write_unlock(env, fo);
934         RETURN(rc);
935 }
936
937 /**
938  * Get OFD object attributes.
939  *
940  * This function gets OFD object regular attributes. It is used to serve
941  * incoming request as well as for local OFD purposes.
942  *
943  * \param[in] env       execution environment
944  * \param[in] fo        OFD object
945  * \param[in] la        object attributes
946  *
947  * \retval              0 if successful
948  * \retval              negative value on error
949  */
950 int ofd_attr_get(const struct lu_env *env, struct ofd_object *fo,
951                  struct lu_attr *la)
952 {
953         int rc = 0;
954
955         ENTRY;
956
957         if (ofd_object_exists(fo)) {
958                 rc = dt_attr_get(env, ofd_object_child(fo), la);
959         } else {
960                 rc = -ENOENT;
961         }
962         RETURN(rc);
963 }