Whamcloud - gitweb
e2eb8d7fcd7fae626353af30e66334d2518ed207
[fs/lustre-release.git] / lustre / lod / lod_sub_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2015, 2017, Intel Corporation.
24  */
25 /*
26  * lustre/lod/lod_sub_object.c
27  *
28  * LOD sub object methods
29  *
30  * This file implements sub-object methods for LOD.
31  *
32  * LOD is Logic volume layer in the MDS stack, which will handle striping
33  * and distribute the update to different OSP/OSD. After directing the updates
34  * to one specific OSD/OSP, it also needs to do some thing before calling
35  * OSD/OSP API, for example recording updates for cross-MDT operation, get
36  * the next level transaction etc.
37  *
38  * Author: Di Wang <di.wang@intel.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_MDS
42
43 #include <obd.h>
44 #include <obd_class.h>
45 #include <uapi/linux/lustre/lustre_ver.h>
46 #include <obd_support.h>
47 #include <lprocfs_status.h>
48
49 #include <lustre_fid.h>
50 #include <uapi/linux/lustre/lustre_param.h>
51 #include <md_object.h>
52 #include <lustre_linkea.h>
53 #include <lustre_log.h>
54
55 #include "lod_internal.h"
56
57 struct thandle *lod_sub_get_thandle(const struct lu_env *env,
58                                     struct thandle *th,
59                                     const struct dt_object *sub_obj,
60                                     bool *record_update)
61 {
62         struct lod_device       *lod = dt2lod_dev(th->th_dev);
63         struct top_thandle      *tth;
64         struct thandle          *sub_th;
65         int                     type = LU_SEQ_RANGE_ANY;
66         __u32                   mdt_index;
67         int                     rc;
68         ENTRY;
69
70         if (record_update != NULL)
71                 *record_update = false;
72
73         if (th->th_top == NULL)
74                 RETURN(th);
75
76         tth = container_of(th, struct top_thandle, tt_super);
77         tth->tt_master_sub_thandle->th_ignore_quota = th->th_ignore_quota;
78
79         /* local object must be mdt object, Note: during ost object
80          * creation, FID is not assigned until osp_create(),
81          * so if the FID of sub_obj is zero, it means OST object. */
82         if (!dt_object_remote(sub_obj) ||
83             fid_is_zero(lu_object_fid(&sub_obj->do_lu))) {
84                 /* local MDT object */
85                 if (fid_is_sane(lu_object_fid(&sub_obj->do_lu)) &&
86                     tth->tt_multiple_thandle != NULL &&
87                     record_update != NULL &&
88                     th->th_result == 0)
89                         *record_update = true;
90
91                 RETURN(tth->tt_master_sub_thandle);
92         }
93
94         rc = lod_fld_lookup(env, lod, lu_object_fid(&sub_obj->do_lu),
95                             &mdt_index, &type);
96         if (rc < 0)
97                 RETURN(ERR_PTR(rc));
98
99         /* th_complex means we need track all of updates for this
100          * transaction, include changes on OST */
101         if (type == LU_SEQ_RANGE_OST && !th->th_complex)
102                 RETURN(tth->tt_master_sub_thandle);
103
104         sub_th = thandle_get_sub(env, th, sub_obj);
105         if (IS_ERR(sub_th))
106                 RETURN(sub_th);
107         sub_th->th_ignore_quota = th->th_ignore_quota;
108
109         if (tth->tt_multiple_thandle != NULL && record_update != NULL &&
110             th->th_result == 0)
111                 *record_update = true;
112
113         RETURN(sub_th);
114 }
115
116 /**
117  * Declare sub-object creation.
118  *
119  * Get transaction of next layer and declare the creation of the object.
120  *
121  * \param[in] env       execution environment
122  * \param[in] dt        the object being created
123  * \param[in] attr      the attributes of the object being created
124  * \param[in] hint      the hint of the creation
125  * \param[in] dof       the object format of the creation
126  * \param[th] th        the transaction handle
127  *
128  * \retval              0 if the declaration succeeds
129  * \retval              negative errno if the declaration fails.
130  */
131 int lod_sub_declare_create(const struct lu_env *env, struct dt_object *dt,
132                            struct lu_attr *attr,
133                            struct dt_allocation_hint *hint,
134                            struct dt_object_format *dof, struct thandle *th)
135 {
136         struct thandle *sub_th;
137         bool record_update;
138
139         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
140         if (IS_ERR(sub_th))
141                 return PTR_ERR(sub_th);
142
143         if (record_update)
144                 update_record_size(env, create, th, lu_object_fid(&dt->do_lu),
145                                    attr, hint, dof);
146
147         return dt_declare_create(env, dt, attr, hint, dof, sub_th);
148 }
149
150 /**
151  * Create sub-object.
152  *
153  * Get transaction of next layer, record updates if it belongs to cross-MDT
154  * operation, and create the object.
155  *
156  * \param[in] env       execution environment
157  * \param[in] dt        the object being created
158  * \param[in] attr      the attributes of the object being created
159  * \param[in] hint      the hint of the creation
160  * \param[in] dof       the object format of the creation
161  * \param[th] th        the transaction handle
162  *
163  * \retval              0 if the creation succeeds
164  * \retval              negative errno if the creation fails.
165  */
166 int lod_sub_create(const struct lu_env *env, struct dt_object *dt,
167                    struct lu_attr *attr, struct dt_allocation_hint *hint,
168                    struct dt_object_format *dof, struct thandle *th)
169 {
170         struct thandle     *sub_th;
171         bool               record_update;
172         int                 rc;
173         ENTRY;
174
175         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
176         if (IS_ERR(sub_th))
177                 RETURN(PTR_ERR(sub_th));
178
179         if (record_update) {
180                 rc = update_record_pack(create, th,
181                                         lu_object_fid(&dt->do_lu),
182                                         attr, hint, dof);
183                 if (rc < 0)
184                         RETURN(rc);
185         }
186
187         rc = dt_create(env, dt, attr, hint, dof, sub_th);
188
189         RETURN(rc);
190 }
191
192 /**
193  * Declare adding reference for the sub-object
194  *
195  * Get transaction of next layer and declare the reference adding.
196  *
197  * \param[in] env       execution environment
198  * \param[in] dt        dt object to add reference
199  * \param[in] th        transaction handle
200  *
201  * \retval              0 if the declaration succeeds.
202  * \retval              negative errno if the declaration fails.
203  */
204 int lod_sub_declare_ref_add(const struct lu_env *env, struct dt_object *dt,
205                             struct thandle *th)
206 {
207         struct thandle  *sub_th;
208         bool            record_update;
209         int             rc;
210         ENTRY;
211
212         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
213         if (IS_ERR(sub_th))
214                 RETURN(PTR_ERR(sub_th));
215
216         if (record_update)
217                 update_record_size(env, ref_add, th, lu_object_fid(&dt->do_lu));
218
219         rc = dt_declare_ref_add(env, dt, sub_th);
220
221         RETURN(rc);
222 }
223
224 /**
225  * Add reference for the sub-object
226  *
227  * Get transaction of next layer, record updates if it belongs to cross-MDT
228  * operation and add reference of the object.
229  *
230  * \param[in] env       execution environment
231  * \param[in] dt        dt object to add reference
232  * \param[in] th        transaction handle
233  *
234  * \retval              0 if it succeeds.
235  * \retval              negative errno if it fails.
236  */
237 int lod_sub_ref_add(const struct lu_env *env, struct dt_object *dt,
238                     struct thandle *th)
239 {
240         struct thandle  *sub_th;
241         bool            record_update;
242         int             rc;
243         ENTRY;
244
245         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
246         if (IS_ERR(sub_th))
247                 RETURN(PTR_ERR(sub_th));
248
249         if (record_update) {
250                 rc = update_record_pack(ref_add, th,
251                                         lu_object_fid(&dt->do_lu));
252                 if (rc < 0)
253                         RETURN(rc);
254         }
255
256         rc = dt_ref_add(env, dt, sub_th);
257
258         RETURN(rc);
259 }
260
261 /**
262  * Declare deleting reference for the sub-object
263  *
264  * Get transaction of next layer and declare the reference deleting.
265  *
266  * \param[in] env       execution environment
267  * \param[in] dt        dt object to delete reference
268  * \param[in] th        transaction handle
269  *
270  * \retval              0 if the declaration succeeds.
271  * \retval              negative errno if the declaration fails.
272  */
273 int lod_sub_declare_ref_del(const struct lu_env *env, struct dt_object *dt,
274                             struct thandle *th)
275 {
276         struct thandle  *sub_th;
277         bool            record_update;
278         int             rc;
279         ENTRY;
280
281         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
282         if (IS_ERR(sub_th))
283                 RETURN(PTR_ERR(sub_th));
284
285         if (record_update)
286                 update_record_size(env, ref_del, th, lu_object_fid(&dt->do_lu));
287
288         rc = dt_declare_ref_del(env, dt, sub_th);
289
290         RETURN(rc);
291 }
292
293 /**
294  * Delete reference for the sub-object
295  *
296  * Get transaction of next layer, record updates if it belongs to cross-MDT
297  * operation and delete reference of the object.
298  *
299  * \param[in] env       execution environment
300  * \param[in] dt        dt object to delete reference
301  * \param[in] th        transaction handle
302  *
303  * \retval              0 if it succeeds.
304  * \retval              negative errno if it fails.
305  */
306 int lod_sub_ref_del(const struct lu_env *env, struct dt_object *dt,
307                     struct thandle *th)
308 {
309         struct thandle  *sub_th;
310         bool            record_update;
311         int             rc;
312         ENTRY;
313
314         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
315         if (IS_ERR(sub_th))
316                 RETURN(PTR_ERR(sub_th));
317
318         if (record_update) {
319                 rc = update_record_pack(ref_del, th,
320                                         lu_object_fid(&dt->do_lu));
321                 if (rc < 0)
322                         RETURN(rc);
323         }
324
325         rc = dt_ref_del(env, dt, sub_th);
326
327         RETURN(rc);
328 }
329
330 /**
331  * Declare destroying sub-object
332  *
333  * Get transaction of next layer and declare the sub-object destroy.
334  *
335  * \param[in] env       execution environment
336  * \param[in] dt        dt object to be destroyed
337  * \param[in] th        transaction handle
338  *
339  * \retval              0 if the declaration succeeds.
340  * \retval              negative errno if the declaration fails.
341  */
342 int lod_sub_declare_destroy(const struct lu_env *env, struct dt_object *dt,
343                             struct thandle *th)
344 {
345         struct thandle  *sub_th;
346         bool            record_update;
347         int             rc;
348         ENTRY;
349
350         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
351         if (IS_ERR(sub_th))
352                 RETURN(PTR_ERR(sub_th));
353
354         if (record_update)
355                 update_record_size(env, destroy, th, lu_object_fid(&dt->do_lu));
356
357         rc = dt_declare_destroy(env, dt, sub_th);
358
359         RETURN(rc);
360 }
361
362 /**
363  * Destroy sub-object
364  *
365  * Get transaction of next layer, record updates if it belongs to cross-MDT
366  * operation and destroy the object.
367  *
368  * \param[in] env       execution environment
369  * \param[in] dt        dt object to be destroyed
370  * \param[in] th        transaction handle
371  *
372  * \retval              0 if the destroy succeeds.
373  * \retval              negative errno if the destroy fails.
374  */
375 int lod_sub_destroy(const struct lu_env *env, struct dt_object *dt,
376                     struct thandle *th)
377 {
378         struct thandle  *sub_th;
379         bool            record_update;
380         int             rc;
381         ENTRY;
382
383         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
384         if (IS_ERR(sub_th))
385                 RETURN(PTR_ERR(sub_th));
386
387         if (record_update) {
388                 rc = update_record_pack(destroy, th, lu_object_fid(&dt->do_lu));
389                 if (rc < 0)
390                         RETURN(rc);
391         }
392
393         rc = dt_destroy(env, dt, sub_th);
394
395         RETURN(rc);
396 }
397
398 /**
399  * Declare sub-object index insert
400  *
401  * Get transaction of next layer and declare index insert.
402  *
403  * \param[in] env       execution environment
404  * \param[in] dt        object for which to insert index
405  * \param[in] rec       record of the index which will be inserted
406  * \param[in] key       key of the index which will be inserted
407  * \param[in] th        the transaction handle
408  *
409  * \retval              0 if the declaration succeeds.
410  * \retval              negative errno if the declaration fails.
411  */
412 int lod_sub_declare_insert(const struct lu_env *env, struct dt_object *dt,
413                            const struct dt_rec *rec,
414                            const struct dt_key *key, struct thandle *th)
415 {
416         struct thandle *sub_th;
417         bool            record_update;
418
419         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
420         if (IS_ERR(sub_th))
421                 return PTR_ERR(sub_th);
422
423         if (record_update)
424                 update_record_size(env, index_insert, th,
425                                    lu_object_fid(&dt->do_lu), rec, key);
426
427         return dt_declare_insert(env, dt, rec, key, sub_th);
428 }
429
430 /**
431  * Insert index of sub object
432  *
433  * Get transaction of next layer, record updates if it belongs to cross-MDT
434  * operation, and insert the index.
435  *
436  * \param[in] env       execution environment
437  * \param[in] dt        object for which to insert index
438  * \param[in] rec       record of the index to be inserted
439  * \param[in] key       key of the index to be inserted
440  * \param[in] th        the transaction handle
441  *
442  * \retval              0 if the insertion succeeds.
443  * \retval              negative errno if the insertion fails.
444  */
445 int lod_sub_insert(const struct lu_env *env, struct dt_object *dt,
446                    const struct dt_rec *rec, const struct dt_key *key,
447                    struct thandle *th)
448 {
449         struct thandle *sub_th;
450         int             rc;
451         bool            record_update;
452
453         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
454         if (IS_ERR(sub_th))
455                 return PTR_ERR(sub_th);
456
457         if (record_update) {
458                 rc = update_record_pack(index_insert, th,
459                                         lu_object_fid(&dt->do_lu), rec, key);
460                 if (rc < 0)
461                         return rc;
462         }
463
464         return dt_insert(env, dt, rec, key, sub_th);
465 }
466
467 /**
468  * Declare sub-object index delete
469  *
470  * Get transaction of next layer and declare index deletion.
471  *
472  * \param[in] env       execution environment
473  * \param[in] dt        object for which to delete index
474  * \param[in] key       key of the index which will be deleted
475  * \param[in] th        the transaction handle
476  *
477  * \retval              0 if the declaration succeeds.
478  * \retval              negative errno if the declaration fails.
479  */
480 int lod_sub_declare_delete(const struct lu_env *env, struct dt_object *dt,
481                            const struct dt_key *key, struct thandle *th)
482 {
483         struct thandle *sub_th;
484         bool            record_update;
485
486         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
487         if (IS_ERR(sub_th))
488                 return PTR_ERR(sub_th);
489
490         if (record_update)
491                 update_record_size(env, index_delete, th,
492                                    lu_object_fid(&dt->do_lu), key);
493
494         return dt_declare_delete(env, dt, key, sub_th);
495 }
496
497 /**
498  * Delete index of sub object
499  *
500  * Get transaction of next layer, record updates if it belongs to cross-MDT
501  * operation, and delete the index.
502  *
503  * \param[in] env       execution environment
504  * \param[in] dt        object for which to delete index
505  * \param[in] key       key of the index to be deleted
506  * \param[in] th        the transaction handle
507  *
508  * \retval              0 if the deletion succeeds.
509  * \retval              negative errno if the deletion fails.
510  */
511 int lod_sub_delete(const struct lu_env *env, struct dt_object *dt,
512                    const struct dt_key *name, struct thandle *th)
513 {
514         struct thandle  *sub_th;
515         bool            record_update;
516         int             rc;
517         ENTRY;
518
519         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
520         if (IS_ERR(sub_th))
521                 RETURN(PTR_ERR(sub_th));
522
523         if (record_update) {
524                 rc = update_record_pack(index_delete, th,
525                                         lu_object_fid(&dt->do_lu), name);
526                 if (rc < 0)
527                         RETURN(rc);
528         }
529
530         rc = dt_delete(env, dt, name, sub_th);
531         RETURN(rc);
532 }
533
534 /**
535  * Declare xattr_set
536  *
537  * Get transaction of next layer, and declare xattr set.
538  *
539  * \param[in] env       execution environment
540  * \param[in] dt        object on which to set xattr
541  * \param[in] buf       xattr to be set
542  * \param[in] name      name of the xattr
543  * \param[in] fl        flag for setting xattr
544  *
545  * \retval              0 if the declaration succeeds.
546  * \retval              negative errno if the declaration fails.
547  */
548 int lod_sub_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
549                               const struct lu_buf *buf, const char *name,
550                               int fl, struct thandle *th)
551 {
552         struct thandle  *sub_th;
553         bool            record_update;
554         int             rc;
555         ENTRY;
556
557         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
558         if (IS_ERR(sub_th))
559                 RETURN(PTR_ERR(sub_th));
560
561         if (record_update)
562                 update_record_size(env, xattr_set, th,
563                                    lu_object_fid(&dt->do_lu),
564                                    buf, name, fl);
565
566         rc = dt_declare_xattr_set(env, dt, buf, name, fl, sub_th);
567
568         RETURN(rc);
569 }
570
571 /**
572  * Set xattr
573  *
574  * Get transaction of next layer, record updates if it belongs to cross-MDT
575  * operation, and set xattr to the object.
576  *
577  * \param[in] env       execution environment
578  * \param[in] dt        object on which to set xattr
579  * \param[in] buf       xattr to be set
580  * \param[in] name      name of the xattr
581  * \param[in] fl        flag for setting xattr
582  * \param[in] th        transaction handle
583  *
584  * \retval              0 if the xattr setting succeeds.
585  * \retval              negative errno if xattr setting fails.
586  */
587 int lod_sub_xattr_set(const struct lu_env *env, struct dt_object *dt,
588                       const struct lu_buf *buf, const char *name, int fl,
589                       struct thandle *th)
590 {
591         struct thandle  *sub_th;
592         bool            record_update;
593         int             rc;
594         ENTRY;
595
596         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
597         if (IS_ERR(sub_th))
598                 RETURN(PTR_ERR(sub_th));
599
600         if (record_update) {
601                 rc = update_record_pack(xattr_set, th,
602                                         lu_object_fid(&dt->do_lu),
603                                         buf, name, fl);
604                 if (rc < 0)
605                         RETURN(rc);
606         }
607
608         rc = dt_xattr_set(env, dt, buf, name, fl, sub_th);
609
610         RETURN(rc);
611 }
612
613 /**
614  * Declare attr_set
615  *
616  * Get transaction of next layer, and declare attr set.
617  *
618  * \param[in] env       execution environment
619  * \param[in] dt        object on which to set attr
620  * \param[in] attr      attributes to be set
621  * \param[in] th        transaction handle
622  *
623  * \retval              0 if the declaration succeeds.
624  * \retval              negative errno if the declaration fails.
625  */
626 int lod_sub_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
627                              const struct lu_attr *attr, struct thandle *th)
628 {
629         struct thandle  *sub_th;
630         bool            record_update;
631         int             rc;
632         ENTRY;
633
634         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
635         if (IS_ERR(sub_th))
636                 RETURN(PTR_ERR(sub_th));
637
638         if (record_update)
639                 update_record_size(env, attr_set, th,
640                                    lu_object_fid(&dt->do_lu), attr);
641
642         rc = dt_declare_attr_set(env, dt, attr, sub_th);
643
644         RETURN(rc);
645 }
646
647 /**
648  * attributes set
649  *
650  * Get transaction of next layer, record updates if it belongs to cross-MDT
651  * operation, and set attributes to the object.
652  *
653  * \param[in] env       execution environment
654  * \param[in] dt        object on which to set attr
655  * \param[in] attr      attrbutes to be set
656  * \param[in] th        transaction handle
657  *
658  * \retval              0 if attributes setting succeeds.
659  * \retval              negative errno if the attributes setting fails.
660  */
661 int lod_sub_attr_set(const struct lu_env *env, struct dt_object *dt,
662                      const struct lu_attr *attr, struct thandle *th)
663 {
664         bool               record_update;
665         struct thandle     *sub_th;
666         int                 rc;
667         ENTRY;
668
669         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
670         if (IS_ERR(sub_th))
671                 RETURN(PTR_ERR(sub_th));
672
673         if (record_update) {
674                 rc = update_record_pack(attr_set, th, lu_object_fid(&dt->do_lu),
675                                         attr);
676                 if (rc < 0)
677                         RETURN(rc);
678         }
679
680         rc = dt_attr_set(env, dt, attr, sub_th);
681
682         RETURN(rc);
683 }
684
685 /**
686  * Declare xattr_del
687  *
688  * Get transaction of next layer, and declare xattr deletion.
689  *
690  * \param[in] env       execution environment
691  * \param[in] dt        object on which to delete xattr
692  * \param[in] name      name of the xattr to be deleted
693  * \param[in] th        transaction handle
694  *
695  * \retval              0 if the declaration succeeds.
696  * \retval              negative errno if the declaration fails.
697  */
698 int lod_sub_declare_xattr_del(const struct lu_env *env, struct dt_object *dt,
699                               const char *name, struct thandle *th)
700 {
701         struct thandle  *sub_th;
702         bool            record_update;
703         int             rc;
704         ENTRY;
705
706         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
707         if (IS_ERR(sub_th))
708                 RETURN(PTR_ERR(sub_th));
709
710         if (record_update)
711                 update_record_size(env, xattr_del, th,
712                                    lu_object_fid(&dt->do_lu),
713                                    name);
714
715         rc = dt_declare_xattr_del(env, dt, name, sub_th);
716
717         RETURN(rc);
718 }
719
720 /**
721  * xattribute deletion
722  *
723  * Get transaction of next layer, record update if it belongs to cross-MDT
724  * operation and delete xattr.
725  *
726  * \param[in] env       execution environment
727  * \param[in] dt        object on which to delete xattr
728  * \param[in] name      name of the xattr to be deleted
729  * \param[in] th        transaction handle
730  *
731  * \retval              0 if the deletion succeeds.
732  * \retval              negative errno if the deletion fails.
733  */
734 int lod_sub_xattr_del(const struct lu_env *env, struct dt_object *dt,
735                       const char *name, struct thandle *th)
736 {
737         struct thandle  *sub_th;
738         bool            record_update;
739         int             rc;
740         ENTRY;
741
742         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
743         if (IS_ERR(sub_th))
744                 RETURN(PTR_ERR(sub_th));
745
746         if (record_update) {
747                 rc = update_record_pack(xattr_del, th,
748                                         lu_object_fid(&dt->do_lu), name);
749                 if (rc < 0)
750                         RETURN(rc);
751         }
752
753         rc = dt_xattr_del(env, dt, name, sub_th);
754
755         RETURN(rc);
756 }
757
758 /**
759  * Declare buffer write
760  *
761  * Get transaction of next layer and declare buffer write.
762  *
763  * \param[in] env       execution environment
764  * \param[in] dt        object to be written
765  * \param[in] buf       buffer to write which includes an embedded size field
766  * \param[in] pos       offet in the object to start writing at
767  * \param[in] th        transaction handle
768  *
769  * \retval              0 if the insertion succeeds.
770  * \retval              negative errno if the insertion fails.
771  */
772 int lod_sub_declare_write(const struct lu_env *env, struct dt_object *dt,
773                           const struct lu_buf *buf, loff_t pos,
774                           struct thandle *th)
775 {
776         struct thandle  *sub_th;
777         bool            record_update;
778         int             rc;
779         ENTRY;
780
781         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
782         if (IS_ERR(sub_th))
783                 RETURN(PTR_ERR(sub_th));
784
785         if (record_update)
786                 update_record_size(env, write, th,
787                                    lu_object_fid(&dt->do_lu),
788                                    buf, pos);
789
790         rc = dt_declare_write(env, dt, buf, pos, sub_th);
791
792         RETURN(rc);
793 }
794
795 /**
796  * Write buffer to sub object
797  *
798  * Get transaction of next layer, records buffer write if it belongs to
799  * Cross-MDT operation, and write buffer.
800  *
801  * \param[in] env       execution environment
802  * \param[in] dt        object to be written
803  * \param[in] buf       buffer to write which includes an embedded size field
804  * \param[in] pos       offet in the object to start writing at
805  * \param[in] th        transaction handle
806  *
807  * \retval              the buffer size in bytes if it succeeds.
808  * \retval              negative errno if it fails.
809  */
810 ssize_t lod_sub_write(const struct lu_env *env, struct dt_object *dt,
811                       const struct lu_buf *buf, loff_t *pos,
812                       struct thandle *th)
813 {
814         struct thandle  *sub_th;
815         bool            record_update;
816         ssize_t         rc;
817         ENTRY;
818
819         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
820         if (IS_ERR(sub_th))
821                 RETURN(PTR_ERR(sub_th));
822
823         if (record_update) {
824                 rc = update_record_pack(write, th, lu_object_fid(&dt->do_lu),
825                                         buf, *pos);
826                 if (rc < 0)
827                         RETURN(rc);
828         }
829
830         rc = dt_write(env, dt, buf, pos, sub_th);
831         RETURN(rc);
832 }
833
834 /**
835  * Declare punch
836  *
837  * Get transaction of next layer and declare punch.
838  *
839  * \param[in] env       execution environment
840  * \param[in] dt        object to be written
841  * \param[in] start     start offset of punch
842  * \param[in] end       end offet of punch
843  * \param[in] th        transaction handle
844  *
845  * \retval              0 if the insertion succeeds.
846  * \retval              negative errno if the insertion fails.
847  */
848 int lod_sub_declare_punch(const struct lu_env *env, struct dt_object *dt,
849                           __u64 start, __u64 end, struct thandle *th)
850 {
851         struct thandle  *sub_th;
852         bool            record_update;
853         int             rc;
854         ENTRY;
855
856         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
857         if (IS_ERR(sub_th))
858                 RETURN(PTR_ERR(sub_th));
859
860         if (record_update)
861                 update_record_size(env, punch, th,
862                                    lu_object_fid(&dt->do_lu),
863                                    start, end);
864
865         rc = dt_declare_punch(env, dt, start, end, sub_th);
866
867         RETURN(rc);
868 }
869
870 /**
871  * Punch to sub object
872  *
873  * Get transaction of next layer, records buffer write if it belongs to
874  * Cross-MDT operation, and punch object.
875  *
876  * \param[in] env       execution environment
877  * \param[in] dt        object to be written
878  * \param[in] start     start offset of punch
879  * \param[in] end       end offset of punch
880  * \param[in] th        transaction handle
881  * \param[in] capa      capability of the write
882  *
883  * \retval              the buffer size in bytes if it succeeds.
884  * \retval              negative errno if it fails.
885  */
886 int lod_sub_punch(const struct lu_env *env, struct dt_object *dt,
887                   __u64 start, __u64 end, struct thandle *th)
888 {
889         struct thandle  *sub_th;
890         bool            record_update;
891         int             rc;
892         ENTRY;
893
894         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
895         if (IS_ERR(sub_th))
896                 RETURN(PTR_ERR(sub_th));
897
898         if (record_update) {
899                 rc = update_record_pack(punch, th, lu_object_fid(&dt->do_lu),
900                                         start, end);
901                 if (rc < 0)
902                         RETURN(rc);
903         }
904
905         rc = dt_punch(env, dt, start, end, sub_th);
906
907         RETURN(rc);
908 }
909
910 int lod_sub_prep_llog(const struct lu_env *env, struct lod_device *lod,
911                       struct dt_device *dt, int index)
912 {
913         struct lod_thread_info  *lti = lod_env_info(env);
914         struct llog_ctxt        *ctxt;
915         struct llog_handle      *lgh;
916         struct llog_catid       *cid = &lti->lti_cid;
917         struct lu_fid           *fid = &lti->lti_fid;
918         struct obd_device       *obd;
919         int                     rc;
920         bool                    need_put = false;
921         ENTRY;
922
923         lu_update_log_fid(fid, index);
924
925         rc = lodname2mdt_index(lod2obd(lod)->obd_name, (__u32 *)&index);
926         if (rc < 0)
927                 RETURN(rc);
928
929         rc = llog_osd_get_cat_list(env, dt, index, 1, cid, fid);
930         if (rc != 0) {
931                 CERROR("%s: can't get id from catalogs: rc = %d\n",
932                        lod2obd(lod)->obd_name, rc);
933                 RETURN(rc);
934         }
935
936         obd = dt->dd_lu_dev.ld_obd;
937         ctxt = llog_get_context(obd, LLOG_UPDATELOG_ORIG_CTXT);
938         LASSERT(ctxt != NULL);
939         ctxt->loc_flags |= LLOG_CTXT_FLAG_NORMAL_FID;
940         ctxt->loc_chunk_size = LLOG_MIN_CHUNK_SIZE * 4;
941         if (likely(logid_id(&cid->lci_logid) != 0)) {
942                 rc = llog_open(env, ctxt, &lgh, &cid->lci_logid, NULL,
943                                LLOG_OPEN_EXISTS);
944
945                 /* re-create llog if it is missing */
946                 if (rc == -ENOENT)
947                         logid_set_id(&cid->lci_logid, 0);
948                 else if (rc < 0)
949                         GOTO(out_put, rc);
950         }
951
952         if (unlikely(logid_id(&cid->lci_logid) == 0)) {
953                 rc = llog_open_create(env, ctxt, &lgh, NULL, NULL);
954                 if (rc < 0)
955                         GOTO(out_put, rc);
956                 cid->lci_logid = lgh->lgh_id;
957                 need_put = true;
958         }
959
960         LASSERT(lgh != NULL);
961
962         rc = llog_init_handle(env, lgh, LLOG_F_IS_CAT, NULL);
963         if (rc != 0)
964                 GOTO(out_close, rc);
965
966         if (need_put) {
967                 rc = llog_osd_put_cat_list(env, dt, index, 1, cid, fid);
968                 if (rc != 0)
969                         GOTO(out_close, rc);
970         }
971
972         ctxt->loc_handle = lgh;
973
974         CDEBUG(D_INFO, "%s: init llog for index %d - catid "DFID":%x\n",
975                obd->obd_name, index, PFID(&cid->lci_logid.lgl_oi.oi_fid),
976                cid->lci_logid.lgl_ogen);
977 out_close:
978         if (rc != 0)
979                 llog_cat_close(env, lgh);
980 out_put:
981         llog_ctxt_put(ctxt);
982         RETURN(rc);
983 }