Whamcloud - gitweb
LU-13935 ofd: object removal is not handled properly
[fs/lustre-release.git] / lustre / ofd / ofd_objects.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/ofd/ofd_objects.c
33  *
34  * This file contains OSD API methods related to OBD Filter Device (OFD)
35  * object operations.
36  *
37  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
38  * Author: Mikhail Pershin <mike.pershin@intel.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_FILTER
42
43 #include <dt_object.h>
44 #include <lustre_lfsck.h>
45
46 #include "ofd_internal.h"
47
48 /**
49  * Get object version from disk and check it.
50  *
51  * This function checks object version from disk with
52  * ofd_thread_info::fti_pre_version filled from incoming RPC. This is part of
53  * VBR (Version-Based Recovery) and ensures that object has the same version
54  * upon replay as it has during original modification.
55  *
56  * \param[in]  info     execution thread OFD private data
57  * \param[in]  fo       OFD object
58  *
59  * \retval              0 if version matches
60  * \retval              -EOVERFLOW on version mismatch
61  */
62 static int ofd_version_get_check(struct ofd_thread_info *info,
63                                  struct ofd_object *fo)
64 {
65         dt_obj_version_t curr_version;
66
67         if (info->fti_exp == NULL)
68                 RETURN(0);
69
70         curr_version = dt_version_get(info->fti_env, ofd_object_child(fo));
71         if ((__s64)curr_version == -EOPNOTSUPP)
72                 RETURN(0);
73         /* VBR: version is checked always because costs nothing */
74         if (info->fti_pre_version != 0 &&
75             info->fti_pre_version != curr_version) {
76                 CDEBUG(D_INODE, "Version mismatch %#llx != %#llx\n",
77                        info->fti_pre_version, curr_version);
78                 spin_lock(&info->fti_exp->exp_lock);
79                 info->fti_exp->exp_vbr_failed = 1;
80                 spin_unlock(&info->fti_exp->exp_lock);
81                 RETURN (-EOVERFLOW);
82         }
83         info->fti_pre_version = curr_version;
84         RETURN(0);
85 }
86
87 /**
88  * Get OFD object by FID.
89  *
90  * This function finds OFD slice of compound object with the given FID.
91  *
92  * \param[in] env       execution environment
93  * \param[in] ofd       OFD device
94  * \param[in] fid       FID of the object
95  *
96  * \retval              pointer to the found ofd_object
97  * \retval              ERR_PTR(errno) in case of error
98  */
99 struct ofd_object *ofd_object_find(const struct lu_env *env,
100                                    struct ofd_device *ofd,
101                                    const struct lu_fid *fid)
102 {
103         struct ofd_object *fo;
104         struct lu_object  *o;
105
106         ENTRY;
107
108         o = lu_object_find(env, &ofd->ofd_dt_dev.dd_lu_dev, fid, NULL);
109         if (likely(!IS_ERR(o)))
110                 fo = ofd_obj(o);
111         else
112                 fo = ERR_CAST(o); /* return error */
113
114         RETURN(fo);
115 }
116
117 /**
118  * Get FID of parent MDT object.
119  *
120  * This function reads extended attribute XATTR_NAME_FID of OFD object which
121  * contains the MDT parent object FID and saves it in ofd_object::ofo_ff.
122  *
123  * The filter_fid::ff_parent::f_ver field currently holds
124  * the OST-object index in the parent MDT-object's layout EA,
125  * not the actual FID::f_ver of the parent. We therefore access
126  * it via the macro f_stripe_idx.
127  *
128  * \param[in] env       execution environment
129  * \param[in] fo        OFD object
130  *
131  * \retval              0 if successful
132  * \retval              -ENODATA if there is no such xattr
133  * \retval              negative value on error
134  */
135 int ofd_object_ff_load(const struct lu_env *env, struct ofd_object *fo)
136 {
137         struct ofd_thread_info *info = ofd_info(env);
138         struct filter_fid *ff = &fo->ofo_ff;
139         struct lu_buf *buf = &info->fti_buf;
140         int rc = 0;
141
142         if (fid_is_sane(&ff->ff_parent))
143                 return 0;
144
145         buf->lb_buf = ff;
146         buf->lb_len = sizeof(*ff);
147         rc = dt_xattr_get(env, ofd_object_child(fo), buf, XATTR_NAME_FID);
148         if (rc < 0)
149                 return rc;
150
151         if (unlikely(rc < sizeof(struct lu_fid))) {
152                 fid_zero(&ff->ff_parent);
153                 return -EINVAL;
154         }
155
156         filter_fid_le_to_cpu(ff, ff, rc);
157
158         return 0;
159 }
160
161 struct ofd_precreate_cb {
162         struct dt_txn_commit_cb  opc_cb;
163         struct ofd_seq          *opc_oseq;
164         int                      opc_objects;
165 };
166
167 static void ofd_cb_precreate(struct lu_env *env, struct thandle *th,
168                              struct dt_txn_commit_cb *cb, int err)
169 {
170         struct ofd_precreate_cb *opc;
171         struct ofd_seq *oseq;
172
173         opc = container_of(cb, struct ofd_precreate_cb, opc_cb);
174         oseq = opc->opc_oseq;
175
176         CDEBUG(D_OTHER, "Sub %d from %d for "DFID", th_sync %d\n",
177                opc->opc_objects, atomic_read(&oseq->os_precreate_in_progress),
178                PFID(&oseq->os_oi.oi_fid), th->th_sync);
179         atomic_sub(opc->opc_objects, &oseq->os_precreate_in_progress);
180         ofd_seq_put(env, opc->opc_oseq);
181         OBD_FREE_PTR(opc);
182 }
183
184 static int ofd_precreate_cb_add(const struct lu_env *env, struct thandle *th,
185                                 struct ofd_seq *oseq, int objects)
186 {
187         struct ofd_precreate_cb *opc;
188         struct dt_txn_commit_cb *dcb;
189         int precreate, rc;
190
191         OBD_ALLOC_PTR(opc);
192         if (!opc)
193                 return -ENOMEM;
194
195         precreate = atomic_read(&oseq->os_precreate_in_progress);
196         atomic_inc(&oseq->os_refc);
197         opc->opc_oseq = oseq;
198         opc->opc_objects = objects;
199         CDEBUG(D_OTHER, "Add %d to %d for "DFID", th_sync %d\n",
200                opc->opc_objects, precreate,
201                PFID(&oseq->os_oi.oi_fid), th->th_sync);
202
203         if ((precreate + objects) >= (5 * OST_MAX_PRECREATE))
204                 th->th_sync = 1;
205
206         dcb = &opc->opc_cb;
207         dcb->dcb_func = ofd_cb_precreate;
208         INIT_LIST_HEAD(&dcb->dcb_linkage);
209         strlcpy(dcb->dcb_name, "ofd_cb_precreate", sizeof(dcb->dcb_name));
210
211         rc = dt_trans_cb_add(th, dcb);
212         if (rc) {
213                 ofd_seq_put(env, oseq);
214                 OBD_FREE_PTR(opc);
215                 return rc;
216         }
217
218         atomic_add(objects, &oseq->os_precreate_in_progress);
219
220         return 0;
221 }
222
223 /**
224  * Precreate the given number \a nr of objects in the given sequence \a oseq.
225  *
226  * This function precreates new OST objects in the given sequence.
227  * The precreation starts from \a id and creates \a nr objects sequentially.
228  *
229  * Notes:
230  * This function may create fewer objects than requested.
231  *
232  * We mark object SUID+SGID to flag it for accepting UID+GID from client on
233  * first write. Currently the permission bits on the OST are never used,
234  * so this is OK.
235  *
236  * Initialize a/c/m time so any client timestamp will always be newer and
237  * update the inode. The ctime = 0 case is also handled specially in
238  * osd_inode_setattr(). See LU-221, LU-1042 for details.
239  *
240  * \param[in] env       execution environment
241  * \param[in] ofd       OFD device
242  * \param[in] id        object ID to start precreation from
243  * \param[in] oseq      object sequence
244  * \param[in] nr        number of objects to precreate
245  * \param[in] sync      synchronous precreation flag
246  *
247  * \retval              0 if successful
248  * \retval              negative value on error
249  */
250 int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd,
251                           u64 id, struct ofd_seq *oseq, int nr, int sync)
252 {
253         struct ofd_thread_info  *info = ofd_info(env);
254         struct ofd_object       *fo = NULL;
255         struct dt_object        *next;
256         struct thandle          *th;
257         struct ofd_object       **batch;
258         struct lu_fid           *fid = &info->fti_fid;
259         u64                     tmp;
260         int                     rc;
261         int                     rc2;
262         int                     i;
263         int                     objects = 0;
264         int                     nr_saved = nr;
265
266         ENTRY;
267
268         /* Don't create objects beyond the valid range for this SEQ */
269         if (unlikely(fid_seq_is_mdt0(ostid_seq(&oseq->os_oi)) &&
270                      (id + nr) > IDIF_MAX_OID)) {
271                 CERROR("%s:"DOSTID" hit the IDIF_MAX_OID (1<<48)!\n",
272                        ofd_name(ofd), id, ostid_seq(&oseq->os_oi));
273                 RETURN(rc = -ENOSPC);
274         } else if (unlikely(!fid_seq_is_mdt0(ostid_seq(&oseq->os_oi)) &&
275                             (id + nr) > OBIF_MAX_OID)) {
276                 CERROR("%s:"DOSTID" hit the OBIF_MAX_OID (1<<32)!\n",
277                        ofd_name(ofd), id, ostid_seq(&oseq->os_oi));
278                 RETURN(rc = -ENOSPC);
279         }
280
281         OBD_ALLOC_PTR_ARRAY(batch, nr_saved);
282         if (batch == NULL)
283                 RETURN(-ENOMEM);
284
285         info->fti_attr.la_valid = LA_TYPE | LA_MODE;
286         info->fti_attr.la_mode = S_IFREG | S_ISUID | S_ISGID | S_ISVTX | 0666;
287         info->fti_dof.dof_type = dt_mode_to_dft(S_IFREG);
288
289         info->fti_attr.la_valid |= LA_ATIME | LA_MTIME | LA_CTIME;
290         info->fti_attr.la_atime = 0;
291         info->fti_attr.la_mtime = 0;
292         info->fti_attr.la_ctime = 0;
293
294         LASSERT(id != 0);
295
296         /* prepare objects */
297         *fid = *lu_object_fid(&oseq->os_lastid_obj->do_lu);
298         for (i = 0; i < nr; i++) {
299                 rc = fid_set_id(fid, id + i);
300                 if (rc != 0) {
301                         if (i == 0)
302                                 GOTO(out, rc);
303
304                         nr = i;
305                         break;
306                 }
307
308                 fo = ofd_object_find(env, ofd, fid);
309                 if (IS_ERR(fo)) {
310                         if (i == 0)
311                                 GOTO(out, rc = PTR_ERR(fo));
312
313                         nr = i;
314                         break;
315                 }
316
317                 batch[i] = fo;
318         }
319         info->fti_buf.lb_buf = &tmp;
320         info->fti_buf.lb_len = sizeof(tmp);
321         info->fti_off = 0;
322
323         th = ofd_trans_create(env, ofd);
324         if (IS_ERR(th))
325                 GOTO(out, rc = PTR_ERR(th));
326
327         th->th_sync |= sync;
328
329         rc = dt_declare_record_write(env, oseq->os_lastid_obj, &info->fti_buf,
330                                      info->fti_off, th);
331         if (rc)
332                 GOTO(trans_stop, rc);
333
334         for (i = 0; i < nr; i++) {
335                 fo = batch[i];
336                 LASSERT(fo);
337
338                 if (unlikely(ofd_object_exists(fo))) {
339                         /* object may exist being re-created by write replay */
340                         CDEBUG(D_INODE, "object %#llx/%#llx exists: "
341                                DFID"\n", ostid_seq(&oseq->os_oi), id,
342                                PFID(lu_object_fid(&fo->ofo_obj.do_lu)));
343                         continue;
344                 }
345
346                 next = ofd_object_child(fo);
347                 LASSERT(next != NULL);
348
349                 rc = dt_declare_create(env, next, &info->fti_attr, NULL,
350                                        &info->fti_dof, th);
351                 if (rc < 0) {
352                         if (i == 0)
353                                 GOTO(trans_stop, rc);
354
355                         nr = i;
356                         break;
357                 }
358         }
359
360         rc = dt_trans_start_local(env, ofd->ofd_osd, th);
361         if (rc)
362                 GOTO(trans_stop, rc);
363
364         CDEBUG(D_OTHER, "%s: create new object "DFID" nr %d\n",
365                ofd_name(ofd), PFID(fid), nr);
366
367          /* When the LFSCK scanning the whole device to verify the LAST_ID file
368           * consistency, it will load the last_id into RAM firstly, and compare
369           * the last_id with each OST-object's ID. If the later one is larger,
370           * then it will regard the LAST_ID file crashed. But during the LFSCK
371           * scanning, the OFD may continue to create new OST-objects. Those new
372           * created OST-objects will have larger IDs than the LFSCK known ones.
373           * So from the LFSCK view, it needs to re-load the last_id from disk
374           * file, and if the latest last_id is still smaller than the object's
375           * ID, then the LAST_ID file is real crashed.
376           *
377           * To make above mechanism to work, before OFD pre-create OST-objects,
378           * it needs to update the LAST_ID file firstly, otherwise, the LFSCK
379           * may cannot get latest last_id although new OST-object created. */
380         if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_SKIP_LASTID)) {
381                 tmp = cpu_to_le64(id + nr - 1);
382                 dt_write_lock(env, oseq->os_lastid_obj, DT_LASTID);
383                 rc = dt_record_write(env, oseq->os_lastid_obj,
384                                      &info->fti_buf, &info->fti_off, th);
385                 dt_write_unlock(env, oseq->os_lastid_obj);
386                 if (rc != 0)
387                         GOTO(trans_stop, rc);
388         }
389
390         for (i = 0; i < nr; i++) {
391                 fo = batch[i];
392                 LASSERT(fo);
393
394                 ofd_write_lock(env, fo);
395
396                 /* Only the new created objects need to be recorded. */
397                 if (ofd->ofd_osd->dd_record_fid_accessed) {
398                         struct lfsck_req_local *lrl = &ofd_info(env)->fti_lrl;
399
400                         lfsck_pack_rfa(lrl, lu_object_fid(&fo->ofo_obj.do_lu),
401                                        LEL_FID_ACCESSED, LFSCK_TYPE_LAYOUT);
402                         lfsck_in_notify_local(env, ofd->ofd_osd, lrl, NULL);
403                 }
404
405                 if (likely(!ofd_object_exists(fo) &&
406                            !OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING))) {
407                         next = ofd_object_child(fo);
408                         LASSERT(next != NULL);
409
410                         rc = dt_create(env, next, &info->fti_attr, NULL,
411                                        &info->fti_dof, th);
412                         ofd_write_unlock(env, fo);
413                         if (rc < 0) {
414                                 if (i == 0)
415                                         GOTO(trans_stop, rc);
416
417                                 rc = 0;
418                                 break;
419                         }
420                         LASSERT(ofd_object_exists(fo));
421                 } else {
422                         ofd_write_unlock(env, fo);
423                 }
424
425                 ofd_seq_last_oid_set(oseq, id + i);
426         }
427
428         objects = i;
429         /* NOT all the wanted objects have been created,
430          * set the LAST_ID as the real created. */
431         if (unlikely(objects < nr)) {
432                 int rc1;
433
434                 info->fti_off = 0;
435                 tmp = cpu_to_le64(ofd_seq_last_oid(oseq));
436                 dt_write_lock(env, oseq->os_lastid_obj, DT_LASTID);
437                 rc1 = dt_record_write(env, oseq->os_lastid_obj,
438                                       &info->fti_buf, &info->fti_off, th);
439                 dt_write_unlock(env, oseq->os_lastid_obj);
440                 if (rc1 != 0)
441                         CERROR("%s: fail to reset the LAST_ID for seq (%#llx"
442                                ") from %llu to %llu\n", ofd_name(ofd),
443                                ostid_seq(&oseq->os_oi), id + nr - 1,
444                                ofd_seq_last_oid(oseq));
445         }
446
447         if (objects)
448                 ofd_precreate_cb_add(env, th, oseq, objects);
449 trans_stop:
450         rc2 = ofd_trans_stop(env, ofd, th, rc);
451         if (rc2)
452                 CERROR("%s: failed to stop transaction: rc = %d\n",
453                        ofd_name(ofd), rc2);
454         if (!rc)
455                 rc = rc2;
456 out:
457         for (i = 0; i < nr_saved; i++) {
458                 fo = batch[i];
459                 if (!fo)
460                         continue;
461                 ofd_object_put(env, fo);
462         }
463         OBD_FREE_PTR_ARRAY(batch, nr_saved);
464
465         CDEBUG((objects == 0 && rc == 0) ? D_ERROR : D_OTHER,
466                "created %d/%d objects: %d\n", objects, nr_saved, rc);
467
468         LASSERT(ergo(objects == 0, rc < 0));
469         RETURN(objects > 0 ? objects : rc);
470 }
471
472 /**
473  * Fix the OFD object ownership.
474  *
475  * If the object still has SUID+SGID bits set, meaning that it was precreated
476  * by the MDT before it was assigned to any file, (see ofd_precreate_objects())
477  * then we will accept the UID/GID/PROJID if sent by the client for initializing
478  * the ownership of this object.  We only allow this to happen once (so clear
479  * these bits) and later only allow setattr.
480  *
481  * \param[in] env        execution environment
482  * \param[in] fo         OFD object
483  * \param[in] la         object attributes
484  * \param[in] is_setattr was this function called from setattr or not
485  *
486  * \retval              0 if successful
487  * \retval              negative value on error
488  */
489 int ofd_attr_handle_id(const struct lu_env *env, struct ofd_object *fo,
490                          struct lu_attr *la, int is_setattr)
491 {
492         struct ofd_thread_info  *info = ofd_info(env);
493         struct lu_attr          *ln = &info->fti_attr2;
494         __u32                    mask = 0;
495         int                      rc;
496
497         ENTRY;
498
499         if (!(la->la_valid & LA_UID) && !(la->la_valid & LA_GID) &&
500             !(la->la_valid & LA_PROJID))
501                 RETURN(0);
502
503         rc = dt_attr_get(env, ofd_object_child(fo), ln);
504         if (rc != 0)
505                 RETURN(rc);
506
507         LASSERT(ln->la_valid & LA_MODE);
508
509         /*
510          * Only allow setattr to change UID/GID/PROJID, if
511          * SUID+SGID is not set which means this is not
512          * initialization of this objects.
513          */
514         if (!is_setattr) {
515                 if (!(ln->la_mode & S_ISUID))
516                         la->la_valid &= ~LA_UID;
517                 if (!(ln->la_mode & S_ISGID))
518                         la->la_valid &= ~LA_GID;
519                 if (!(ln->la_mode & S_ISVTX))
520                         la->la_valid &= ~LA_PROJID;
521         }
522
523         /* Initialize ownership of this object, clear SUID+SGID bits*/
524         if ((la->la_valid & LA_UID) && (ln->la_mode & S_ISUID))
525                 mask |= S_ISUID;
526         if ((la->la_valid & LA_GID) && (ln->la_mode & S_ISGID))
527                 mask |= S_ISGID;
528         if ((la->la_valid & LA_PROJID) && (ln->la_mode & S_ISVTX))
529                 mask |= S_ISVTX;
530         if (mask != 0) {
531                 if (!(la->la_valid & LA_MODE) || !is_setattr) {
532                         la->la_mode = ln->la_mode;
533                         la->la_valid |= LA_MODE;
534                 }
535                 la->la_mode &= ~mask;
536         }
537
538         RETURN(0);
539 }
540
541 /**
542  * Check if it needs to update filter_fid by the value of @oa.
543  *
544  * \param[in] env       env
545  * \param[in] fo        ofd object
546  * \param[in] oa        obdo from client or MDT
547  * \param[out] ff       if filter_fid needs updating, this field is used to
548  *                      return the new buffer
549  *
550  * \retval < 0          error occurred
551  * \retval 0            doesn't need to update filter_fid
552  * \retval FL_XATTR_{CREATE,REPLACE}    flag for xattr update
553  */
554 int ofd_object_ff_update(const struct lu_env *env, struct ofd_object *fo,
555                          const struct obdo *oa, struct filter_fid *ff)
556 {
557         int rc = 0;
558         ENTRY;
559
560         if (!(oa->o_valid &
561               (OBD_MD_FLFID | OBD_MD_FLOSTLAYOUT | OBD_MD_LAYOUT_VERSION)))
562                 RETURN(0);
563
564         rc = ofd_object_ff_load(env, fo);
565         if (rc < 0 && rc != -ENODATA)
566                 RETURN(rc);
567
568         LASSERT(ff != &fo->ofo_ff);
569         if (rc == -ENODATA) {
570                 rc = LU_XATTR_CREATE;
571                 memset(ff, 0, sizeof(*ff));
572         } else {
573                 rc = LU_XATTR_REPLACE;
574                 memcpy(ff, &fo->ofo_ff, sizeof(*ff));
575         }
576
577         if (oa->o_valid & OBD_MD_FLFID) {
578                 /* packing fid and converting it to LE for storing into EA.
579                  * Here ->o_stripe_idx should be filled by LOV and rest of
580                  * fields - by client. */
581                 ff->ff_parent.f_seq = oa->o_parent_seq;
582                 ff->ff_parent.f_oid = oa->o_parent_oid;
583                 /* XXX: we are ignoring o_parent_ver here, since this should
584                  *      be the same for all objects in this fileset. */
585                 ff->ff_parent.f_ver = oa->o_stripe_idx;
586         }
587         if (oa->o_valid & OBD_MD_FLOSTLAYOUT)
588                 ff->ff_layout = oa->o_layout;
589
590         if (oa->o_valid & OBD_MD_LAYOUT_VERSION) {
591                 CDEBUG(D_INODE, DFID": OST("DFID") layout version %u -> %u\n",
592                        PFID(&fo->ofo_ff.ff_parent),
593                        PFID(lu_object_fid(&fo->ofo_obj.do_lu)),
594                        ff->ff_layout_version, oa->o_layout_version);
595
596                 /* only the MDS has the authority to update layout version */
597                 if (!(exp_connect_flags(ofd_info(env)->fti_exp) &
598                       OBD_CONNECT_MDS)) {
599                         CERROR(DFID": update layout version from client\n",
600                                PFID(&fo->ofo_ff.ff_parent));
601
602                         RETURN(-EPERM);
603                 }
604
605                 if (ff->ff_layout_version & LU_LAYOUT_RESYNC) {
606                         /* this opens a new era of writing */
607                         ff->ff_layout_version = 0;
608                         ff->ff_range = 0;
609                 }
610
611                 /* it's not allowed to change it to a smaller value */
612                 if (oa->o_layout_version < ff->ff_layout_version)
613                         RETURN(-EINVAL);
614
615                 if (ff->ff_layout_version == 0 ||
616                     oa->o_layout_version & LU_LAYOUT_RESYNC) {
617                         /* if LU_LAYOUT_RESYNC is set, it closes the era of
618                          * writing. Only mirror I/O can write this object. */
619                         ff->ff_layout_version = oa->o_layout_version;
620                         ff->ff_range = 0;
621                 } else if (oa->o_layout_version > ff->ff_layout_version) {
622                         ff->ff_range = max_t(__u32, ff->ff_range,
623                                              oa->o_layout_version -
624                                              ff->ff_layout_version);
625                 }
626         }
627
628         if (memcmp(ff, &fo->ofo_ff, sizeof(*ff)))
629                 filter_fid_cpu_to_le(ff, ff, sizeof(*ff));
630         else /* no change */
631                 rc = 0;
632
633         RETURN(rc);
634 }
635
636 /**
637  * Set OFD object attributes.
638  *
639  * This function sets OFD object attributes taken from incoming request.
640  * It sets not only regular attributes but also XATTR_NAME_FID extended
641  * attribute if needed. The "fid" xattr allows the object's MDT parent inode
642  * to be found and verified by LFSCK and other tools in case of inconsistency.
643  *
644  * \param[in] env       execution environment
645  * \param[in] fo        OFD object
646  * \param[in] la        object attributes
647  * \param[in] oa        obdo carries fid, ost_layout, layout version
648  *
649  * \retval              0 if successful
650  * \retval              negative value on error
651  */
652 int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo,
653                  struct lu_attr *la, struct obdo *oa)
654 {
655         struct ofd_thread_info *info = ofd_info(env);
656         struct ofd_device *ofd = ofd_obj2dev(fo);
657         struct filter_fid *ff = &info->fti_mds_fid;
658         struct thandle *th;
659         int fl, rc, rc2;
660
661         ENTRY;
662
663         if (!ofd_object_exists(fo))
664                 GOTO(out, rc = -ENOENT);
665
666         /* VBR: version recovery check */
667         rc = ofd_version_get_check(info, fo);
668         if (rc)
669                 GOTO(out, rc);
670
671         rc = ofd_attr_handle_id(env, fo, la, 1 /* is_setattr */);
672         if (rc != 0)
673                 GOTO(out, rc);
674
675         th = ofd_trans_create(env, ofd);
676         if (IS_ERR(th))
677                 GOTO(out, rc = PTR_ERR(th));
678
679         rc = dt_declare_attr_set(env, ofd_object_child(fo), la, th);
680         if (rc)
681                 GOTO(stop, rc);
682
683         info->fti_buf.lb_buf = ff;
684         info->fti_buf.lb_len = sizeof(*ff);
685         rc = dt_declare_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
686                                   XATTR_NAME_FID, 0, th);
687         if (rc)
688                 GOTO(stop, rc);
689
690         rc = ofd_trans_start(env, ofd, la->la_valid & LA_SIZE ? fo : NULL, th);
691         if (rc)
692                 GOTO(stop, rc);
693
694         ofd_write_lock(env, fo);
695         if (!ofd_object_exists(fo))
696                 GOTO(unlock, rc = -ENOENT);
697
698         /* serialize vs ofd_commitrw_write() */
699         if (la->la_valid & (LA_ATIME | LA_MTIME | LA_CTIME))
700                 tgt_fmd_update(info->fti_exp, &fo->ofo_header.loh_fid,
701                                info->fti_xid);
702
703         rc = dt_attr_set(env, ofd_object_child(fo), la, th);
704         if (rc)
705                 GOTO(unlock, rc);
706
707         fl = ofd_object_ff_update(env, fo, oa, ff);
708         if (fl < 0)
709                 GOTO(unlock, rc = fl);
710
711         if (fl) {
712                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1))
713                         ff->ff_parent.f_oid = cpu_to_le32(1UL << 31);
714                 else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2))
715                         le32_add_cpu(&ff->ff_parent.f_oid, -1);
716                 else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID))
717                         GOTO(unlock, rc);
718
719                 info->fti_buf.lb_buf = ff;
720                 info->fti_buf.lb_len = sizeof(*ff);
721                 rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
722                                   XATTR_NAME_FID, fl, th);
723                 if (!rc)
724                         filter_fid_le_to_cpu(&fo->ofo_ff, ff, sizeof(*ff));
725         }
726
727         GOTO(unlock, rc);
728
729 unlock:
730         ofd_write_unlock(env, fo);
731 stop:
732         rc2 = ofd_trans_stop(env, ofd, th, rc);
733         if (rc2)
734                 CERROR("%s: failed to stop transaction: rc = %d\n",
735                        ofd_name(ofd), rc2);
736         if (!rc)
737                 rc = rc2;
738 out:
739         return rc;
740 }
741
742 /**
743  * Fallocate(Preallocate) space for OFD object.
744  *
745  * This function allocates space for the object from the \a start
746  * offset to the \a end offset.
747  *
748  * \param[in] env       execution environment
749  * \param[in] fo        OFD object
750  * \param[in] start     start offset to allocate from
751  * \param[in] end       end of allocate
752  * \param[in] mode      fallocate mode
753  * \param[in] la        object attributes
754  * \param[in] ff        filter_fid structure
755  *
756  * \retval              0 if successful
757  * \retval              negative value on error
758  */
759 int ofd_object_fallocate(const struct lu_env *env, struct ofd_object *fo,
760                         __u64 start, __u64 end, int mode, struct lu_attr *la,
761                         struct obdo *oa)
762 {
763         struct ofd_thread_info *info = ofd_info(env);
764         struct ofd_device *ofd = ofd_obj2dev(fo);
765         struct dt_object *dob = ofd_object_child(fo);
766         struct thandle *th;
767         struct filter_fid *ff = &info->fti_mds_fid;
768         bool ff_needed = false;
769         int rc;
770
771         ENTRY;
772
773         ofd_write_lock(env, fo);
774         if (!ofd_object_exists(fo))
775                 GOTO(unlock, rc = -ENOENT);
776
777         /* VBR: version recovery check */
778         rc = ofd_version_get_check(info, fo);
779         if (rc != 0)
780                 GOTO(unlock, rc);
781
782         if (ff != NULL) {
783                 rc = ofd_object_ff_load(env, fo);
784                 if (rc == -ENODATA)
785                         ff_needed = true;
786                 else if (rc < 0)
787                         GOTO(unlock, rc);
788         }
789
790         th = ofd_trans_create(env, ofd);
791         if (IS_ERR(th))
792                 GOTO(unlock, rc = PTR_ERR(th));
793
794         rc = dt_declare_attr_set(env, dob, la, th);
795         if (rc)
796                 GOTO(stop, rc);
797
798         rc = dt_declare_falloc(env, dob, th);
799         if (rc)
800                 GOTO(stop, rc);
801
802         rc = ofd_trans_start(env, ofd, fo, th);
803         if (rc)
804                 GOTO(stop, rc);
805
806         rc = dt_falloc(env, dob, start, end, mode, th);
807         if (rc)
808                 GOTO(stop, rc);
809
810         rc = dt_attr_set(env, dob, la, th);
811         if (rc)
812                 GOTO(stop, rc);
813
814         if (ff_needed) {
815                 rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
816                                   XATTR_NAME_FID, 0, th);
817                 if (!rc)
818                         filter_fid_le_to_cpu(&fo->ofo_ff, ff, sizeof(*ff));
819         }
820 stop:
821         ofd_trans_stop(env, ofd, th, rc);
822 unlock:
823         ofd_write_unlock(env, fo);
824         RETURN(rc);
825 }
826
827 /**
828  * Truncate/punch OFD object.
829  *
830  * This function frees all of the allocated object's space from the \a start
831  * offset to the \a end offset. For truncate() operations the \a end offset
832  * is OBD_OBJECT_EOF. The functionality to punch holes in an object via
833  * fallocate(FALLOC_FL_PUNCH_HOLE) is not yet implemented (see LU-3606).
834  *
835  * \param[in] env       execution environment
836  * \param[in] fo        OFD object
837  * \param[in] start     start offset to punch from
838  * \param[in] end       end of punch
839  * \param[in] la        object attributes
840  * \param[in] oa        obdo struct from incoming request
841  *
842  * \retval              0 if successful
843  * \retval              negative value on error
844  */
845 int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo,
846                      __u64 start, __u64 end, struct lu_attr *la,
847                      struct obdo *oa)
848 {
849         struct ofd_thread_info *info = ofd_info(env);
850         struct ofd_device *ofd = ofd_obj2dev(fo);
851         struct dt_object *dob = ofd_object_child(fo);
852         struct filter_fid *ff = &info->fti_mds_fid;
853         struct thandle *th;
854         int fl, rc, rc2;
855
856         ENTRY;
857
858         /* we support truncate, not punch yet */
859         LASSERT(end == OBD_OBJECT_EOF);
860
861         if (!ofd_object_exists(fo))
862                 GOTO(out, rc = -ENOENT);
863
864         if (ofd->ofd_lfsck_verify_pfid && oa->o_valid & OBD_MD_FLFID) {
865                 rc = ofd_verify_ff(env, fo, oa);
866                 if (rc != 0)
867                         GOTO(out, rc);
868         }
869
870         /* VBR: version recovery check */
871         rc = ofd_version_get_check(info, fo);
872         if (rc)
873                 GOTO(out, rc);
874
875         rc = ofd_attr_handle_id(env, fo, la, 0 /* !is_setattr */);
876         if (rc != 0)
877                 GOTO(out, rc);
878
879         th = ofd_trans_create(env, ofd);
880         if (IS_ERR(th))
881                 GOTO(out, rc = PTR_ERR(th));
882
883         rc = dt_declare_attr_set(env, dob, la, th);
884         if (rc)
885                 GOTO(stop, rc);
886
887         rc = dt_declare_punch(env, dob, start, OBD_OBJECT_EOF, th);
888         if (rc)
889                 GOTO(stop, rc);
890
891         info->fti_buf.lb_buf = ff;
892         info->fti_buf.lb_len = sizeof(*ff);
893         rc = dt_declare_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
894                                   XATTR_NAME_FID, 0, th);
895         if (rc)
896                 GOTO(stop, rc);
897
898         rc = ofd_trans_start(env, ofd, fo, th);
899         if (rc)
900                 GOTO(stop, rc);
901
902         ofd_write_lock(env, fo);
903
904         if (la->la_valid & (LA_ATIME | LA_MTIME | LA_CTIME))
905                 tgt_fmd_update(info->fti_exp, &fo->ofo_header.loh_fid,
906                                info->fti_xid);
907
908         if (!ofd_object_exists(fo))
909                 GOTO(unlock, rc = -ENOENT);
910
911         /* need to verify layout version */
912         if (oa->o_valid & OBD_MD_LAYOUT_VERSION) {
913                 rc = ofd_verify_layout_version(env, fo, oa);
914                 if (rc)
915                         GOTO(unlock, rc);
916
917                 oa->o_valid &= ~OBD_MD_LAYOUT_VERSION;
918         }
919
920         if (oa->o_valid & OBD_MD_FLFLAGS && oa->o_flags & LUSTRE_ENCRYPT_FL) {
921                 /* punch must be aware we are dealing with an encrypted file */
922                 struct lu_attr la = {
923                         .la_valid = LA_FLAGS,
924                         .la_flags = LUSTRE_ENCRYPT_FL,
925                 };
926
927                 rc = dt_attr_set(env, dob, &la, th);
928                 if (rc)
929                         GOTO(unlock, rc);
930         }
931         rc = dt_punch(env, dob, start, OBD_OBJECT_EOF, th);
932         if (rc)
933                 GOTO(unlock, rc);
934
935         fl = ofd_object_ff_update(env, fo, oa, ff);
936         if (fl < 0)
937                 GOTO(unlock, rc = fl);
938
939         rc = dt_attr_set(env, dob, la, th);
940         if (rc)
941                 GOTO(unlock, rc);
942
943         if (fl) {
944                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1))
945                         ff->ff_parent.f_oid = cpu_to_le32(1UL << 31);
946                 else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2))
947                         le32_add_cpu(&ff->ff_parent.f_oid, -1);
948                 else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID))
949                         GOTO(unlock, rc);
950
951                 info->fti_buf.lb_buf = ff;
952                 info->fti_buf.lb_len = sizeof(*ff);
953                 rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
954                                   XATTR_NAME_FID, fl, th);
955                 if (!rc)
956                         filter_fid_le_to_cpu(&fo->ofo_ff, ff, sizeof(*ff));
957         }
958
959         GOTO(unlock, rc);
960
961 unlock:
962         ofd_write_unlock(env, fo);
963 stop:
964         rc2 = ofd_trans_stop(env, ofd, th, rc);
965         if (rc2 != 0)
966                 CERROR("%s: failed to stop transaction: rc = %d\n",
967                        ofd_name(ofd), rc2);
968         if (!rc)
969                 rc = rc2;
970 out:
971         return rc;
972 }
973
974 /**
975  * Destroy OFD object.
976  *
977  * This function destroys OFD object. If object wasn't used at all (orphan)
978  * then local transaction is used, which means the transaction data is not
979  * returned back in reply.
980  *
981  * \param[in] env       execution environment
982  * \param[in] fo        OFD object
983  * \param[in] orphan    flag to indicate that object is orphaned
984  *
985  * \retval              0 if successful
986  * \retval              negative value on error
987  */
988 int ofd_destroy(const struct lu_env *env, struct ofd_object *fo,
989                        int orphan)
990 {
991         struct ofd_device       *ofd = ofd_obj2dev(fo);
992         struct thandle          *th;
993         int                     rc = 0;
994         int                     rc2;
995
996         ENTRY;
997
998         if (!ofd_object_exists(fo))
999                 GOTO(out, rc = -ENOENT);
1000
1001         th = ofd_trans_create(env, ofd);
1002         if (IS_ERR(th))
1003                 GOTO(out, rc = PTR_ERR(th));
1004
1005         rc = dt_declare_ref_del(env, ofd_object_child(fo), th);
1006         if (rc < 0)
1007                 GOTO(stop, rc);
1008
1009         rc = dt_declare_destroy(env, ofd_object_child(fo), th);
1010         if (rc < 0)
1011                 GOTO(stop, rc);
1012
1013         if (orphan)
1014                 rc = dt_trans_start_local(env, ofd->ofd_osd, th);
1015         else
1016                 rc = ofd_trans_start(env, ofd, NULL, th);
1017         if (rc)
1018                 GOTO(stop, rc);
1019
1020         ofd_write_lock(env, fo);
1021         if (!ofd_object_exists(fo))
1022                 GOTO(unlock, rc = -ENOENT);
1023
1024         tgt_fmd_drop(ofd_info(env)->fti_exp, &fo->ofo_header.loh_fid);
1025
1026         dt_ref_del(env, ofd_object_child(fo), th);
1027         dt_destroy(env, ofd_object_child(fo), th);
1028 unlock:
1029         ofd_write_unlock(env, fo);
1030 stop:
1031         rc2 = ofd_trans_stop(env, ofd, th, rc);
1032         if (rc2)
1033                 CERROR("%s failed to stop transaction: %d\n",
1034                        ofd_name(ofd), rc2);
1035         if (!rc)
1036                 rc = rc2;
1037 out:
1038         RETURN(rc);
1039 }
1040
1041 /**
1042  * Get OFD object attributes.
1043  *
1044  * This function gets OFD object regular attributes. It is used to serve
1045  * incoming request as well as for local OFD purposes.
1046  *
1047  * \param[in] env       execution environment
1048  * \param[in] fo        OFD object
1049  * \param[in] la        object attributes
1050  *
1051  * \retval              0 if successful
1052  * \retval              negative value on error
1053  */
1054 int ofd_attr_get(const struct lu_env *env, struct ofd_object *fo,
1055                  struct lu_attr *la)
1056 {
1057         int rc = 0;
1058
1059         ENTRY;
1060
1061         if (ofd_object_exists(fo)) {
1062                 rc = dt_attr_get(env, ofd_object_child(fo), la);
1063         } else {
1064                 rc = -ENOENT;
1065         }
1066         RETURN(rc);
1067 }