Whamcloud - gitweb
LU-6401 uapi: migrate remaining uapi headers to uapi directory
[fs/lustre-release.git] / lustre / ofd / ofd_objects.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2014, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/ofd/ofd_objects.c
33  *
34  * This file contains OSD API methods related to OBD Filter Device (OFD)
35  * object operations.
36  *
37  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
38  * Author: Mikhail Pershin <mike.pershin@intel.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_FILTER
42
43 #include <dt_object.h>
44 #include <lustre_lfsck.h>
45
46 #include "ofd_internal.h"
47
48 /**
49  * Get object version from disk and check it.
50  *
51  * This function checks object version from disk with
52  * ofd_thread_info::fti_pre_version filled from incoming RPC. This is part of
53  * VBR (Version-Based Recovery) and ensures that object has the same version
54  * upon replay as it has during original modification.
55  *
56  * \param[in]  info     execution thread OFD private data
57  * \param[in]  fo       OFD object
58  *
59  * \retval              0 if version matches
60  * \retval              -EOVERFLOW on version mismatch
61  */
62 static int ofd_version_get_check(struct ofd_thread_info *info,
63                                  struct ofd_object *fo)
64 {
65         dt_obj_version_t curr_version;
66
67         LASSERT(ofd_object_exists(fo));
68
69         if (info->fti_exp == NULL)
70                 RETURN(0);
71
72         curr_version = dt_version_get(info->fti_env, ofd_object_child(fo));
73         if ((__s64)curr_version == -EOPNOTSUPP)
74                 RETURN(0);
75         /* VBR: version is checked always because costs nothing */
76         if (info->fti_pre_version != 0 &&
77             info->fti_pre_version != curr_version) {
78                 CDEBUG(D_INODE, "Version mismatch %#llx != %#llx\n",
79                        info->fti_pre_version, curr_version);
80                 spin_lock(&info->fti_exp->exp_lock);
81                 info->fti_exp->exp_vbr_failed = 1;
82                 spin_unlock(&info->fti_exp->exp_lock);
83                 RETURN (-EOVERFLOW);
84         }
85         info->fti_pre_version = curr_version;
86         RETURN(0);
87 }
88
89 /**
90  * Get OFD object by FID.
91  *
92  * This function finds OFD slice of compound object with the given FID.
93  *
94  * \param[in] env       execution environment
95  * \param[in] ofd       OFD device
96  * \param[in] fid       FID of the object
97  *
98  * \retval              pointer to the found ofd_object
99  * \retval              ERR_PTR(errno) in case of error
100  */
101 struct ofd_object *ofd_object_find(const struct lu_env *env,
102                                    struct ofd_device *ofd,
103                                    const struct lu_fid *fid)
104 {
105         struct ofd_object *fo;
106         struct lu_object  *o;
107
108         ENTRY;
109
110         o = lu_object_find(env, &ofd->ofd_dt_dev.dd_lu_dev, fid, NULL);
111         if (likely(!IS_ERR(o)))
112                 fo = ofd_obj(o);
113         else
114                 fo = ERR_CAST(o); /* return error */
115
116         RETURN(fo);
117 }
118
119 /**
120  * Get FID of parent MDT object.
121  *
122  * This function reads extended attribute XATTR_NAME_FID of OFD object which
123  * contains the MDT parent object FID and saves it in ofd_object::ofo_ff.
124  *
125  * The filter_fid::ff_parent::f_ver field currently holds
126  * the OST-object index in the parent MDT-object's layout EA,
127  * not the actual FID::f_ver of the parent. We therefore access
128  * it via the macro f_stripe_idx.
129  *
130  * \param[in] env       execution environment
131  * \param[in] fo        OFD object
132  *
133  * \retval              0 if successful
134  * \retval              -ENODATA if there is no such xattr
135  * \retval              negative value on error
136  */
137 int ofd_object_ff_load(const struct lu_env *env, struct ofd_object *fo)
138 {
139         struct ofd_thread_info *info = ofd_info(env);
140         struct filter_fid *ff = &fo->ofo_ff;
141         struct lu_buf *buf = &info->fti_buf;
142         int rc = 0;
143
144         if (fid_is_sane(&ff->ff_parent))
145                 return 0;
146
147         buf->lb_buf = ff;
148         buf->lb_len = sizeof(*ff);
149         rc = dt_xattr_get(env, ofd_object_child(fo), buf, XATTR_NAME_FID);
150         if (rc < 0)
151                 return rc;
152
153         if (unlikely(rc < sizeof(struct lu_fid))) {
154                 fid_zero(&ff->ff_parent);
155
156                 return -ENODATA;
157         }
158
159         filter_fid_le_to_cpu(ff, ff, rc);
160
161         return 0;
162 }
163
164 /**
165  * Precreate the given number \a nr of objects in the given sequence \a oseq.
166  *
167  * This function precreates new OST objects in the given sequence.
168  * The precreation starts from \a id and creates \a nr objects sequentially.
169  *
170  * Notes:
171  * This function may create fewer objects than requested.
172  *
173  * We mark object SUID+SGID to flag it for accepting UID+GID from client on
174  * first write. Currently the permission bits on the OST are never used,
175  * so this is OK.
176  *
177  * Initialize a/c/m time so any client timestamp will always be newer and
178  * update the inode. The ctime = 0 case is also handled specially in
179  * osd_inode_setattr(). See LU-221, LU-1042 for details.
180  *
181  * \param[in] env       execution environment
182  * \param[in] ofd       OFD device
183  * \param[in] id        object ID to start precreation from
184  * \param[in] oseq      object sequence
185  * \param[in] nr        number of objects to precreate
186  * \param[in] sync      synchronous precreation flag
187  *
188  * \retval              0 if successful
189  * \retval              negative value on error
190  */
191 int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd,
192                           u64 id, struct ofd_seq *oseq, int nr, int sync)
193 {
194         struct ofd_thread_info  *info = ofd_info(env);
195         struct ofd_object       *fo = NULL;
196         struct dt_object        *next;
197         struct thandle          *th;
198         struct ofd_object       **batch;
199         struct lu_fid           *fid = &info->fti_fid;
200         u64                     tmp;
201         int                     rc;
202         int                     rc2;
203         int                     i;
204         int                     objects = 0;
205         int                     nr_saved = nr;
206
207         ENTRY;
208
209         /* Don't create objects beyond the valid range for this SEQ */
210         if (unlikely(fid_seq_is_mdt0(ostid_seq(&oseq->os_oi)) &&
211                      (id + nr) >= IDIF_MAX_OID)) {
212                 CERROR("%s:"DOSTID" hit the IDIF_MAX_OID (1<<48)!\n",
213                        ofd_name(ofd), id, ostid_seq(&oseq->os_oi));
214                 RETURN(rc = -ENOSPC);
215         } else if (unlikely(!fid_seq_is_mdt0(ostid_seq(&oseq->os_oi)) &&
216                             (id + nr) >= OBIF_MAX_OID)) {
217                 CERROR("%s:"DOSTID" hit the OBIF_MAX_OID (1<<32)!\n",
218                        ofd_name(ofd), id, ostid_seq(&oseq->os_oi));
219                 RETURN(rc = -ENOSPC);
220         }
221
222         OBD_ALLOC(batch, nr_saved * sizeof(struct ofd_object *));
223         if (batch == NULL)
224                 RETURN(-ENOMEM);
225
226         info->fti_attr.la_valid = LA_TYPE | LA_MODE;
227         info->fti_attr.la_mode = S_IFREG | S_ISUID | S_ISGID | S_ISVTX | 0666;
228         info->fti_dof.dof_type = dt_mode_to_dft(S_IFREG);
229
230         info->fti_attr.la_valid |= LA_ATIME | LA_MTIME | LA_CTIME;
231         info->fti_attr.la_atime = 0;
232         info->fti_attr.la_mtime = 0;
233         info->fti_attr.la_ctime = 0;
234
235         LASSERT(id != 0);
236
237         /* prepare objects */
238         *fid = *lu_object_fid(&oseq->os_lastid_obj->do_lu);
239         for (i = 0; i < nr; i++) {
240                 rc = fid_set_id(fid, id + i);
241                 if (rc != 0) {
242                         if (i == 0)
243                                 GOTO(out, rc);
244
245                         nr = i;
246                         break;
247                 }
248
249                 fo = ofd_object_find(env, ofd, fid);
250                 if (IS_ERR(fo)) {
251                         if (i == 0)
252                                 GOTO(out, rc = PTR_ERR(fo));
253
254                         nr = i;
255                         break;
256                 }
257
258                 ofd_write_lock(env, fo);
259                 batch[i] = fo;
260         }
261         info->fti_buf.lb_buf = &tmp;
262         info->fti_buf.lb_len = sizeof(tmp);
263         info->fti_off = 0;
264
265         th = ofd_trans_create(env, ofd);
266         if (IS_ERR(th))
267                 GOTO(out, rc = PTR_ERR(th));
268
269         th->th_sync |= sync;
270
271         rc = dt_declare_record_write(env, oseq->os_lastid_obj, &info->fti_buf,
272                                      info->fti_off, th);
273         if (rc)
274                 GOTO(trans_stop, rc);
275
276         for (i = 0; i < nr; i++) {
277                 fo = batch[i];
278                 LASSERT(fo);
279
280                 if (unlikely(ofd_object_exists(fo))) {
281                         /* object may exist being re-created by write replay */
282                         CDEBUG(D_INODE, "object %#llx/%#llx exists: "
283                                DFID"\n", ostid_seq(&oseq->os_oi), id,
284                                PFID(lu_object_fid(&fo->ofo_obj.do_lu)));
285                         continue;
286                 }
287
288                 next = ofd_object_child(fo);
289                 LASSERT(next != NULL);
290
291                 rc = dt_declare_create(env, next, &info->fti_attr, NULL,
292                                        &info->fti_dof, th);
293                 if (rc < 0) {
294                         if (i == 0)
295                                 GOTO(trans_stop, rc);
296
297                         nr = i;
298                         break;
299                 }
300         }
301
302         rc = dt_trans_start_local(env, ofd->ofd_osd, th);
303         if (rc)
304                 GOTO(trans_stop, rc);
305
306         CDEBUG(D_OTHER, "%s: create new object "DFID" nr %d\n",
307                ofd_name(ofd), PFID(fid), nr);
308
309          /* When the LFSCK scanning the whole device to verify the LAST_ID file
310           * consistency, it will load the last_id into RAM firstly, and compare
311           * the last_id with each OST-object's ID. If the later one is larger,
312           * then it will regard the LAST_ID file crashed. But during the LFSCK
313           * scanning, the OFD may continue to create new OST-objects. Those new
314           * created OST-objects will have larger IDs than the LFSCK known ones.
315           * So from the LFSCK view, it needs to re-load the last_id from disk
316           * file, and if the latest last_id is still smaller than the object's
317           * ID, then the LAST_ID file is real crashed.
318           *
319           * To make above mechanism to work, before OFD pre-create OST-objects,
320           * it needs to update the LAST_ID file firstly, otherwise, the LFSCK
321           * may cannot get latest last_id although new OST-object created. */
322         if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_SKIP_LASTID)) {
323                 tmp = cpu_to_le64(id + nr - 1);
324                 dt_write_lock(env, oseq->os_lastid_obj, 0);
325                 rc = dt_record_write(env, oseq->os_lastid_obj,
326                                      &info->fti_buf, &info->fti_off, th);
327                 dt_write_unlock(env, oseq->os_lastid_obj);
328                 if (rc != 0)
329                         GOTO(trans_stop, rc);
330         }
331
332         for (i = 0; i < nr; i++) {
333                 fo = batch[i];
334                 LASSERT(fo);
335
336                 /* Only the new created objects need to be recorded. */
337                 if (ofd->ofd_osd->dd_record_fid_accessed) {
338                         struct lfsck_req_local *lrl = &ofd_info(env)->fti_lrl;
339
340                         lfsck_pack_rfa(lrl, lu_object_fid(&fo->ofo_obj.do_lu),
341                                        LEL_FID_ACCESSED, LFSCK_TYPE_LAYOUT);
342                         lfsck_in_notify_local(env, ofd->ofd_osd, lrl, NULL);
343                 }
344
345                 if (likely(!ofd_object_exists(fo) &&
346                            !OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING))) {
347                         next = ofd_object_child(fo);
348                         LASSERT(next != NULL);
349
350                         rc = dt_create(env, next, &info->fti_attr, NULL,
351                                        &info->fti_dof, th);
352                         if (rc < 0) {
353                                 if (i == 0)
354                                         GOTO(trans_stop, rc);
355
356                                 rc = 0;
357                                 break;
358                         }
359                         LASSERT(ofd_object_exists(fo));
360                 }
361                 ofd_seq_last_oid_set(oseq, id + i);
362         }
363
364         objects = i;
365         /* NOT all the wanted objects have been created,
366          * set the LAST_ID as the real created. */
367         if (unlikely(objects < nr)) {
368                 int rc1;
369
370                 info->fti_off = 0;
371                 tmp = cpu_to_le64(ofd_seq_last_oid(oseq));
372                 dt_write_lock(env, oseq->os_lastid_obj, 0);
373                 rc1 = dt_record_write(env, oseq->os_lastid_obj,
374                                       &info->fti_buf, &info->fti_off, th);
375                 dt_write_unlock(env, oseq->os_lastid_obj);
376                 if (rc1 != 0)
377                         CERROR("%s: fail to reset the LAST_ID for seq (%#llx"
378                                ") from %llu to %llu\n", ofd_name(ofd),
379                                ostid_seq(&oseq->os_oi), id + nr - 1,
380                                ofd_seq_last_oid(oseq));
381         }
382
383 trans_stop:
384         rc2 = ofd_trans_stop(env, ofd, th, rc);
385         if (rc2)
386                 CERROR("%s: failed to stop transaction: rc = %d\n",
387                        ofd_name(ofd), rc2);
388         if (!rc)
389                 rc = rc2;
390 out:
391         for (i = 0; i < nr_saved; i++) {
392                 fo = batch[i];
393                 if (fo) {
394                         ofd_write_unlock(env, fo);
395                         ofd_object_put(env, fo);
396                 }
397         }
398         OBD_FREE(batch, nr_saved * sizeof(struct ofd_object *));
399
400         CDEBUG((objects == 0 && rc == 0) ? D_ERROR : D_OTHER,
401                "created %d/%d objects: %d\n", objects, nr_saved, rc);
402
403         LASSERT(ergo(objects == 0, rc < 0));
404         RETURN(objects > 0 ? objects : rc);
405 }
406
407 /**
408  * Fix the OFD object ownership.
409  *
410  * If the object still has SUID+SGID bits set, meaning that it was precreated
411  * by the MDT before it was assigned to any file, (see ofd_precreate_objects())
412  * then we will accept the UID/GID/PROJID if sent by the client for initializing
413  * the ownership of this object.  We only allow this to happen once (so clear
414  * these bits) and later only allow setattr.
415  *
416  * \param[in] env        execution environment
417  * \param[in] fo         OFD object
418  * \param[in] la         object attributes
419  * \param[in] is_setattr was this function called from setattr or not
420  *
421  * \retval              0 if successful
422  * \retval              negative value on error
423  */
424 int ofd_attr_handle_id(const struct lu_env *env, struct ofd_object *fo,
425                          struct lu_attr *la, int is_setattr)
426 {
427         struct ofd_thread_info  *info = ofd_info(env);
428         struct lu_attr          *ln = &info->fti_attr2;
429         __u32                    mask = 0;
430         int                      rc;
431
432         ENTRY;
433
434         if (!(la->la_valid & LA_UID) && !(la->la_valid & LA_GID) &&
435             !(la->la_valid & LA_PROJID))
436                 RETURN(0);
437
438         rc = dt_attr_get(env, ofd_object_child(fo), ln);
439         if (rc != 0)
440                 RETURN(rc);
441
442         LASSERT(ln->la_valid & LA_MODE);
443
444         /*
445          * Only allow setattr to change UID/GID/PROJID, if
446          * SUID+SGID is not set which means this is not
447          * initialization of this objects.
448          */
449         if (!is_setattr) {
450                 if (!(ln->la_mode & S_ISUID))
451                         la->la_valid &= ~LA_UID;
452                 if (!(ln->la_mode & S_ISGID))
453                         la->la_valid &= ~LA_GID;
454                 if (!(ln->la_mode & S_ISVTX))
455                         la->la_valid &= ~LA_PROJID;
456         }
457
458         /* Initialize ownership of this object, clear SUID+SGID bits*/
459         if ((la->la_valid & LA_UID) && (ln->la_mode & S_ISUID))
460                 mask |= S_ISUID;
461         if ((la->la_valid & LA_GID) && (ln->la_mode & S_ISGID))
462                 mask |= S_ISGID;
463         if ((la->la_valid & LA_PROJID) && (ln->la_mode & S_ISVTX))
464                 mask |= S_ISVTX;
465         if (mask != 0) {
466                 if (!(la->la_valid & LA_MODE) || !is_setattr) {
467                         la->la_mode = ln->la_mode;
468                         la->la_valid |= LA_MODE;
469                 }
470                 la->la_mode &= ~mask;
471         }
472
473         RETURN(0);
474 }
475
476 /**
477  * Set OFD object attributes.
478  *
479  * This function sets OFD object attributes taken from incoming request.
480  * It sets not only regular attributes but also XATTR_NAME_FID extended
481  * attribute if needed. The "fid" xattr allows the object's MDT parent inode
482  * to be found and verified by LFSCK and other tools in case of inconsistency.
483  *
484  * \param[in] env       execution environment
485  * \param[in] fo        OFD object
486  * \param[in] la        object attributes
487  * \param[in] ff        filter_fid structure, contains additional attributes
488  *
489  * \retval              0 if successful
490  * \retval              negative value on error
491  */
492 int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo,
493                  struct lu_attr *la, struct filter_fid *ff)
494 {
495         struct ofd_thread_info  *info = ofd_info(env);
496         struct ofd_device       *ofd = ofd_obj2dev(fo);
497         struct thandle          *th;
498         struct ofd_mod_data     *fmd;
499         int                     ff_needed = 0;
500         int                     rc;
501         int                     rc2;
502         ENTRY;
503
504         ofd_write_lock(env, fo);
505         if (!ofd_object_exists(fo))
506                 GOTO(unlock, rc = -ENOENT);
507
508         if (la->la_valid & (LA_ATIME | LA_MTIME | LA_CTIME)) {
509                 fmd = ofd_fmd_get(info->fti_exp, &fo->ofo_header.loh_fid);
510                 if (fmd && fmd->fmd_mactime_xid < info->fti_xid)
511                         fmd->fmd_mactime_xid = info->fti_xid;
512                 ofd_fmd_put(info->fti_exp, fmd);
513         }
514
515         /* VBR: version recovery check */
516         rc = ofd_version_get_check(info, fo);
517         if (rc)
518                 GOTO(unlock, rc);
519
520         rc = ofd_attr_handle_id(env, fo, la, 1 /* is_setattr */);
521         if (rc != 0)
522                 GOTO(unlock, rc);
523
524         if (ff != NULL) {
525                 rc = ofd_object_ff_load(env, fo);
526                 if (rc == -ENODATA)
527                         ff_needed = 1;
528                 else if (rc < 0)
529                         GOTO(unlock, rc);
530         }
531
532         th = ofd_trans_create(env, ofd);
533         if (IS_ERR(th))
534                 GOTO(unlock, rc = PTR_ERR(th));
535
536         rc = dt_declare_attr_set(env, ofd_object_child(fo), la, th);
537         if (rc)
538                 GOTO(stop, rc);
539
540         if (ff_needed) {
541                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1))
542                         ff->ff_parent.f_oid = cpu_to_le32(1UL << 31);
543                 else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2))
544                         le32_add_cpu(&ff->ff_parent.f_oid, -1);
545
546                 info->fti_buf.lb_buf = ff;
547                 info->fti_buf.lb_len = sizeof(*ff);
548                 rc = dt_declare_xattr_set(env, ofd_object_child(fo),
549                                           &info->fti_buf, XATTR_NAME_FID, 0,
550                                           th);
551                 if (rc)
552                         GOTO(stop, rc);
553         }
554
555         rc = ofd_trans_start(env, ofd, la->la_valid & LA_SIZE ? fo : NULL, th);
556         if (rc)
557                 GOTO(stop, rc);
558
559         rc = dt_attr_set(env, ofd_object_child(fo), la, th);
560         if (rc)
561                 GOTO(stop, rc);
562
563         if (ff_needed) {
564                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID))
565                         GOTO(stop, rc);
566
567                 rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
568                                   XATTR_NAME_FID, 0, th);
569                 if (!rc)
570                         filter_fid_le_to_cpu(&fo->ofo_ff, ff, sizeof(*ff));
571         }
572
573         GOTO(stop, rc);
574
575 stop:
576         rc2 = ofd_trans_stop(env, ofd, th, rc);
577         if (rc2)
578                 CERROR("%s: failed to stop transaction: rc = %d\n",
579                        ofd_name(ofd), rc2);
580         if (!rc)
581                 rc = rc2;
582
583 unlock:
584         ofd_write_unlock(env, fo);
585
586         return rc;
587 }
588
589 /**
590  * Truncate/punch OFD object.
591  *
592  * This function frees all of the allocated object's space from the \a start
593  * offset to the \a end offset. For truncate() operations the \a end offset
594  * is OBD_OBJECT_EOF. The functionality to punch holes in an object via
595  * fallocate(FALLOC_FL_PUNCH_HOLE) is not yet implemented (see LU-3606).
596  *
597  * \param[in] env       execution environment
598  * \param[in] fo        OFD object
599  * \param[in] start     start offset to punch from
600  * \param[in] end       end of punch
601  * \param[in] la        object attributes
602  * \param[in] ff        filter_fid structure
603  * \param[in] oa        obdo struct from incoming request
604  *
605  * \retval              0 if successful
606  * \retval              negative value on error
607  */
608 int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo,
609                      __u64 start, __u64 end, struct lu_attr *la,
610                      struct filter_fid *ff, struct obdo *oa)
611 {
612         struct ofd_thread_info  *info = ofd_info(env);
613         struct ofd_device       *ofd = ofd_obj2dev(fo);
614         struct ofd_mod_data     *fmd;
615         struct dt_object        *dob = ofd_object_child(fo);
616         struct thandle          *th;
617         int                     ff_needed = 0;
618         int                     rc;
619         int                     rc2;
620
621         ENTRY;
622
623         /* we support truncate, not punch yet */
624         LASSERT(end == OBD_OBJECT_EOF);
625
626         ofd_write_lock(env, fo);
627         fmd = ofd_fmd_get(info->fti_exp, &fo->ofo_header.loh_fid);
628         if (fmd && fmd->fmd_mactime_xid < info->fti_xid)
629                 fmd->fmd_mactime_xid = info->fti_xid;
630         ofd_fmd_put(info->fti_exp, fmd);
631
632         if (!ofd_object_exists(fo))
633                 GOTO(unlock, rc = -ENOENT);
634
635         if (ofd->ofd_lfsck_verify_pfid && oa->o_valid & OBD_MD_FLFID) {
636                 rc = ofd_verify_ff(env, fo, oa);
637                 if (rc != 0)
638                         GOTO(unlock, rc);
639         }
640
641         /* VBR: version recovery check */
642         rc = ofd_version_get_check(info, fo);
643         if (rc)
644                 GOTO(unlock, rc);
645
646         rc = ofd_attr_handle_id(env, fo, la, 0 /* !is_setattr */);
647         if (rc != 0)
648                 GOTO(unlock, rc);
649
650         if (ff != NULL) {
651                 rc = ofd_object_ff_load(env, fo);
652                 if (rc == -ENODATA)
653                         ff_needed = 1;
654                 else if (rc < 0)
655                         GOTO(unlock, rc);
656         }
657
658         th = ofd_trans_create(env, ofd);
659         if (IS_ERR(th))
660                 GOTO(unlock, rc = PTR_ERR(th));
661
662         rc = dt_declare_attr_set(env, dob, la, th);
663         if (rc)
664                 GOTO(stop, rc);
665
666         rc = dt_declare_punch(env, dob, start, OBD_OBJECT_EOF, th);
667         if (rc)
668                 GOTO(stop, rc);
669
670         if (ff_needed) {
671                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1))
672                         ff->ff_parent.f_oid = cpu_to_le32(1UL << 31);
673                 else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2))
674                         le32_add_cpu(&ff->ff_parent.f_oid, -1);
675
676                 info->fti_buf.lb_buf = ff;
677                 info->fti_buf.lb_len = sizeof(*ff);
678                 rc = dt_declare_xattr_set(env, ofd_object_child(fo),
679                                           &info->fti_buf, XATTR_NAME_FID, 0,
680                                           th);
681                 if (rc)
682                         GOTO(stop, rc);
683         }
684
685         rc = ofd_trans_start(env, ofd, fo, th);
686         if (rc)
687                 GOTO(stop, rc);
688
689         rc = dt_punch(env, dob, start, OBD_OBJECT_EOF, th);
690         if (rc)
691                 GOTO(stop, rc);
692
693         rc = dt_attr_set(env, dob, la, th);
694         if (rc)
695                 GOTO(stop, rc);
696
697         if (ff_needed) {
698                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID))
699                         GOTO(stop, rc);
700
701                 rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
702                                   XATTR_NAME_FID, 0, th);
703                 if (!rc)
704                         filter_fid_le_to_cpu(&fo->ofo_ff, ff, sizeof(*ff));
705         }
706
707         GOTO(stop, rc);
708
709 stop:
710         rc2 = ofd_trans_stop(env, ofd, th, rc);
711         if (rc2 != 0)
712                 CERROR("%s: failed to stop transaction: rc = %d\n",
713                        ofd_name(ofd), rc2);
714         if (!rc)
715                 rc = rc2;
716 unlock:
717         ofd_write_unlock(env, fo);
718
719         return rc;
720 }
721
722 /**
723  * Destroy OFD object.
724  *
725  * This function destroys OFD object. If object wasn't used at all (orphan)
726  * then local transaction is used, which means the transaction data is not
727  * returned back in reply.
728  *
729  * \param[in] env       execution environment
730  * \param[in] fo        OFD object
731  * \param[in] orphan    flag to indicate that object is orphaned
732  *
733  * \retval              0 if successful
734  * \retval              negative value on error
735  */
736 int ofd_destroy(const struct lu_env *env, struct ofd_object *fo,
737                        int orphan)
738 {
739         struct ofd_device       *ofd = ofd_obj2dev(fo);
740         struct thandle          *th;
741         int                     rc = 0;
742         int                     rc2;
743
744         ENTRY;
745
746         ofd_write_lock(env, fo);
747         if (!ofd_object_exists(fo))
748                 GOTO(unlock, rc = -ENOENT);
749
750         th = ofd_trans_create(env, ofd);
751         if (IS_ERR(th))
752                 GOTO(unlock, rc = PTR_ERR(th));
753
754         rc = dt_declare_ref_del(env, ofd_object_child(fo), th);
755         if (rc < 0)
756                 GOTO(stop, rc);
757
758         rc = dt_declare_destroy(env, ofd_object_child(fo), th);
759         if (rc < 0)
760                 GOTO(stop, rc);
761
762         if (orphan)
763                 rc = dt_trans_start_local(env, ofd->ofd_osd, th);
764         else
765                 rc = ofd_trans_start(env, ofd, NULL, th);
766         if (rc)
767                 GOTO(stop, rc);
768
769         ofd_fmd_drop(ofd_info(env)->fti_exp, &fo->ofo_header.loh_fid);
770
771         dt_ref_del(env, ofd_object_child(fo), th);
772         dt_destroy(env, ofd_object_child(fo), th);
773 stop:
774         rc2 = ofd_trans_stop(env, ofd, th, rc);
775         if (rc2)
776                 CERROR("%s failed to stop transaction: %d\n",
777                        ofd_name(ofd), rc2);
778         if (!rc)
779                 rc = rc2;
780 unlock:
781         ofd_write_unlock(env, fo);
782         RETURN(rc);
783 }
784
785 /**
786  * Get OFD object attributes.
787  *
788  * This function gets OFD object regular attributes. It is used to serve
789  * incoming request as well as for local OFD purposes.
790  *
791  * \param[in] env       execution environment
792  * \param[in] fo        OFD object
793  * \param[in] la        object attributes
794  *
795  * \retval              0 if successful
796  * \retval              negative value on error
797  */
798 int ofd_attr_get(const struct lu_env *env, struct ofd_object *fo,
799                  struct lu_attr *la)
800 {
801         int rc = 0;
802
803         ENTRY;
804
805         if (ofd_object_exists(fo)) {
806                 rc = dt_attr_get(env, ofd_object_child(fo), la);
807         } else {
808                 rc = -ENOENT;
809         }
810         RETURN(rc);
811 }