Whamcloud - gitweb
LU-11017 quota: ignore quota for CAP_SYS_RESOURCE properly
[fs/lustre-release.git] / lustre / lod / lod_sub_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2015, 2017, Intel Corporation.
24  */
25 /*
26  * lustre/lod/lod_sub_object.c
27  *
28  * LOD sub object methods
29  *
30  * This file implements sub-object methods for LOD.
31  *
32  * LOD is Logic volume layer in the MDS stack, which will handle striping
33  * and distribute the update to different OSP/OSD. After directing the updates
34  * to one specific OSD/OSP, it also needs to do some thing before calling
35  * OSD/OSP API, for example recording updates for cross-MDT operation, get
36  * the next level transaction etc.
37  *
38  * Author: Di Wang <di.wang@intel.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_MDS
42
43 #include <obd.h>
44 #include <obd_class.h>
45 #include <uapi/linux/lustre/lustre_ver.h>
46 #include <obd_support.h>
47 #include <lprocfs_status.h>
48
49 #include <lustre_fid.h>
50 #include <uapi/linux/lustre/lustre_param.h>
51 #include <md_object.h>
52 #include <lustre_linkea.h>
53 #include <lustre_log.h>
54
55 #include "lod_internal.h"
56
57 struct thandle *lod_sub_get_thandle(const struct lu_env *env,
58                                     struct thandle *th,
59                                     const struct dt_object *sub_obj,
60                                     bool *record_update)
61 {
62         struct lod_device       *lod = dt2lod_dev(th->th_dev);
63         struct top_thandle      *tth;
64         struct thandle          *sub_th;
65         int                     type = LU_SEQ_RANGE_ANY;
66         __u32                   mdt_index;
67         int                     rc;
68         ENTRY;
69
70         if (record_update != NULL)
71                 *record_update = false;
72
73         if (th->th_top == NULL)
74                 RETURN(th);
75
76         tth = container_of(th, struct top_thandle, tt_super);
77         tth->tt_master_sub_thandle->th_ignore_quota = th->th_ignore_quota;
78
79         /* local object must be mdt object, Note: during ost object
80          * creation, FID is not assigned until osp_create(),
81          * so if the FID of sub_obj is zero, it means OST object. */
82         if (!dt_object_remote(sub_obj) ||
83             fid_is_zero(lu_object_fid(&sub_obj->do_lu))) {
84                 /* local MDT object */
85                 if (fid_is_sane(lu_object_fid(&sub_obj->do_lu)) &&
86                     tth->tt_multiple_thandle != NULL &&
87                     record_update != NULL &&
88                     th->th_result == 0)
89                         *record_update = true;
90
91                 RETURN(tth->tt_master_sub_thandle);
92         }
93
94         rc = lod_fld_lookup(env, lod, lu_object_fid(&sub_obj->do_lu),
95                             &mdt_index, &type);
96         if (rc < 0)
97                 RETURN(ERR_PTR(rc));
98
99         /* th_complex means we need track all of updates for this
100          * transaction, include changes on OST */
101         if (type == LU_SEQ_RANGE_OST && !th->th_complex)
102                 RETURN(tth->tt_master_sub_thandle);
103
104         sub_th = thandle_get_sub(env, th, sub_obj);
105         if (IS_ERR(sub_th))
106                 RETURN(sub_th);
107         sub_th->th_ignore_quota = th->th_ignore_quota;
108
109         if (tth->tt_multiple_thandle != NULL && record_update != NULL &&
110             th->th_result == 0)
111                 *record_update = true;
112
113         RETURN(sub_th);
114 }
115
116 /**
117  * Declare sub-object creation.
118  *
119  * Get transaction of next layer and declare the creation of the object.
120  *
121  * \param[in] env       execution environment
122  * \param[in] dt        the object being created
123  * \param[in] attr      the attributes of the object being created
124  * \param[in] hint      the hint of the creation
125  * \param[in] dof       the object format of the creation
126  * \param[th] th        the transaction handle
127  *
128  * \retval              0 if the declaration succeeds
129  * \retval              negative errno if the declaration fails.
130  */
131 int lod_sub_declare_create(const struct lu_env *env, struct dt_object *dt,
132                            struct lu_attr *attr,
133                            struct dt_allocation_hint *hint,
134                            struct dt_object_format *dof, struct thandle *th)
135 {
136         struct thandle *sub_th;
137         bool record_update;
138
139         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
140         if (IS_ERR(sub_th))
141                 return PTR_ERR(sub_th);
142
143         if (record_update)
144                 update_record_size(env, create, th, lu_object_fid(&dt->do_lu),
145                                    attr, hint, dof);
146
147         return dt_declare_create(env, dt, attr, hint, dof, sub_th);
148 }
149
150 /**
151  * Create sub-object.
152  *
153  * Get transaction of next layer, record updates if it belongs to cross-MDT
154  * operation, and create the object.
155  *
156  * \param[in] env       execution environment
157  * \param[in] dt        the object being created
158  * \param[in] attr      the attributes of the object being created
159  * \param[in] hint      the hint of the creation
160  * \param[in] dof       the object format of the creation
161  * \param[th] th        the transaction handle
162  *
163  * \retval              0 if the creation succeeds
164  * \retval              negative errno if the creation fails.
165  */
166 int lod_sub_create(const struct lu_env *env, struct dt_object *dt,
167                    struct lu_attr *attr, struct dt_allocation_hint *hint,
168                    struct dt_object_format *dof, struct thandle *th)
169 {
170         struct thandle     *sub_th;
171         bool               record_update;
172         int                 rc;
173         ENTRY;
174
175         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
176         if (IS_ERR(sub_th))
177                 RETURN(PTR_ERR(sub_th));
178
179         if (record_update) {
180                 rc = update_record_pack(create, th,
181                                         lu_object_fid(&dt->do_lu),
182                                         attr, hint, dof);
183                 if (rc < 0)
184                         RETURN(rc);
185         }
186
187         rc = dt_create(env, dt, attr, hint, dof, sub_th);
188
189         RETURN(rc);
190 }
191
192 /**
193  * Declare adding reference for the sub-object
194  *
195  * Get transaction of next layer and declare the reference adding.
196  *
197  * \param[in] env       execution environment
198  * \param[in] dt        dt object to add reference
199  * \param[in] th        transaction handle
200  *
201  * \retval              0 if the declaration succeeds.
202  * \retval              negative errno if the declaration fails.
203  */
204 int lod_sub_declare_ref_add(const struct lu_env *env, struct dt_object *dt,
205                             struct thandle *th)
206 {
207         struct thandle  *sub_th;
208         bool            record_update;
209         int             rc;
210         ENTRY;
211
212         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
213         if (IS_ERR(sub_th))
214                 RETURN(PTR_ERR(sub_th));
215
216         if (record_update)
217                 update_record_size(env, ref_add, th, lu_object_fid(&dt->do_lu));
218
219         rc = dt_declare_ref_add(env, dt, sub_th);
220
221         RETURN(rc);
222 }
223
224 /**
225  * Add reference for the sub-object
226  *
227  * Get transaction of next layer, record updates if it belongs to cross-MDT
228  * operation and add reference of the object.
229  *
230  * \param[in] env       execution environment
231  * \param[in] dt        dt object to add reference
232  * \param[in] th        transaction handle
233  *
234  * \retval              0 if it succeeds.
235  * \retval              negative errno if it fails.
236  */
237 int lod_sub_ref_add(const struct lu_env *env, struct dt_object *dt,
238                     struct thandle *th)
239 {
240         struct thandle  *sub_th;
241         bool            record_update;
242         int             rc;
243         ENTRY;
244
245         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
246         if (IS_ERR(sub_th))
247                 RETURN(PTR_ERR(sub_th));
248
249         if (record_update) {
250                 rc = update_record_pack(ref_add, th,
251                                         lu_object_fid(&dt->do_lu));
252                 if (rc < 0)
253                         RETURN(rc);
254         }
255
256         rc = dt_ref_add(env, dt, sub_th);
257
258         RETURN(rc);
259 }
260
261 /**
262  * Declare deleting reference for the sub-object
263  *
264  * Get transaction of next layer and declare the reference deleting.
265  *
266  * \param[in] env       execution environment
267  * \param[in] dt        dt object to delete reference
268  * \param[in] th        transaction handle
269  *
270  * \retval              0 if the declaration succeeds.
271  * \retval              negative errno if the declaration fails.
272  */
273 int lod_sub_declare_ref_del(const struct lu_env *env, struct dt_object *dt,
274                             struct thandle *th)
275 {
276         struct thandle  *sub_th;
277         bool            record_update;
278         int             rc;
279         ENTRY;
280
281         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
282         if (IS_ERR(sub_th))
283                 RETURN(PTR_ERR(sub_th));
284
285         if (record_update)
286                 update_record_size(env, ref_del, th, lu_object_fid(&dt->do_lu));
287
288         rc = dt_declare_ref_del(env, dt, sub_th);
289
290         RETURN(rc);
291 }
292
293 /**
294  * Delete reference for the sub-object
295  *
296  * Get transaction of next layer, record updates if it belongs to cross-MDT
297  * operation and delete reference of the object.
298  *
299  * \param[in] env       execution environment
300  * \param[in] dt        dt object to delete reference
301  * \param[in] th        transaction handle
302  *
303  * \retval              0 if it succeeds.
304  * \retval              negative errno if it fails.
305  */
306 int lod_sub_ref_del(const struct lu_env *env, struct dt_object *dt,
307                     struct thandle *th)
308 {
309         struct thandle  *sub_th;
310         bool            record_update;
311         int             rc;
312         ENTRY;
313
314         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
315         if (IS_ERR(sub_th))
316                 RETURN(PTR_ERR(sub_th));
317
318         if (record_update) {
319                 rc = update_record_pack(ref_del, th,
320                                         lu_object_fid(&dt->do_lu));
321                 if (rc < 0)
322                         RETURN(rc);
323         }
324
325         rc = dt_ref_del(env, dt, sub_th);
326
327         RETURN(rc);
328 }
329
330 /**
331  * Declare destroying sub-object
332  *
333  * Get transaction of next layer and declare the sub-object destroy.
334  *
335  * \param[in] env       execution environment
336  * \param[in] dt        dt object to be destroyed
337  * \param[in] th        transaction handle
338  *
339  * \retval              0 if the declaration succeeds.
340  * \retval              negative errno if the declaration fails.
341  */
342 int lod_sub_declare_destroy(const struct lu_env *env, struct dt_object *dt,
343                             struct thandle *th)
344 {
345         struct thandle  *sub_th;
346         bool            record_update;
347         int             rc;
348         ENTRY;
349
350         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
351         if (IS_ERR(sub_th))
352                 RETURN(PTR_ERR(sub_th));
353
354         if (record_update)
355                 update_record_size(env, destroy, th, lu_object_fid(&dt->do_lu));
356
357         rc = dt_declare_destroy(env, dt, sub_th);
358
359         RETURN(rc);
360 }
361
362 /**
363  * Destroy sub-object
364  *
365  * Get transaction of next layer, record updates if it belongs to cross-MDT
366  * operation and destroy the object.
367  *
368  * \param[in] env       execution environment
369  * \param[in] dt        dt object to be destroyed
370  * \param[in] th        transaction handle
371  *
372  * \retval              0 if the destroy succeeds.
373  * \retval              negative errno if the destroy fails.
374  */
375 int lod_sub_destroy(const struct lu_env *env, struct dt_object *dt,
376                     struct thandle *th)
377 {
378         struct thandle  *sub_th;
379         bool            record_update;
380         int             rc;
381         ENTRY;
382
383         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
384         if (IS_ERR(sub_th))
385                 RETURN(PTR_ERR(sub_th));
386
387         if (record_update) {
388                 rc = update_record_pack(destroy, th, lu_object_fid(&dt->do_lu));
389                 if (rc < 0)
390                         RETURN(rc);
391         }
392
393         rc = dt_destroy(env, dt, sub_th);
394
395         RETURN(rc);
396 }
397
398 /**
399  * Declare sub-object index insert
400  *
401  * Get transaction of next layer and declare index insert.
402  *
403  * \param[in] env       execution environment
404  * \param[in] dt        object for which to insert index
405  * \param[in] rec       record of the index which will be inserted
406  * \param[in] key       key of the index which will be inserted
407  * \param[in] th        the transaction handle
408  *
409  * \retval              0 if the declaration succeeds.
410  * \retval              negative errno if the declaration fails.
411  */
412 int lod_sub_declare_insert(const struct lu_env *env, struct dt_object *dt,
413                            const struct dt_rec *rec,
414                            const struct dt_key *key, struct thandle *th)
415 {
416         struct thandle *sub_th;
417         bool            record_update;
418
419         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
420         if (IS_ERR(sub_th))
421                 return PTR_ERR(sub_th);
422
423         if (record_update)
424                 update_record_size(env, index_insert, th,
425                                    lu_object_fid(&dt->do_lu), rec, key);
426
427         return dt_declare_insert(env, dt, rec, key, sub_th);
428 }
429
430 /**
431  * Insert index of sub object
432  *
433  * Get transaction of next layer, record updates if it belongs to cross-MDT
434  * operation, and insert the index.
435  *
436  * \param[in] env       execution environment
437  * \param[in] dt        object for which to insert index
438  * \param[in] rec       record of the index to be inserted
439  * \param[in] key       key of the index to be inserted
440  * \param[in] th        the transaction handle
441  * \param[in] ign       whether ignore quota
442  *
443  * \retval              0 if the insertion succeeds.
444  * \retval              negative errno if the insertion fails.
445  */
446 int lod_sub_insert(const struct lu_env *env, struct dt_object *dt,
447                    const struct dt_rec *rec, const struct dt_key *key,
448                    struct thandle *th, int ign)
449 {
450         struct thandle *sub_th;
451         int             rc;
452         bool            record_update;
453
454         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
455         if (IS_ERR(sub_th))
456                 return PTR_ERR(sub_th);
457
458         if (record_update) {
459                 rc = update_record_pack(index_insert, th,
460                                         lu_object_fid(&dt->do_lu), rec, key);
461                 if (rc < 0)
462                         return rc;
463         }
464
465         return dt_insert(env, dt, rec, key, sub_th, ign);
466 }
467
468 /**
469  * Declare sub-object index delete
470  *
471  * Get transaction of next layer and declare index deletion.
472  *
473  * \param[in] env       execution environment
474  * \param[in] dt        object for which to delete index
475  * \param[in] key       key of the index which will be deleted
476  * \param[in] th        the transaction handle
477  *
478  * \retval              0 if the declaration succeeds.
479  * \retval              negative errno if the declaration fails.
480  */
481 int lod_sub_declare_delete(const struct lu_env *env, struct dt_object *dt,
482                            const struct dt_key *key, struct thandle *th)
483 {
484         struct thandle *sub_th;
485         bool            record_update;
486
487         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
488         if (IS_ERR(sub_th))
489                 return PTR_ERR(sub_th);
490
491         if (record_update)
492                 update_record_size(env, index_delete, th,
493                                    lu_object_fid(&dt->do_lu), key);
494
495         return dt_declare_delete(env, dt, key, sub_th);
496 }
497
498 /**
499  * Delete index of sub object
500  *
501  * Get transaction of next layer, record updates if it belongs to cross-MDT
502  * operation, and delete the index.
503  *
504  * \param[in] env       execution environment
505  * \param[in] dt        object for which to delete index
506  * \param[in] key       key of the index to be deleted
507  * \param[in] th        the transaction handle
508  *
509  * \retval              0 if the deletion succeeds.
510  * \retval              negative errno if the deletion fails.
511  */
512 int lod_sub_delete(const struct lu_env *env, struct dt_object *dt,
513                    const struct dt_key *name, struct thandle *th)
514 {
515         struct thandle  *sub_th;
516         bool            record_update;
517         int             rc;
518         ENTRY;
519
520         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
521         if (IS_ERR(sub_th))
522                 RETURN(PTR_ERR(sub_th));
523
524         if (record_update) {
525                 rc = update_record_pack(index_delete, th,
526                                         lu_object_fid(&dt->do_lu), name);
527                 if (rc < 0)
528                         RETURN(rc);
529         }
530
531         rc = dt_delete(env, dt, name, sub_th);
532         RETURN(rc);
533 }
534
535 /**
536  * Declare xattr_set
537  *
538  * Get transaction of next layer, and declare xattr set.
539  *
540  * \param[in] env       execution environment
541  * \param[in] dt        object on which to set xattr
542  * \param[in] buf       xattr to be set
543  * \param[in] name      name of the xattr
544  * \param[in] fl        flag for setting xattr
545  *
546  * \retval              0 if the declaration succeeds.
547  * \retval              negative errno if the declaration fails.
548  */
549 int lod_sub_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
550                               const struct lu_buf *buf, const char *name,
551                               int fl, struct thandle *th)
552 {
553         struct thandle  *sub_th;
554         bool            record_update;
555         int             rc;
556         ENTRY;
557
558         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
559         if (IS_ERR(sub_th))
560                 RETURN(PTR_ERR(sub_th));
561
562         if (record_update)
563                 update_record_size(env, xattr_set, th,
564                                    lu_object_fid(&dt->do_lu),
565                                    buf, name, fl);
566
567         rc = dt_declare_xattr_set(env, dt, buf, name, fl, sub_th);
568
569         RETURN(rc);
570 }
571
572 /**
573  * Set xattr
574  *
575  * Get transaction of next layer, record updates if it belongs to cross-MDT
576  * operation, and set xattr to the object.
577  *
578  * \param[in] env       execution environment
579  * \param[in] dt        object on which to set xattr
580  * \param[in] buf       xattr to be set
581  * \param[in] name      name of the xattr
582  * \param[in] fl        flag for setting xattr
583  * \param[in] th        transaction handle
584  *
585  * \retval              0 if the xattr setting succeeds.
586  * \retval              negative errno if xattr setting fails.
587  */
588 int lod_sub_xattr_set(const struct lu_env *env, struct dt_object *dt,
589                       const struct lu_buf *buf, const char *name, int fl,
590                       struct thandle *th)
591 {
592         struct thandle  *sub_th;
593         bool            record_update;
594         int             rc;
595         ENTRY;
596
597         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
598         if (IS_ERR(sub_th))
599                 RETURN(PTR_ERR(sub_th));
600
601         if (record_update) {
602                 rc = update_record_pack(xattr_set, th,
603                                         lu_object_fid(&dt->do_lu),
604                                         buf, name, fl);
605                 if (rc < 0)
606                         RETURN(rc);
607         }
608
609         rc = dt_xattr_set(env, dt, buf, name, fl, sub_th);
610
611         RETURN(rc);
612 }
613
614 /**
615  * Declare attr_set
616  *
617  * Get transaction of next layer, and declare attr set.
618  *
619  * \param[in] env       execution environment
620  * \param[in] dt        object on which to set attr
621  * \param[in] attr      attributes to be set
622  * \param[in] th        transaction handle
623  *
624  * \retval              0 if the declaration succeeds.
625  * \retval              negative errno if the declaration fails.
626  */
627 int lod_sub_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
628                              const struct lu_attr *attr, struct thandle *th)
629 {
630         struct thandle  *sub_th;
631         bool            record_update;
632         int             rc;
633         ENTRY;
634
635         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
636         if (IS_ERR(sub_th))
637                 RETURN(PTR_ERR(sub_th));
638
639         if (record_update)
640                 update_record_size(env, attr_set, th,
641                                    lu_object_fid(&dt->do_lu), attr);
642
643         rc = dt_declare_attr_set(env, dt, attr, sub_th);
644
645         RETURN(rc);
646 }
647
648 /**
649  * attributes set
650  *
651  * Get transaction of next layer, record updates if it belongs to cross-MDT
652  * operation, and set attributes to the object.
653  *
654  * \param[in] env       execution environment
655  * \param[in] dt        object on which to set attr
656  * \param[in] attr      attrbutes to be set
657  * \param[in] th        transaction handle
658  *
659  * \retval              0 if attributes setting succeeds.
660  * \retval              negative errno if the attributes setting fails.
661  */
662 int lod_sub_attr_set(const struct lu_env *env, struct dt_object *dt,
663                      const struct lu_attr *attr, struct thandle *th)
664 {
665         bool               record_update;
666         struct thandle     *sub_th;
667         int                 rc;
668         ENTRY;
669
670         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
671         if (IS_ERR(sub_th))
672                 RETURN(PTR_ERR(sub_th));
673
674         if (record_update) {
675                 rc = update_record_pack(attr_set, th, lu_object_fid(&dt->do_lu),
676                                         attr);
677                 if (rc < 0)
678                         RETURN(rc);
679         }
680
681         rc = dt_attr_set(env, dt, attr, sub_th);
682
683         RETURN(rc);
684 }
685
686 /**
687  * Declare xattr_del
688  *
689  * Get transaction of next layer, and declare xattr deletion.
690  *
691  * \param[in] env       execution environment
692  * \param[in] dt        object on which to delete xattr
693  * \param[in] name      name of the xattr to be deleted
694  * \param[in] th        transaction handle
695  *
696  * \retval              0 if the declaration succeeds.
697  * \retval              negative errno if the declaration fails.
698  */
699 int lod_sub_declare_xattr_del(const struct lu_env *env, struct dt_object *dt,
700                               const char *name, struct thandle *th)
701 {
702         struct thandle  *sub_th;
703         bool            record_update;
704         int             rc;
705         ENTRY;
706
707         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
708         if (IS_ERR(sub_th))
709                 RETURN(PTR_ERR(sub_th));
710
711         if (record_update)
712                 update_record_size(env, xattr_del, th,
713                                    lu_object_fid(&dt->do_lu),
714                                    name);
715
716         rc = dt_declare_xattr_del(env, dt, name, sub_th);
717
718         RETURN(rc);
719 }
720
721 /**
722  * xattribute deletion
723  *
724  * Get transaction of next layer, record update if it belongs to cross-MDT
725  * operation and delete xattr.
726  *
727  * \param[in] env       execution environment
728  * \param[in] dt        object on which to delete xattr
729  * \param[in] name      name of the xattr to be deleted
730  * \param[in] th        transaction handle
731  *
732  * \retval              0 if the deletion succeeds.
733  * \retval              negative errno if the deletion fails.
734  */
735 int lod_sub_xattr_del(const struct lu_env *env, struct dt_object *dt,
736                       const char *name, struct thandle *th)
737 {
738         struct thandle  *sub_th;
739         bool            record_update;
740         int             rc;
741         ENTRY;
742
743         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
744         if (IS_ERR(sub_th))
745                 RETURN(PTR_ERR(sub_th));
746
747         if (record_update) {
748                 rc = update_record_pack(xattr_del, th,
749                                         lu_object_fid(&dt->do_lu), name);
750                 if (rc < 0)
751                         RETURN(rc);
752         }
753
754         rc = dt_xattr_del(env, dt, name, sub_th);
755
756         RETURN(rc);
757 }
758
759 /**
760  * Declare buffer write
761  *
762  * Get transaction of next layer and declare buffer write.
763  *
764  * \param[in] env       execution environment
765  * \param[in] dt        object to be written
766  * \param[in] buf       buffer to write which includes an embedded size field
767  * \param[in] pos       offet in the object to start writing at
768  * \param[in] th        transaction handle
769  *
770  * \retval              0 if the insertion succeeds.
771  * \retval              negative errno if the insertion fails.
772  */
773 int lod_sub_declare_write(const struct lu_env *env, struct dt_object *dt,
774                           const struct lu_buf *buf, loff_t pos,
775                           struct thandle *th)
776 {
777         struct thandle  *sub_th;
778         bool            record_update;
779         int             rc;
780         ENTRY;
781
782         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
783         if (IS_ERR(sub_th))
784                 RETURN(PTR_ERR(sub_th));
785
786         if (record_update)
787                 update_record_size(env, write, th,
788                                    lu_object_fid(&dt->do_lu),
789                                    buf, pos);
790
791         rc = dt_declare_write(env, dt, buf, pos, sub_th);
792
793         RETURN(rc);
794 }
795
796 /**
797  * Write buffer to sub object
798  *
799  * Get transaction of next layer, records buffer write if it belongs to
800  * Cross-MDT operation, and write buffer.
801  *
802  * \param[in] env       execution environment
803  * \param[in] dt        object to be written
804  * \param[in] buf       buffer to write which includes an embedded size field
805  * \param[in] pos       offet in the object to start writing at
806  * \param[in] th        transaction handle
807  * \param[in] rq        enforcement for this write
808  *
809  * \retval              the buffer size in bytes if it succeeds.
810  * \retval              negative errno if it fails.
811  */
812 ssize_t lod_sub_write(const struct lu_env *env, struct dt_object *dt,
813                       const struct lu_buf *buf, loff_t *pos,
814                       struct thandle *th, int rq)
815 {
816         struct thandle  *sub_th;
817         bool            record_update;
818         ssize_t         rc;
819         ENTRY;
820
821         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
822         if (IS_ERR(sub_th))
823                 RETURN(PTR_ERR(sub_th));
824
825         if (record_update) {
826                 rc = update_record_pack(write, th, lu_object_fid(&dt->do_lu),
827                                         buf, *pos);
828                 if (rc < 0)
829                         RETURN(rc);
830         }
831
832         rc = dt_write(env, dt, buf, pos, sub_th, rq);
833         RETURN(rc);
834 }
835
836 /**
837  * Declare punch
838  *
839  * Get transaction of next layer and declare punch.
840  *
841  * \param[in] env       execution environment
842  * \param[in] dt        object to be written
843  * \param[in] start     start offset of punch
844  * \param[in] end       end offet of punch
845  * \param[in] th        transaction handle
846  *
847  * \retval              0 if the insertion succeeds.
848  * \retval              negative errno if the insertion fails.
849  */
850 int lod_sub_declare_punch(const struct lu_env *env, struct dt_object *dt,
851                           __u64 start, __u64 end, struct thandle *th)
852 {
853         struct thandle  *sub_th;
854         bool            record_update;
855         int             rc;
856         ENTRY;
857
858         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
859         if (IS_ERR(sub_th))
860                 RETURN(PTR_ERR(sub_th));
861
862         if (record_update)
863                 update_record_size(env, punch, th,
864                                    lu_object_fid(&dt->do_lu),
865                                    start, end);
866
867         rc = dt_declare_punch(env, dt, start, end, sub_th);
868
869         RETURN(rc);
870 }
871
872 /**
873  * Punch to sub object
874  *
875  * Get transaction of next layer, records buffer write if it belongs to
876  * Cross-MDT operation, and punch object.
877  *
878  * \param[in] env       execution environment
879  * \param[in] dt        object to be written
880  * \param[in] start     start offset of punch
881  * \param[in] end       end offset of punch
882  * \param[in] th        transaction handle
883  * \param[in] capa      capability of the write
884  *
885  * \retval              the buffer size in bytes if it succeeds.
886  * \retval              negative errno if it fails.
887  */
888 int lod_sub_punch(const struct lu_env *env, struct dt_object *dt,
889                   __u64 start, __u64 end, struct thandle *th)
890 {
891         struct thandle  *sub_th;
892         bool            record_update;
893         int             rc;
894         ENTRY;
895
896         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
897         if (IS_ERR(sub_th))
898                 RETURN(PTR_ERR(sub_th));
899
900         if (record_update) {
901                 rc = update_record_pack(punch, th, lu_object_fid(&dt->do_lu),
902                                         start, end);
903                 if (rc < 0)
904                         RETURN(rc);
905         }
906
907         rc = dt_punch(env, dt, start, end, sub_th);
908
909         RETURN(rc);
910 }
911
912 int lod_sub_prep_llog(const struct lu_env *env, struct lod_device *lod,
913                       struct dt_device *dt, int index)
914 {
915         struct lod_thread_info  *lti = lod_env_info(env);
916         struct llog_ctxt        *ctxt;
917         struct llog_handle      *lgh;
918         struct llog_catid       *cid = &lti->lti_cid;
919         struct lu_fid           *fid = &lti->lti_fid;
920         struct obd_device       *obd;
921         int                     rc;
922         bool                    need_put = false;
923         ENTRY;
924
925         lu_update_log_fid(fid, index);
926
927         rc = lodname2mdt_index(lod2obd(lod)->obd_name, (__u32 *)&index);
928         if (rc < 0)
929                 RETURN(rc);
930
931         rc = llog_osd_get_cat_list(env, dt, index, 1, cid, fid);
932         if (rc != 0) {
933                 CERROR("%s: can't get id from catalogs: rc = %d\n",
934                        lod2obd(lod)->obd_name, rc);
935                 RETURN(rc);
936         }
937
938         obd = dt->dd_lu_dev.ld_obd;
939         ctxt = llog_get_context(obd, LLOG_UPDATELOG_ORIG_CTXT);
940         LASSERT(ctxt != NULL);
941         ctxt->loc_flags |= LLOG_CTXT_FLAG_NORMAL_FID;
942         ctxt->loc_chunk_size = LLOG_MIN_CHUNK_SIZE * 4;
943         if (likely(logid_id(&cid->lci_logid) != 0)) {
944                 rc = llog_open(env, ctxt, &lgh, &cid->lci_logid, NULL,
945                                LLOG_OPEN_EXISTS);
946
947                 /* re-create llog if it is missing */
948                 if (rc == -ENOENT)
949                         logid_set_id(&cid->lci_logid, 0);
950                 else if (rc < 0)
951                         GOTO(out_put, rc);
952         }
953
954         if (unlikely(logid_id(&cid->lci_logid) == 0)) {
955                 rc = llog_open_create(env, ctxt, &lgh, NULL, NULL);
956                 if (rc < 0)
957                         GOTO(out_put, rc);
958                 cid->lci_logid = lgh->lgh_id;
959                 need_put = true;
960         }
961
962         LASSERT(lgh != NULL);
963
964         rc = llog_init_handle(env, lgh, LLOG_F_IS_CAT, NULL);
965         if (rc != 0)
966                 GOTO(out_close, rc);
967
968         if (need_put) {
969                 rc = llog_osd_put_cat_list(env, dt, index, 1, cid, fid);
970                 if (rc != 0)
971                         GOTO(out_close, rc);
972         }
973
974         ctxt->loc_handle = lgh;
975
976         CDEBUG(D_INFO, "%s: init llog for index %d - catid "DFID":%x\n",
977                obd->obd_name, index, PFID(&cid->lci_logid.lgl_oi.oi_fid),
978                cid->lci_logid.lgl_ogen);
979 out_close:
980         if (rc != 0)
981                 llog_cat_close(env, lgh);
982 out_put:
983         llog_ctxt_put(ctxt);
984         RETURN(rc);
985 }