Whamcloud - gitweb
LU-17662 osd-zfs: Support for ZFS 2.2.3
[fs/lustre-release.git] / lustre / ofd / ofd_objects.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/ofd/ofd_objects.c
32  *
33  * This file contains OSD API methods related to OBD Filter Device (OFD)
34  * object operations.
35  *
36  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
37  * Author: Mikhail Pershin <mike.pershin@intel.com>
38  */
39
40 #define DEBUG_SUBSYSTEM S_FILTER
41
42 #include <dt_object.h>
43 #include <lustre_lfsck.h>
44 #include <lustre_export.h>
45
46 #include "ofd_internal.h"
47
48 /**
49  * Get object version from disk and check it.
50  *
51  * This function checks object version from disk with
52  * ofd_thread_info::fti_pre_version filled from incoming RPC. This is part of
53  * VBR (Version-Based Recovery) and ensures that object has the same version
54  * upon replay as it has during original modification.
55  *
56  * \param[in]  info     execution thread OFD private data
57  * \param[in]  fo       OFD object
58  *
59  * \retval              0 if version matches
60  * \retval              -EOVERFLOW on version mismatch
61  */
62 static int ofd_version_get_check(struct ofd_thread_info *info,
63                                  struct ofd_object *fo)
64 {
65         dt_obj_version_t curr_version;
66
67         if (info->fti_exp == NULL)
68                 RETURN(0);
69
70         curr_version = dt_version_get(info->fti_env, ofd_object_child(fo));
71         if ((__s64)curr_version == -EOPNOTSUPP)
72                 RETURN(0);
73         /* VBR: version is checked always because costs nothing */
74         if (info->fti_pre_version != 0 &&
75             info->fti_pre_version != curr_version) {
76                 CDEBUG(D_INODE, "Version mismatch %#llx != %#llx\n",
77                        info->fti_pre_version, curr_version);
78                 spin_lock(&info->fti_exp->exp_lock);
79                 info->fti_exp->exp_vbr_failed = 1;
80                 spin_unlock(&info->fti_exp->exp_lock);
81                 RETURN (-EOVERFLOW);
82         }
83         info->fti_pre_version = curr_version;
84         RETURN(0);
85 }
86
87 /**
88  * Get OFD object by FID.
89  *
90  * This function finds OFD slice of compound object with the given FID.
91  *
92  * \param[in] env       execution environment
93  * \param[in] ofd       OFD device
94  * \param[in] fid       FID of the object
95  *
96  * \retval              pointer to the found ofd_object
97  * \retval              ERR_PTR(errno) in case of error
98  */
99 struct ofd_object *ofd_object_find(const struct lu_env *env,
100                                    struct ofd_device *ofd,
101                                    const struct lu_fid *fid)
102 {
103         struct ofd_object *fo;
104         struct lu_object  *o;
105
106         ENTRY;
107
108         o = lu_object_find(env, &ofd->ofd_dt_dev.dd_lu_dev, fid, NULL);
109         if (likely(!IS_ERR(o)))
110                 fo = ofd_obj(o);
111         else
112                 fo = ERR_CAST(o); /* return error */
113
114         RETURN(fo);
115 }
116
117 /**
118  * Get FID of parent MDT object.
119  *
120  * This function reads extended attribute XATTR_NAME_FID of OFD object which
121  * contains the MDT parent object FID and saves it in ofd_object::ofo_ff.
122  *
123  * The filter_fid::ff_parent::f_ver field currently holds
124  * the OST-object index in the parent MDT-object's layout EA,
125  * not the actual FID::f_ver of the parent. We therefore access
126  * it via the macro f_stripe_idx.
127  *
128  * \param[in] env       execution environment
129  * \param[in] fo        OFD object
130  *
131  * \retval              0 if successful
132  * \retval              -ENODATA if there is no such xattr
133  * \retval              negative value on error
134  */
135 int ofd_object_ff_load(const struct lu_env *env, struct ofd_object *fo)
136 {
137         struct ofd_thread_info *info = ofd_info(env);
138         struct filter_fid *ff = &fo->ofo_ff;
139         struct lu_buf *buf = &info->fti_buf;
140         int rc = 0;
141
142         if (fid_is_sane(&ff->ff_parent))
143                 return 0;
144
145         buf->lb_buf = ff;
146         buf->lb_len = sizeof(*ff);
147         rc = dt_xattr_get(env, ofd_object_child(fo), buf, XATTR_NAME_FID);
148         if (rc == -ERANGE) {
149                 struct filter_fid *ff_new;
150
151                 OBD_ALLOC(ff_new, sizeof(*ff) + FILTER_FID_EXTRA_SIZE);
152                 if (!ff_new)
153                         return -ENOMEM;
154                 buf->lb_buf = ff_new;
155                 buf->lb_len = sizeof(*ff) + FILTER_FID_EXTRA_SIZE;
156                 rc = dt_xattr_get(env, ofd_object_child(fo), buf,
157                                   XATTR_NAME_FID);
158                 if (rc > 0)
159                         memcpy(ff, ff_new, sizeof(*ff));
160                 OBD_FREE(ff_new, sizeof(*ff) + FILTER_FID_EXTRA_SIZE);
161         }
162         if (rc < 0)
163                 return rc;
164
165         if (unlikely(rc < sizeof(struct lu_fid))) {
166                 fid_zero(&ff->ff_parent);
167                 return -EINVAL;
168         }
169
170         filter_fid_le_to_cpu(ff, ff, rc);
171
172         return 0;
173 }
174
175 struct ofd_precreate_cb {
176         struct dt_txn_commit_cb  opc_cb;
177         struct ofd_seq          *opc_oseq;
178         int                      opc_objects;
179 };
180
181 static void ofd_cb_precreate(struct lu_env *env, struct thandle *th,
182                              struct dt_txn_commit_cb *cb, int err)
183 {
184         struct ofd_precreate_cb *opc;
185         struct ofd_seq *oseq;
186
187         opc = container_of(cb, struct ofd_precreate_cb, opc_cb);
188         oseq = opc->opc_oseq;
189
190         CDEBUG(D_OTHER, "Sub %d from %d for "DFID", th_sync %d\n",
191                opc->opc_objects, atomic_read(&oseq->os_precreate_in_progress),
192                PFID(&oseq->os_oi.oi_fid), th->th_sync);
193         atomic_sub(opc->opc_objects, &oseq->os_precreate_in_progress);
194         ofd_seq_put(env, opc->opc_oseq);
195         OBD_FREE_PTR(opc);
196 }
197
198 static int ofd_precreate_cb_add(const struct lu_env *env, struct thandle *th,
199                                 struct ofd_seq *oseq, int objects)
200 {
201         struct ofd_precreate_cb *opc;
202         struct dt_txn_commit_cb *dcb;
203         int precreate, rc;
204
205         OBD_ALLOC_PTR(opc);
206         if (!opc)
207                 return -ENOMEM;
208
209         precreate = atomic_read(&oseq->os_precreate_in_progress);
210         refcount_inc(&oseq->os_refc);
211         opc->opc_oseq = oseq;
212         opc->opc_objects = objects;
213         CDEBUG(D_OTHER, "Add %d to %d for "DFID", th_sync %d\n",
214                opc->opc_objects, precreate,
215                PFID(&oseq->os_oi.oi_fid), th->th_sync);
216
217         if ((precreate + objects) >= (5 * OST_MAX_PRECREATE))
218                 th->th_sync = 1;
219
220         dcb = &opc->opc_cb;
221         dcb->dcb_func = ofd_cb_precreate;
222         INIT_LIST_HEAD(&dcb->dcb_linkage);
223         strscpy(dcb->dcb_name, "ofd_cb_precreate", sizeof(dcb->dcb_name));
224
225         rc = dt_trans_cb_add(th, dcb);
226         if (rc) {
227                 ofd_seq_put(env, oseq);
228                 OBD_FREE_PTR(opc);
229                 return rc;
230         }
231
232         atomic_add(objects, &oseq->os_precreate_in_progress);
233
234         return 0;
235 }
236
237 /**
238  * Precreate the given number \a nr of objects in the given sequence \a oseq.
239  *
240  * This function precreates new OST objects in the given sequence.
241  * The precreation starts from \a id and creates \a nr objects sequentially.
242  *
243  * Notes:
244  * This function may create fewer objects than requested.
245  *
246  * We mark object SUID+SGID to flag it for accepting UID+GID from client on
247  * first write. Currently the permission bits on the OST are never used,
248  * so this is OK.
249  *
250  * Initialize a/c/m time so any client timestamp will always be newer and
251  * update the inode. The ctime = 0 case is also handled specially in
252  * osd_inode_setattr(). See LU-221, LU-1042 for details.
253  *
254  * \param[in] env               execution environment
255  * \param[in] ofd               OFD device
256  * \param[in] id                object ID to start precreation from
257  * \param[in] oseq              object sequence
258  * \param[in] nr                number of objects to precreate
259  * \param[in] sync              synchronous precreation flag
260  * \param[in] trans_local       start local transaction
261  *
262  * \retval              0 if successful
263  * \retval              negative value on error
264  */
265 int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd,
266                           u64 id, struct ofd_seq *oseq, int nr, int sync,
267                           bool trans_local)
268 {
269         struct ofd_thread_info  *info = ofd_info(env);
270         struct ofd_object       *fo = NULL;
271         struct dt_object        *next;
272         struct thandle          *th;
273         struct ofd_object       **batch;
274         struct lu_fid           *fid = &info->fti_fid;
275         u64                     tmp;
276         int                     rc;
277         int                     rc2;
278         int                     i;
279         int                     objects = 0;
280         int                     nr_saved = nr;
281
282         ENTRY;
283
284         /* Don't create objects beyond the valid range for this SEQ
285          * Last object to create is (id + nr - 1), but we move -1 on LHS
286          * to +1 on RHS to evaluate constant at compile time. LU-11186
287          */
288         if (unlikely(fid_seq_is_mdt0(ostid_seq(&oseq->os_oi)) &&
289                      id + nr > IDIF_MAX_OID + 1)) {
290                 CERROR("%s:"DOSTID" hit the IDIF_MAX_OID (1<<48)!\n",
291                        ofd_name(ofd), id, ostid_seq(&oseq->os_oi));
292                 RETURN(rc = -ENOSPC);
293         } else if (unlikely(!fid_seq_is_mdt0(ostid_seq(&oseq->os_oi)) &&
294                             id + nr > OBIF_MAX_OID + 1)) {
295                 CERROR("%s:"DOSTID" hit the OBIF_MAX_OID (1<<32)!\n",
296                        ofd_name(ofd), id, ostid_seq(&oseq->os_oi));
297                 RETURN(rc = -ENOSPC);
298         }
299
300         OBD_ALLOC_PTR_ARRAY(batch, nr_saved);
301         if (batch == NULL)
302                 RETURN(-ENOMEM);
303
304         info->fti_attr.la_valid = LA_TYPE | LA_MODE;
305         info->fti_attr.la_mode = S_IFREG | S_ISUID | S_ISGID | S_ISVTX | 0666;
306         info->fti_dof.dof_type = dt_mode_to_dft(S_IFREG);
307
308         info->fti_attr.la_valid |= LA_ATIME | LA_MTIME | LA_CTIME;
309         info->fti_attr.la_atime = 0;
310         info->fti_attr.la_mtime = 0;
311         info->fti_attr.la_ctime = 0;
312
313         LASSERT(id != 0);
314
315         /* prepare objects */
316         *fid = *lu_object_fid(&oseq->os_lastid_obj->do_lu);
317         for (i = 0; i < nr; i++) {
318                 rc = fid_set_id(fid, id + i);
319                 if (rc != 0) {
320                         if (i == 0)
321                                 GOTO(out, rc);
322
323                         nr = i;
324                         break;
325                 }
326
327                 fo = ofd_object_find(env, ofd, fid);
328                 if (IS_ERR(fo)) {
329                         if (i == 0)
330                                 GOTO(out, rc = PTR_ERR(fo));
331
332                         nr = i;
333                         break;
334                 }
335
336                 batch[i] = fo;
337         }
338         info->fti_buf.lb_buf = &tmp;
339         info->fti_buf.lb_len = sizeof(tmp);
340         info->fti_off = 0;
341
342         th = ofd_trans_create(env, ofd);
343         if (IS_ERR(th))
344                 GOTO(out, rc = PTR_ERR(th));
345
346         th->th_sync |= sync;
347
348         rc = dt_declare_record_write(env, oseq->os_lastid_obj, &info->fti_buf,
349                                      info->fti_off, th);
350         if (rc)
351                 GOTO(trans_stop, rc);
352
353         for (i = 0; i < nr; i++) {
354                 fo = batch[i];
355                 LASSERT(fo);
356
357                 if (unlikely(ofd_object_exists(fo))) {
358                         /* object may exist being re-created by write replay */
359                         CDEBUG(D_INODE, "object %#llx/%#llx exists: "
360                                DFID"\n", ostid_seq(&oseq->os_oi), id,
361                                PFID(lu_object_fid(&fo->ofo_obj.do_lu)));
362                         continue;
363                 }
364
365                 next = ofd_object_child(fo);
366                 LASSERT(next != NULL);
367
368                 rc = dt_declare_create(env, next, &info->fti_attr, NULL,
369                                        &info->fti_dof, th);
370                 if (rc < 0) {
371                         if (i == 0)
372                                 GOTO(trans_stop, rc);
373
374                         nr = i;
375                         break;
376                 }
377         }
378
379         /* Only needed for MDS+OSS rolling upgrade interop with 2.16+older. */
380         if (unlikely(trans_local))
381                 rc = dt_trans_start_local(env, ofd->ofd_osd, th);
382         else
383                 rc = dt_trans_start(env, ofd->ofd_osd, th);
384         if (rc)
385                 GOTO(trans_stop, rc);
386
387         CDEBUG(D_OTHER, "%s: create new object "DFID" nr %d\n",
388                ofd_name(ofd), PFID(fid), nr);
389
390          /* When the LFSCK scanning the whole device to verify the LAST_ID file
391           * consistency, it will load the last_id into RAM firstly, and compare
392           * the last_id with each OST-object's ID. If the later one is larger,
393           * then it will regard the LAST_ID file crashed. But during the LFSCK
394           * scanning, the OFD may continue to create new OST-objects. Those new
395           * created OST-objects will have larger IDs than the LFSCK known ones.
396           * So from the LFSCK view, it needs to re-load the last_id from disk
397           * file, and if the latest last_id is still smaller than the object's
398           * ID, then the LAST_ID file is real crashed.
399           *
400           * To make above mechanism to work, before OFD pre-create OST-objects,
401           * it needs to update the LAST_ID file firstly, otherwise, the LFSCK
402           * may cannot get latest last_id although new OST-object created. */
403         if (!CFS_FAIL_CHECK(OBD_FAIL_LFSCK_SKIP_LASTID)) {
404                 tmp = cpu_to_le64(id + nr - 1);
405                 dt_write_lock(env, oseq->os_lastid_obj, DT_LASTID);
406                 rc = dt_record_write(env, oseq->os_lastid_obj,
407                                      &info->fti_buf, &info->fti_off, th);
408                 dt_write_unlock(env, oseq->os_lastid_obj);
409                 if (rc != 0)
410                         GOTO(trans_stop, rc);
411         }
412
413         for (i = 0; i < nr; i++) {
414                 fo = batch[i];
415                 LASSERT(fo);
416
417                 ofd_write_lock(env, fo);
418
419                 /* Only the new created objects need to be recorded. */
420                 if (ofd->ofd_osd->dd_record_fid_accessed) {
421                         struct lfsck_req_local *lrl = &ofd_info(env)->fti_lrl;
422
423                         lfsck_pack_rfa(lrl, lu_object_fid(&fo->ofo_obj.do_lu),
424                                        LEL_FID_ACCESSED, LFSCK_TYPE_LAYOUT);
425                         lfsck_in_notify_local(env, ofd->ofd_osd, lrl, NULL);
426                 }
427
428                 if (likely(!ofd_object_exists(fo) &&
429                            !CFS_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING))) {
430                         next = ofd_object_child(fo);
431                         LASSERT(next != NULL);
432
433                         rc = dt_create(env, next, &info->fti_attr, NULL,
434                                        &info->fti_dof, th);
435                         ofd_write_unlock(env, fo);
436                         if (rc < 0) {
437                                 if (i == 0)
438                                         GOTO(trans_stop, rc);
439
440                                 rc = 0;
441                                 break;
442                         }
443                         LASSERT(ofd_object_exists(fo));
444                 } else {
445                         ofd_write_unlock(env, fo);
446                 }
447
448                 ofd_seq_last_oid_set(oseq, id + i);
449         }
450
451         objects = i;
452         /* NOT all the wanted objects have been created,
453          * set the LAST_ID as the real created. */
454         if (unlikely(objects < nr)) {
455                 int rc1;
456
457                 info->fti_off = 0;
458                 tmp = cpu_to_le64(ofd_seq_last_oid(oseq));
459                 dt_write_lock(env, oseq->os_lastid_obj, DT_LASTID);
460                 rc1 = dt_record_write(env, oseq->os_lastid_obj,
461                                       &info->fti_buf, &info->fti_off, th);
462                 dt_write_unlock(env, oseq->os_lastid_obj);
463                 if (rc1 != 0)
464                         CERROR("%s: fail to reset the LAST_ID for seq (%#llx"
465                                ") from %llu to %llu\n", ofd_name(ofd),
466                                ostid_seq(&oseq->os_oi), id + nr - 1,
467                                ofd_seq_last_oid(oseq));
468         }
469
470         if (objects)
471                 ofd_precreate_cb_add(env, th, oseq, objects);
472 trans_stop:
473         rc2 = ofd_trans_stop(env, ofd, th, rc);
474         if (rc2)
475                 CERROR("%s: failed to stop transaction: rc = %d\n",
476                        ofd_name(ofd), rc2);
477         if (!rc)
478                 rc = rc2;
479 out:
480         for (i = 0; i < nr_saved; i++) {
481                 fo = batch[i];
482                 if (!fo)
483                         continue;
484                 ofd_object_put(env, fo);
485         }
486         OBD_FREE_PTR_ARRAY(batch, nr_saved);
487
488         CDEBUG((objects == 0 && rc == 0) ? D_ERROR : D_OTHER,
489                "created %d/%d objects: %d\n", objects, nr_saved, rc);
490
491         LASSERT(ergo(objects == 0, rc < 0));
492         RETURN(objects > 0 ? objects : rc);
493 }
494
495 /**
496  * Fix the OFD object ownership.
497  *
498  * If the object still has SUID+SGID bits set, meaning that it was precreated
499  * by the MDT before it was assigned to any file, (see ofd_precreate_objects())
500  * then we will accept the UID/GID/PROJID if sent by the client for initializing
501  * the ownership of this object.  We only allow this to happen once (so clear
502  * these bits) and later only allow setattr.
503  *
504  * \param[in] env        execution environment
505  * \param[in] fo         OFD object
506  * \param[in] la         object attributes
507  * \param[in] is_setattr was this function called from setattr or not
508  *
509  * \retval              0 if successful
510  * \retval              negative value on error
511  */
512 int ofd_attr_handle_id(const struct lu_env *env, struct ofd_object *fo,
513                          struct lu_attr *la, int is_setattr)
514 {
515         struct ofd_thread_info  *info = ofd_info(env);
516         struct lu_attr          *ln = &info->fti_attr2;
517         __u32                    mask = 0;
518         int                      rc;
519
520         ENTRY;
521
522         if (!(la->la_valid & LA_UID) && !(la->la_valid & LA_GID) &&
523             !(la->la_valid & LA_PROJID))
524                 RETURN(0);
525
526         rc = dt_attr_get(env, ofd_object_child(fo), ln);
527         if (rc != 0)
528                 RETURN(rc);
529
530         LASSERT(ln->la_valid & LA_MODE);
531
532         /*
533          * Only allow setattr to change UID/GID/PROJID, if
534          * SUID+SGID is not set which means this is not
535          * initialization of this objects.
536          */
537         if (!is_setattr) {
538                 if (!(ln->la_mode & S_ISUID))
539                         la->la_valid &= ~LA_UID;
540                 if (!(ln->la_mode & S_ISGID))
541                         la->la_valid &= ~LA_GID;
542                 if (!(ln->la_mode & S_ISVTX))
543                         la->la_valid &= ~LA_PROJID;
544         }
545
546         /* Initialize ownership of this object, clear SUID+SGID bits*/
547         if ((la->la_valid & LA_UID) && (ln->la_mode & S_ISUID))
548                 mask |= S_ISUID;
549         if ((la->la_valid & LA_GID) && (ln->la_mode & S_ISGID))
550                 mask |= S_ISGID;
551         if ((la->la_valid & LA_PROJID) && (ln->la_mode & S_ISVTX))
552                 mask |= S_ISVTX;
553         if (mask != 0) {
554                 if (!(la->la_valid & LA_MODE) || !is_setattr) {
555                         la->la_mode = ln->la_mode;
556                         la->la_valid |= LA_MODE;
557                 }
558                 la->la_mode &= ~mask;
559         }
560
561         RETURN(0);
562 }
563
564 /**
565  * Check if it needs to update filter_fid by the value of @oa.
566  *
567  * \param[in] env       env
568  * \param[in] fo        ofd object
569  * \param[in] oa        obdo from client or MDT
570  * \param[out] ff       if filter_fid needs updating, this field is used to
571  *                      return the new buffer
572  *
573  * \retval < 0          error occurred
574  * \retval 0            doesn't need to update filter_fid
575  * \retval FL_XATTR_{CREATE,REPLACE}    flag for xattr update
576  */
577 int ofd_object_ff_update(const struct lu_env *env, struct ofd_object *fo,
578                          const struct obdo *oa, struct filter_fid *ff)
579 {
580         int rc = 0;
581         ENTRY;
582
583         if (!(oa->o_valid &
584               (OBD_MD_FLFID | OBD_MD_FLOSTLAYOUT | OBD_MD_LAYOUT_VERSION)))
585                 RETURN(0);
586
587         rc = ofd_object_ff_load(env, fo);
588         if (rc < 0 && rc != -ENODATA)
589                 RETURN(rc);
590
591         LASSERT(ff != &fo->ofo_ff);
592         if (rc == -ENODATA) {
593                 rc = LU_XATTR_CREATE;
594                 memset(ff, 0, sizeof(*ff));
595         } else {
596                 rc = LU_XATTR_REPLACE;
597                 memcpy(ff, &fo->ofo_ff, sizeof(*ff));
598         }
599
600         if (oa->o_valid & OBD_MD_FLFID) {
601                 /* packing fid and converting it to LE for storing into EA.
602                  * Here ->o_stripe_idx should be filled by LOV and rest of
603                  * fields - by client. */
604                 ff->ff_parent.f_seq = oa->o_parent_seq;
605                 ff->ff_parent.f_oid = oa->o_parent_oid;
606                 /* XXX: we are ignoring o_parent_ver here, since this should
607                  *      be the same for all objects in this fileset. */
608                 ff->ff_parent.f_ver = oa->o_stripe_idx;
609         }
610         if (oa->o_valid & OBD_MD_FLOSTLAYOUT)
611                 ff->ff_layout = oa->o_layout;
612
613         if (oa->o_valid & OBD_MD_LAYOUT_VERSION) {
614                 CDEBUG(D_INODE, DFID": OST("DFID") layout version %u -> %u\n",
615                        PFID(&fo->ofo_ff.ff_parent),
616                        PFID(lu_object_fid(&fo->ofo_obj.do_lu)),
617                        ff->ff_layout_version, oa->o_layout_version);
618
619                 /**
620                  * resync write from client on non-primary objects and
621                  * resync start from MDS on primary objects will contain
622                  * LU_LAYOUT_RESYNC flag in the @oa.
623                  *
624                  * The layout version checking for write/punch from client
625                  * happens in ofd_verify_layout_version() before coming to
626                  * here, so that resync with smaller layout version client
627                  * will be rejected there, the biggest resync version will
628                  * be recorded in the OFD objects.
629                  */
630                 if (ff->ff_layout_version & LU_LAYOUT_RESYNC) {
631                         /* this opens a new era of writing */
632                         ff->ff_layout_version = 0;
633                         ff->ff_range = 0;
634                 }
635
636                 /* it's not allowed to change it to a smaller value */
637                 if (ofd_layout_version_less(oa->o_layout_version,
638                                             ff->ff_layout_version))
639                         RETURN(-EINVAL);
640
641                 if (ff->ff_layout_version == 0 ||
642                     oa->o_layout_version & LU_LAYOUT_RESYNC) {
643                         /* if LU_LAYOUT_RESYNC is set, it closes the era of
644                          * writing. Only mirror I/O can write this object. */
645                         ff->ff_layout_version = oa->o_layout_version;
646                         ff->ff_range = 0;
647                 } else if (oa->o_layout_version > ff->ff_layout_version) {
648                         ff->ff_range = max_t(__u32, ff->ff_range,
649                                              oa->o_layout_version -
650                                              ff->ff_layout_version);
651                 }
652         }
653
654         if (memcmp(ff, &fo->ofo_ff, sizeof(*ff)))
655                 filter_fid_cpu_to_le(ff, ff, sizeof(*ff));
656         else /* no change */
657                 rc = 0;
658
659         RETURN(rc);
660 }
661
662 /**
663  * Set OFD object attributes.
664  *
665  * This function sets OFD object attributes taken from incoming request.
666  * It sets not only regular attributes but also XATTR_NAME_FID extended
667  * attribute if needed. The "fid" xattr allows the object's MDT parent inode
668  * to be found and verified by LFSCK and other tools in case of inconsistency.
669  *
670  * \param[in] env       execution environment
671  * \param[in] fo        OFD object
672  * \param[in] la        object attributes
673  * \param[in] oa        obdo carries fid, ost_layout, layout version
674  *
675  * \retval              0 if successful
676  * \retval              negative value on error
677  */
678 int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo,
679                  struct lu_attr *la, struct obdo *oa)
680 {
681         struct ofd_thread_info *info = ofd_info(env);
682         struct ofd_device *ofd = ofd_obj2dev(fo);
683         struct filter_fid *ff = &info->fti_mds_fid;
684         struct thandle *th;
685         int fl, rc, rc2;
686
687         ENTRY;
688
689         if (!ofd_object_exists(fo))
690                 GOTO(out, rc = -ENOENT);
691
692         /* VBR: version recovery check */
693         rc = ofd_version_get_check(info, fo);
694         if (rc)
695                 GOTO(out, rc);
696
697         rc = ofd_attr_handle_id(env, fo, la, 1 /* is_setattr */);
698         if (rc != 0)
699                 GOTO(out, rc);
700
701         th = ofd_trans_create(env, ofd);
702         if (IS_ERR(th))
703                 GOTO(out, rc = PTR_ERR(th));
704
705         rc = dt_declare_attr_set(env, ofd_object_child(fo), la, th);
706         if (rc)
707                 GOTO(stop, rc);
708
709         info->fti_buf.lb_buf = ff;
710         info->fti_buf.lb_len = sizeof(*ff);
711         rc = dt_declare_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
712                                   XATTR_NAME_FID, 0, th);
713         if (rc)
714                 GOTO(stop, rc);
715
716         rc = ofd_trans_start(env, ofd, la->la_valid & LA_SIZE ? fo : NULL, th);
717         if (rc)
718                 GOTO(stop, rc);
719
720         ofd_write_lock(env, fo);
721         if (!ofd_object_exists(fo))
722                 GOTO(unlock, rc = -ENOENT);
723
724         /* serialize vs ofd_commitrw_write() */
725         if (la->la_valid & (LA_ATIME | LA_MTIME | LA_CTIME))
726                 tgt_fmd_update(info->fti_exp, &fo->ofo_header.loh_fid,
727                                info->fti_xid);
728
729         rc = dt_attr_set(env, ofd_object_child(fo), la, th);
730         if (rc)
731                 GOTO(unlock, rc);
732
733         fl = ofd_object_ff_update(env, fo, oa, ff);
734         if (fl < 0)
735                 GOTO(unlock, rc = fl);
736
737         if (fl) {
738                 if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1))
739                         ff->ff_parent.f_oid = cpu_to_le32(1UL << 31);
740                 else if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2))
741                         le32_add_cpu(&ff->ff_parent.f_oid, -1);
742                 else if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID))
743                         GOTO(unlock, rc);
744
745                 info->fti_buf.lb_buf = ff;
746                 info->fti_buf.lb_len = sizeof(*ff);
747                 rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
748                                   XATTR_NAME_FID, fl, th);
749                 if (!rc)
750                         filter_fid_le_to_cpu(&fo->ofo_ff, ff, sizeof(*ff));
751         }
752
753         GOTO(unlock, rc);
754
755 unlock:
756         ofd_write_unlock(env, fo);
757 stop:
758         rc2 = ofd_trans_stop(env, ofd, th, rc);
759         if (rc2)
760                 CERROR("%s: failed to stop transaction: rc = %d\n",
761                        ofd_name(ofd), rc2);
762         if (!rc)
763                 rc = rc2;
764 out:
765         return rc;
766 }
767
768 /**
769  * Fallocate(Preallocate) space for OFD object.
770  *
771  * This function allocates space for the object from the \a start
772  * offset to the \a end offset.
773  *
774  * \param[in] env       execution environment
775  * \param[in] fo        OFD object
776  * \param[in] start     start offset to allocate from
777  * \param[in] end       end of allocate
778  * \param[in] mode      fallocate mode
779  * \param[in] la        object attributes
780  * \param[in] ff        filter_fid structure
781  *
782  * \retval              0 if successful
783  * \retval              negative value on error
784  */
785 int ofd_object_fallocate(const struct lu_env *env, struct ofd_object *fo,
786                          __u64 start, __u64 end, int mode, struct lu_attr *la,
787                          struct obdo *oa)
788 {
789         struct ofd_thread_info *info = ofd_info(env);
790         struct ofd_device *ofd = ofd_obj2dev(fo);
791         struct dt_object *dob = ofd_object_child(fo);
792         struct thandle *th;
793         struct filter_fid *ff = &info->fti_mds_fid;
794         bool ff_needed = false;
795         int rc;
796
797         ENTRY;
798
799         if (!ofd_object_exists(fo))
800                 RETURN(-ENOENT);
801
802         /* VBR: version recovery check */
803         rc = ofd_version_get_check(info, fo);
804         if (rc != 0)
805                 RETURN(rc);
806
807         if (ff != NULL) {
808                 rc = ofd_object_ff_load(env, fo);
809                 if (rc == -ENODATA)
810                         ff_needed = true;
811                 else if (rc < 0)
812                         RETURN(rc);
813
814                 if (ff_needed) {
815                         if (oa->o_valid & OBD_MD_FLFID) {
816                                 ff->ff_parent.f_seq = oa->o_parent_seq;
817                                 ff->ff_parent.f_oid = oa->o_parent_oid;
818                                 ff->ff_parent.f_ver = oa->o_stripe_idx;
819                         }
820                         if (oa->o_valid & OBD_MD_FLOSTLAYOUT)
821                                 ff->ff_layout = oa->o_layout;
822                         if (oa->o_valid & OBD_MD_LAYOUT_VERSION)
823                                 ff->ff_layout_version = oa->o_layout_version;
824                         filter_fid_cpu_to_le(ff, ff, sizeof(*ff));
825                 }
826         }
827
828         th = ofd_trans_create(env, ofd);
829         if (IS_ERR(th))
830                 RETURN(PTR_ERR(th));
831
832         rc = dt_declare_attr_set(env, dob, la, th);
833         if (rc)
834                 GOTO(stop, rc);
835
836         rc = dt_declare_fallocate(env, dob, start, end, mode, th);
837         if (rc)
838                 GOTO(stop, rc);
839
840         if (ff_needed) {
841                 info->fti_buf.lb_buf = ff;
842                 info->fti_buf.lb_len = sizeof(*ff);
843                 rc = dt_declare_xattr_set(env, ofd_object_child(fo),
844                                           &info->fti_buf, XATTR_NAME_FID, 0,
845                                           th);
846                 if (rc)
847                         GOTO(stop, rc);
848         }
849
850         rc = ofd_trans_start(env, ofd, fo, th);
851         if (rc)
852                 GOTO(stop, rc);
853
854         ofd_read_lock(env, fo);
855         if (!ofd_object_exists(fo))
856                 GOTO(unlock, rc = -ENOENT);
857
858         if (la->la_valid & (LA_ATIME | LA_MTIME | LA_CTIME))
859                 tgt_fmd_update(info->fti_exp, &fo->ofo_header.loh_fid,
860                                info->fti_xid);
861
862         rc = dt_falloc(env, dob, start, end, mode, th);
863         if (rc)
864                 GOTO(unlock, rc);
865
866         rc = dt_attr_set(env, dob, la, th);
867         if (rc)
868                 GOTO(unlock, rc);
869
870         if (ff_needed) {
871                 rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
872                                   XATTR_NAME_FID, 0, th);
873                 if (!rc)
874                         filter_fid_le_to_cpu(&fo->ofo_ff, ff, sizeof(*ff));
875         }
876 unlock:
877         ofd_read_unlock(env, fo);
878 stop:
879         ofd_trans_stop(env, ofd, th, rc);
880         RETURN(rc);
881 }
882
883 /**
884  * Truncate/punch OFD object.
885  *
886  * This function frees all of the allocated object's space from the \a start
887  * offset to the \a end offset. For truncate() operations the \a end offset
888  * is OBD_OBJECT_EOF. The functionality to punch holes in an object via
889  * fallocate(FALLOC_FL_PUNCH_HOLE) is not yet implemented (see LU-3606).
890  *
891  * \param[in] env       execution environment
892  * \param[in] fo        OFD object
893  * \param[in] start     start offset to punch from
894  * \param[in] end       end of punch
895  * \param[in] la        object attributes
896  * \param[in] oa        obdo struct from incoming request
897  *
898  * \retval              0 if successful
899  * \retval              negative value on error
900  */
901 int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo,
902                      __u64 start, __u64 end, struct lu_attr *la,
903                      struct obdo *oa)
904 {
905         struct ofd_thread_info *info = ofd_info(env);
906         struct ofd_device *ofd = ofd_obj2dev(fo);
907         struct dt_object *dob = ofd_object_child(fo);
908         struct filter_fid *ff = &info->fti_mds_fid;
909         struct thandle *th;
910         int fl, rc, rc2;
911
912         ENTRY;
913
914         /* we support truncate, not punch yet */
915         LASSERT(end == OBD_OBJECT_EOF);
916
917         if (!ofd_object_exists(fo))
918                 GOTO(out, rc = -ENOENT);
919
920         if (ofd->ofd_lfsck_verify_pfid && oa->o_valid & OBD_MD_FLFID) {
921                 rc = ofd_verify_ff(env, fo, oa);
922                 if (rc != 0)
923                         GOTO(out, rc);
924         }
925
926         /* VBR: version recovery check */
927         rc = ofd_version_get_check(info, fo);
928         if (rc)
929                 GOTO(out, rc);
930
931         rc = ofd_attr_handle_id(env, fo, la, 0 /* !is_setattr */);
932         if (rc != 0)
933                 GOTO(out, rc);
934
935         th = ofd_trans_create(env, ofd);
936         if (IS_ERR(th))
937                 GOTO(out, rc = PTR_ERR(th));
938
939         if (oa->o_valid & OBD_MD_FLFLAGS && oa->o_flags & LUSTRE_ENCRYPT_FL) {
940                 /* punch must be aware we are dealing with an encrypted file */
941                 la->la_valid |= LA_FLAGS;
942                 la->la_flags |= LUSTRE_ENCRYPT_FL;
943         }
944         rc = dt_declare_attr_set(env, dob, la, th);
945         if (rc)
946                 GOTO(stop, rc);
947
948         rc = dt_declare_punch(env, dob, start, OBD_OBJECT_EOF, th);
949         if (rc)
950                 GOTO(stop, rc);
951
952         info->fti_buf.lb_buf = ff;
953         info->fti_buf.lb_len = sizeof(*ff);
954         rc = dt_declare_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
955                                   XATTR_NAME_FID, 0, th);
956         if (rc)
957                 GOTO(stop, rc);
958
959         rc = ofd_trans_start(env, ofd, fo, th);
960         if (rc)
961                 GOTO(stop, rc);
962
963         ofd_write_lock(env, fo);
964
965         if (la->la_valid & (LA_ATIME | LA_MTIME | LA_CTIME))
966                 tgt_fmd_update(info->fti_exp, &fo->ofo_header.loh_fid,
967                                info->fti_xid);
968
969         if (!ofd_object_exists(fo))
970                 GOTO(unlock, rc = -ENOENT);
971
972         /* need to verify layout version */
973         if (oa->o_valid & OBD_MD_LAYOUT_VERSION) {
974                 rc = ofd_verify_layout_version(env, fo, oa);
975                 if (rc)
976                         GOTO(unlock, rc);
977         }
978
979         rc = dt_punch(env, dob, start, OBD_OBJECT_EOF, th);
980         if (rc)
981                 GOTO(unlock, rc);
982
983         fl = ofd_object_ff_update(env, fo, oa, ff);
984         if (fl < 0)
985                 GOTO(unlock, rc = fl);
986
987         rc = dt_attr_set(env, dob, la, th);
988         if (rc)
989                 GOTO(unlock, rc);
990
991         if (fl) {
992                 if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1))
993                         ff->ff_parent.f_oid = cpu_to_le32(1UL << 31);
994                 else if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2))
995                         le32_add_cpu(&ff->ff_parent.f_oid, -1);
996                 else if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID))
997                         GOTO(unlock, rc);
998
999                 info->fti_buf.lb_buf = ff;
1000                 info->fti_buf.lb_len = sizeof(*ff);
1001                 rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
1002                                   XATTR_NAME_FID, fl, th);
1003                 if (!rc)
1004                         filter_fid_le_to_cpu(&fo->ofo_ff, ff, sizeof(*ff));
1005         }
1006
1007         GOTO(unlock, rc);
1008
1009 unlock:
1010         ofd_write_unlock(env, fo);
1011 stop:
1012         rc2 = ofd_trans_stop(env, ofd, th, rc);
1013         if (rc2 != 0)
1014                 CERROR("%s: failed to stop transaction: rc = %d\n",
1015                        ofd_name(ofd), rc2);
1016         if (!rc)
1017                 rc = rc2;
1018 out:
1019         return rc;
1020 }
1021
1022 /**
1023  * Destroy OFD object.
1024  *
1025  * This function destroys OFD object. If object wasn't used at all (orphan)
1026  * then local transaction is used, which means the transaction data is not
1027  * returned back in reply.
1028  *
1029  * \param[in] env       execution environment
1030  * \param[in] fo        OFD object
1031  * \param[in] orphan    flag to indicate that object is orphaned
1032  *
1033  * \retval              0 if successful
1034  * \retval              negative value on error
1035  */
1036 int ofd_destroy(const struct lu_env *env, struct ofd_object *fo,
1037                        int orphan)
1038 {
1039         struct ofd_device       *ofd = ofd_obj2dev(fo);
1040         struct thandle          *th;
1041         int                     rc = 0;
1042         int                     rc2;
1043
1044         ENTRY;
1045
1046         if (!ofd_object_exists(fo))
1047                 GOTO(out, rc = -ENOENT);
1048
1049         th = ofd_trans_create(env, ofd);
1050         if (IS_ERR(th))
1051                 GOTO(out, rc = PTR_ERR(th));
1052
1053         rc = dt_declare_ref_del(env, ofd_object_child(fo), th);
1054         if (rc < 0)
1055                 GOTO(stop, rc);
1056
1057         rc = dt_declare_destroy(env, ofd_object_child(fo), th);
1058         if (rc < 0)
1059                 GOTO(stop, rc);
1060
1061         if (orphan)
1062                 rc = dt_trans_start_local(env, ofd->ofd_osd, th);
1063         else
1064                 rc = ofd_trans_start(env, ofd, NULL, th);
1065         if (rc)
1066                 GOTO(stop, rc);
1067
1068         ofd_write_lock(env, fo);
1069         if (!ofd_object_exists(fo))
1070                 GOTO(unlock, rc = -ENOENT);
1071
1072         tgt_fmd_drop(ofd_info(env)->fti_exp, &fo->ofo_header.loh_fid);
1073
1074         dt_ref_del(env, ofd_object_child(fo), th);
1075         dt_destroy(env, ofd_object_child(fo), th);
1076 unlock:
1077         ofd_write_unlock(env, fo);
1078 stop:
1079         rc2 = ofd_trans_stop(env, ofd, th, rc);
1080         if (rc2)
1081                 CERROR("%s failed to stop transaction: %d\n",
1082                        ofd_name(ofd), rc2);
1083         if (!rc)
1084                 rc = rc2;
1085 out:
1086         RETURN(rc);
1087 }
1088
1089 /**
1090  * Get OFD object attributes.
1091  *
1092  * This function gets OFD object regular attributes. It is used to serve
1093  * incoming request as well as for local OFD purposes.
1094  *
1095  * \param[in] env       execution environment
1096  * \param[in] fo        OFD object
1097  * \param[in] la        object attributes
1098  *
1099  * \retval              0 if successful
1100  * \retval              negative value on error
1101  */
1102 int ofd_attr_get(const struct lu_env *env, struct ofd_object *fo,
1103                  struct lu_attr *la)
1104 {
1105         int rc = 0;
1106
1107         ENTRY;
1108
1109         if (ofd_object_exists(fo)) {
1110                 rc = dt_attr_get(env, ofd_object_child(fo), la);
1111         } else {
1112                 rc = -ENOENT;
1113         }
1114         RETURN(rc);
1115 }