Whamcloud - gitweb
LU-10308 misc: update Intel copyright messages for 2017
[fs/lustre-release.git] / lustre / lod / lod_sub_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2015, 2017, Intel Corporation.
24  */
25 /*
26  * lustre/lod/lod_sub_object.c
27  *
28  * LOD sub object methods
29  *
30  * This file implements sub-object methods for LOD.
31  *
32  * LOD is Logic volume layer in the MDS stack, which will handle striping
33  * and distribute the update to different OSP/OSD. After directing the updates
34  * to one specific OSD/OSP, it also needs to do some thing before calling
35  * OSD/OSP API, for example recording updates for cross-MDT operation, get
36  * the next level transaction etc.
37  *
38  * Author: Di Wang <di.wang@intel.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_MDS
42
43 #include <obd.h>
44 #include <obd_class.h>
45 #include <uapi/linux/lustre/lustre_ver.h>
46 #include <obd_support.h>
47 #include <lprocfs_status.h>
48
49 #include <lustre_fid.h>
50 #include <uapi/linux/lustre/lustre_param.h>
51 #include <md_object.h>
52 #include <lustre_linkea.h>
53 #include <lustre_log.h>
54
55 #include "lod_internal.h"
56
57 struct thandle *lod_sub_get_thandle(const struct lu_env *env,
58                                     struct thandle *th,
59                                     const struct dt_object *sub_obj,
60                                     bool *record_update)
61 {
62         struct lod_device       *lod = dt2lod_dev(th->th_dev);
63         struct top_thandle      *tth;
64         struct thandle          *sub_th;
65         int                     type = LU_SEQ_RANGE_ANY;
66         __u32                   mdt_index;
67         int                     rc;
68         ENTRY;
69
70         if (record_update != NULL)
71                 *record_update = false;
72
73         if (th->th_top == NULL)
74                 RETURN(th);
75
76         tth = container_of(th, struct top_thandle, tt_super);
77
78         /* local object must be mdt object, Note: during ost object
79          * creation, FID is not assigned until osp_create(),
80          * so if the FID of sub_obj is zero, it means OST object. */
81         if (!dt_object_remote(sub_obj) ||
82             fid_is_zero(lu_object_fid(&sub_obj->do_lu))) {
83                 /* local MDT object */
84                 if (fid_is_sane(lu_object_fid(&sub_obj->do_lu)) &&
85                     tth->tt_multiple_thandle != NULL &&
86                     record_update != NULL &&
87                     th->th_result == 0)
88                         *record_update = true;
89
90                 RETURN(tth->tt_master_sub_thandle);
91         }
92
93         rc = lod_fld_lookup(env, lod, lu_object_fid(&sub_obj->do_lu),
94                             &mdt_index, &type);
95         if (rc < 0)
96                 RETURN(ERR_PTR(rc));
97
98         /* th_complex means we need track all of updates for this
99          * transaction, include changes on OST */
100         if (type == LU_SEQ_RANGE_OST && !th->th_complex)
101                 RETURN(tth->tt_master_sub_thandle);
102
103         sub_th = thandle_get_sub(env, th, sub_obj);
104         if (IS_ERR(sub_th))
105                 RETURN(sub_th);
106
107         if (tth->tt_multiple_thandle != NULL && record_update != NULL &&
108             th->th_result == 0)
109                 *record_update = true;
110
111         RETURN(sub_th);
112 }
113
114 /**
115  * Declare sub-object creation.
116  *
117  * Get transaction of next layer and declare the creation of the object.
118  *
119  * \param[in] env       execution environment
120  * \param[in] dt        the object being created
121  * \param[in] attr      the attributes of the object being created
122  * \param[in] hint      the hint of the creation
123  * \param[in] dof       the object format of the creation
124  * \param[th] th        the transaction handle
125  *
126  * \retval              0 if the declaration succeeds
127  * \retval              negative errno if the declaration fails.
128  */
129 int lod_sub_declare_create(const struct lu_env *env, struct dt_object *dt,
130                            struct lu_attr *attr,
131                            struct dt_allocation_hint *hint,
132                            struct dt_object_format *dof, struct thandle *th)
133 {
134         struct thandle *sub_th;
135         bool record_update;
136
137         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
138         if (IS_ERR(sub_th))
139                 return PTR_ERR(sub_th);
140
141         if (record_update)
142                 update_record_size(env, create, th, lu_object_fid(&dt->do_lu),
143                                    attr, hint, dof);
144
145         return dt_declare_create(env, dt, attr, hint, dof, sub_th);
146 }
147
148 /**
149  * Create sub-object.
150  *
151  * Get transaction of next layer, record updates if it belongs to cross-MDT
152  * operation, and create the object.
153  *
154  * \param[in] env       execution environment
155  * \param[in] dt        the object being created
156  * \param[in] attr      the attributes of the object being created
157  * \param[in] hint      the hint of the creation
158  * \param[in] dof       the object format of the creation
159  * \param[th] th        the transaction handle
160  *
161  * \retval              0 if the creation succeeds
162  * \retval              negative errno if the creation fails.
163  */
164 int lod_sub_create(const struct lu_env *env, struct dt_object *dt,
165                    struct lu_attr *attr, struct dt_allocation_hint *hint,
166                    struct dt_object_format *dof, struct thandle *th)
167 {
168         struct thandle     *sub_th;
169         bool               record_update;
170         int                 rc;
171         ENTRY;
172
173         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
174         if (IS_ERR(sub_th))
175                 RETURN(PTR_ERR(sub_th));
176
177         if (record_update) {
178                 rc = update_record_pack(create, th,
179                                         lu_object_fid(&dt->do_lu),
180                                         attr, hint, dof);
181                 if (rc < 0)
182                         RETURN(rc);
183         }
184
185         rc = dt_create(env, dt, attr, hint, dof, sub_th);
186
187         RETURN(rc);
188 }
189
190 /**
191  * Declare adding reference for the sub-object
192  *
193  * Get transaction of next layer and declare the reference adding.
194  *
195  * \param[in] env       execution environment
196  * \param[in] dt        dt object to add reference
197  * \param[in] th        transaction handle
198  *
199  * \retval              0 if the declaration succeeds.
200  * \retval              negative errno if the declaration fails.
201  */
202 int lod_sub_declare_ref_add(const struct lu_env *env, struct dt_object *dt,
203                             struct thandle *th)
204 {
205         struct thandle  *sub_th;
206         bool            record_update;
207         int             rc;
208         ENTRY;
209
210         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
211         if (IS_ERR(sub_th))
212                 RETURN(PTR_ERR(sub_th));
213
214         if (record_update)
215                 update_record_size(env, ref_add, th, lu_object_fid(&dt->do_lu));
216
217         rc = dt_declare_ref_add(env, dt, sub_th);
218
219         RETURN(rc);
220 }
221
222 /**
223  * Add reference for the sub-object
224  *
225  * Get transaction of next layer, record updates if it belongs to cross-MDT
226  * operation and add reference of the object.
227  *
228  * \param[in] env       execution environment
229  * \param[in] dt        dt object to add reference
230  * \param[in] th        transaction handle
231  *
232  * \retval              0 if it succeeds.
233  * \retval              negative errno if it fails.
234  */
235 int lod_sub_ref_add(const struct lu_env *env, struct dt_object *dt,
236                     struct thandle *th)
237 {
238         struct thandle  *sub_th;
239         bool            record_update;
240         int             rc;
241         ENTRY;
242
243         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
244         if (IS_ERR(sub_th))
245                 RETURN(PTR_ERR(sub_th));
246
247         if (record_update) {
248                 rc = update_record_pack(ref_add, th,
249                                         lu_object_fid(&dt->do_lu));
250                 if (rc < 0)
251                         RETURN(rc);
252         }
253
254         rc = dt_ref_add(env, dt, sub_th);
255
256         RETURN(rc);
257 }
258
259 /**
260  * Declare deleting reference for the sub-object
261  *
262  * Get transaction of next layer and declare the reference deleting.
263  *
264  * \param[in] env       execution environment
265  * \param[in] dt        dt object to delete reference
266  * \param[in] th        transaction handle
267  *
268  * \retval              0 if the declaration succeeds.
269  * \retval              negative errno if the declaration fails.
270  */
271 int lod_sub_declare_ref_del(const struct lu_env *env, struct dt_object *dt,
272                             struct thandle *th)
273 {
274         struct thandle  *sub_th;
275         bool            record_update;
276         int             rc;
277         ENTRY;
278
279         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
280         if (IS_ERR(sub_th))
281                 RETURN(PTR_ERR(sub_th));
282
283         if (record_update)
284                 update_record_size(env, ref_del, th, lu_object_fid(&dt->do_lu));
285
286         rc = dt_declare_ref_del(env, dt, sub_th);
287
288         RETURN(rc);
289 }
290
291 /**
292  * Delete reference for the sub-object
293  *
294  * Get transaction of next layer, record updates if it belongs to cross-MDT
295  * operation and delete reference of the object.
296  *
297  * \param[in] env       execution environment
298  * \param[in] dt        dt object to delete reference
299  * \param[in] th        transaction handle
300  *
301  * \retval              0 if it succeeds.
302  * \retval              negative errno if it fails.
303  */
304 int lod_sub_ref_del(const struct lu_env *env, struct dt_object *dt,
305                     struct thandle *th)
306 {
307         struct thandle  *sub_th;
308         bool            record_update;
309         int             rc;
310         ENTRY;
311
312         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
313         if (IS_ERR(sub_th))
314                 RETURN(PTR_ERR(sub_th));
315
316         if (record_update) {
317                 rc = update_record_pack(ref_del, th,
318                                         lu_object_fid(&dt->do_lu));
319                 if (rc < 0)
320                         RETURN(rc);
321         }
322
323         rc = dt_ref_del(env, dt, sub_th);
324
325         RETURN(rc);
326 }
327
328 /**
329  * Declare destroying sub-object
330  *
331  * Get transaction of next layer and declare the sub-object destroy.
332  *
333  * \param[in] env       execution environment
334  * \param[in] dt        dt object to be destroyed
335  * \param[in] th        transaction handle
336  *
337  * \retval              0 if the declaration succeeds.
338  * \retval              negative errno if the declaration fails.
339  */
340 int lod_sub_declare_destroy(const struct lu_env *env, struct dt_object *dt,
341                             struct thandle *th)
342 {
343         struct thandle  *sub_th;
344         bool            record_update;
345         int             rc;
346         ENTRY;
347
348         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
349         if (IS_ERR(sub_th))
350                 RETURN(PTR_ERR(sub_th));
351
352         if (record_update)
353                 update_record_size(env, destroy, th, lu_object_fid(&dt->do_lu));
354
355         rc = dt_declare_destroy(env, dt, sub_th);
356
357         RETURN(rc);
358 }
359
360 /**
361  * Destroy sub-object
362  *
363  * Get transaction of next layer, record updates if it belongs to cross-MDT
364  * operation and destroy the object.
365  *
366  * \param[in] env       execution environment
367  * \param[in] dt        dt object to be destroyed
368  * \param[in] th        transaction handle
369  *
370  * \retval              0 if the destroy succeeds.
371  * \retval              negative errno if the destroy fails.
372  */
373 int lod_sub_destroy(const struct lu_env *env, struct dt_object *dt,
374                     struct thandle *th)
375 {
376         struct thandle  *sub_th;
377         bool            record_update;
378         int             rc;
379         ENTRY;
380
381         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
382         if (IS_ERR(sub_th))
383                 RETURN(PTR_ERR(sub_th));
384
385         if (record_update) {
386                 rc = update_record_pack(destroy, th, lu_object_fid(&dt->do_lu));
387                 if (rc < 0)
388                         RETURN(rc);
389         }
390
391         rc = dt_destroy(env, dt, sub_th);
392
393         RETURN(rc);
394 }
395
396 /**
397  * Declare sub-object index insert
398  *
399  * Get transaction of next layer and declare index insert.
400  *
401  * \param[in] env       execution environment
402  * \param[in] dt        object for which to insert index
403  * \param[in] rec       record of the index which will be inserted
404  * \param[in] key       key of the index which will be inserted
405  * \param[in] th        the transaction handle
406  *
407  * \retval              0 if the declaration succeeds.
408  * \retval              negative errno if the declaration fails.
409  */
410 int lod_sub_declare_insert(const struct lu_env *env, struct dt_object *dt,
411                            const struct dt_rec *rec,
412                            const struct dt_key *key, struct thandle *th)
413 {
414         struct thandle *sub_th;
415         bool            record_update;
416
417         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
418         if (IS_ERR(sub_th))
419                 return PTR_ERR(sub_th);
420
421         if (record_update)
422                 update_record_size(env, index_insert, th,
423                                    lu_object_fid(&dt->do_lu), rec, key);
424
425         return dt_declare_insert(env, dt, rec, key, sub_th);
426 }
427
428 /**
429  * Insert index of sub object
430  *
431  * Get transaction of next layer, record updates if it belongs to cross-MDT
432  * operation, and insert the index.
433  *
434  * \param[in] env       execution environment
435  * \param[in] dt        object for which to insert index
436  * \param[in] rec       record of the index to be inserted
437  * \param[in] key       key of the index to be inserted
438  * \param[in] th        the transaction handle
439  * \param[in] ign       whether ignore quota
440  *
441  * \retval              0 if the insertion succeeds.
442  * \retval              negative errno if the insertion fails.
443  */
444 int lod_sub_insert(const struct lu_env *env, struct dt_object *dt,
445                    const struct dt_rec *rec, const struct dt_key *key,
446                    struct thandle *th, int ign)
447 {
448         struct thandle *sub_th;
449         int             rc;
450         bool            record_update;
451
452         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
453         if (IS_ERR(sub_th))
454                 return PTR_ERR(sub_th);
455
456         if (record_update) {
457                 rc = update_record_pack(index_insert, th,
458                                         lu_object_fid(&dt->do_lu), rec, key);
459                 if (rc < 0)
460                         return rc;
461         }
462
463         return dt_insert(env, dt, rec, key, sub_th, ign);
464 }
465
466 /**
467  * Declare sub-object index delete
468  *
469  * Get transaction of next layer and declare index deletion.
470  *
471  * \param[in] env       execution environment
472  * \param[in] dt        object for which to delete index
473  * \param[in] key       key of the index which will be deleted
474  * \param[in] th        the transaction handle
475  *
476  * \retval              0 if the declaration succeeds.
477  * \retval              negative errno if the declaration fails.
478  */
479 int lod_sub_declare_delete(const struct lu_env *env, struct dt_object *dt,
480                            const struct dt_key *key, struct thandle *th)
481 {
482         struct thandle *sub_th;
483         bool            record_update;
484
485         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
486         if (IS_ERR(sub_th))
487                 return PTR_ERR(sub_th);
488
489         if (record_update)
490                 update_record_size(env, index_delete, th,
491                                    lu_object_fid(&dt->do_lu), key);
492
493         return dt_declare_delete(env, dt, key, sub_th);
494 }
495
496 /**
497  * Delete index of sub object
498  *
499  * Get transaction of next layer, record updates if it belongs to cross-MDT
500  * operation, and delete the index.
501  *
502  * \param[in] env       execution environment
503  * \param[in] dt        object for which to delete index
504  * \param[in] key       key of the index to be deleted
505  * \param[in] th        the transaction handle
506  *
507  * \retval              0 if the deletion succeeds.
508  * \retval              negative errno if the deletion fails.
509  */
510 int lod_sub_delete(const struct lu_env *env, struct dt_object *dt,
511                    const struct dt_key *name, struct thandle *th)
512 {
513         struct thandle  *sub_th;
514         bool            record_update;
515         int             rc;
516         ENTRY;
517
518         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
519         if (IS_ERR(sub_th))
520                 RETURN(PTR_ERR(sub_th));
521
522         if (record_update) {
523                 rc = update_record_pack(index_delete, th,
524                                         lu_object_fid(&dt->do_lu), name);
525                 if (rc < 0)
526                         RETURN(rc);
527         }
528
529         rc = dt_delete(env, dt, name, sub_th);
530         RETURN(rc);
531 }
532
533 /**
534  * Declare xattr_set
535  *
536  * Get transaction of next layer, and declare xattr set.
537  *
538  * \param[in] env       execution environment
539  * \param[in] dt        object on which to set xattr
540  * \param[in] buf       xattr to be set
541  * \param[in] name      name of the xattr
542  * \param[in] fl        flag for setting xattr
543  *
544  * \retval              0 if the declaration succeeds.
545  * \retval              negative errno if the declaration fails.
546  */
547 int lod_sub_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
548                               const struct lu_buf *buf, const char *name,
549                               int fl, struct thandle *th)
550 {
551         struct thandle  *sub_th;
552         bool            record_update;
553         int             rc;
554         ENTRY;
555
556         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
557         if (IS_ERR(sub_th))
558                 RETURN(PTR_ERR(sub_th));
559
560         if (record_update)
561                 update_record_size(env, xattr_set, th,
562                                    lu_object_fid(&dt->do_lu),
563                                    buf, name, fl);
564
565         rc = dt_declare_xattr_set(env, dt, buf, name, fl, sub_th);
566
567         RETURN(rc);
568 }
569
570 /**
571  * Set xattr
572  *
573  * Get transaction of next layer, record updates if it belongs to cross-MDT
574  * operation, and set xattr to the object.
575  *
576  * \param[in] env       execution environment
577  * \param[in] dt        object on which to set xattr
578  * \param[in] buf       xattr to be set
579  * \param[in] name      name of the xattr
580  * \param[in] fl        flag for setting xattr
581  * \param[in] th        transaction handle
582  *
583  * \retval              0 if the xattr setting succeeds.
584  * \retval              negative errno if xattr setting fails.
585  */
586 int lod_sub_xattr_set(const struct lu_env *env, struct dt_object *dt,
587                       const struct lu_buf *buf, const char *name, int fl,
588                       struct thandle *th)
589 {
590         struct thandle  *sub_th;
591         bool            record_update;
592         int             rc;
593         ENTRY;
594
595         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
596         if (IS_ERR(sub_th))
597                 RETURN(PTR_ERR(sub_th));
598
599         if (record_update) {
600                 rc = update_record_pack(xattr_set, th,
601                                         lu_object_fid(&dt->do_lu),
602                                         buf, name, fl);
603                 if (rc < 0)
604                         RETURN(rc);
605         }
606
607         rc = dt_xattr_set(env, dt, buf, name, fl, sub_th);
608
609         RETURN(rc);
610 }
611
612 /**
613  * Declare attr_set
614  *
615  * Get transaction of next layer, and declare attr set.
616  *
617  * \param[in] env       execution environment
618  * \param[in] dt        object on which to set attr
619  * \param[in] attr      attributes to be set
620  * \param[in] th        transaction handle
621  *
622  * \retval              0 if the declaration succeeds.
623  * \retval              negative errno if the declaration fails.
624  */
625 int lod_sub_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
626                              const struct lu_attr *attr, struct thandle *th)
627 {
628         struct thandle  *sub_th;
629         bool            record_update;
630         int             rc;
631         ENTRY;
632
633         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
634         if (IS_ERR(sub_th))
635                 RETURN(PTR_ERR(sub_th));
636
637         if (record_update)
638                 update_record_size(env, attr_set, th,
639                                    lu_object_fid(&dt->do_lu), attr);
640
641         rc = dt_declare_attr_set(env, dt, attr, sub_th);
642
643         RETURN(rc);
644 }
645
646 /**
647  * attributes set
648  *
649  * Get transaction of next layer, record updates if it belongs to cross-MDT
650  * operation, and set attributes to the object.
651  *
652  * \param[in] env       execution environment
653  * \param[in] dt        object on which to set attr
654  * \param[in] attr      attrbutes to be set
655  * \param[in] th        transaction handle
656  *
657  * \retval              0 if attributes setting succeeds.
658  * \retval              negative errno if the attributes setting fails.
659  */
660 int lod_sub_attr_set(const struct lu_env *env, struct dt_object *dt,
661                      const struct lu_attr *attr, struct thandle *th)
662 {
663         bool               record_update;
664         struct thandle     *sub_th;
665         int                 rc;
666         ENTRY;
667
668         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
669         if (IS_ERR(sub_th))
670                 RETURN(PTR_ERR(sub_th));
671
672         if (record_update) {
673                 rc = update_record_pack(attr_set, th, lu_object_fid(&dt->do_lu),
674                                         attr);
675                 if (rc < 0)
676                         RETURN(rc);
677         }
678
679         rc = dt_attr_set(env, dt, attr, sub_th);
680
681         RETURN(rc);
682 }
683
684 /**
685  * Declare xattr_del
686  *
687  * Get transaction of next layer, and declare xattr deletion.
688  *
689  * \param[in] env       execution environment
690  * \param[in] dt        object on which to delete xattr
691  * \param[in] name      name of the xattr to be deleted
692  * \param[in] th        transaction handle
693  *
694  * \retval              0 if the declaration succeeds.
695  * \retval              negative errno if the declaration fails.
696  */
697 int lod_sub_declare_xattr_del(const struct lu_env *env, struct dt_object *dt,
698                               const char *name, struct thandle *th)
699 {
700         struct thandle  *sub_th;
701         bool            record_update;
702         int             rc;
703         ENTRY;
704
705         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
706         if (IS_ERR(sub_th))
707                 RETURN(PTR_ERR(sub_th));
708
709         if (record_update)
710                 update_record_size(env, xattr_del, th,
711                                    lu_object_fid(&dt->do_lu),
712                                    name);
713
714         rc = dt_declare_xattr_del(env, dt, name, sub_th);
715
716         RETURN(rc);
717 }
718
719 /**
720  * xattribute deletion
721  *
722  * Get transaction of next layer, record update if it belongs to cross-MDT
723  * operation and delete xattr.
724  *
725  * \param[in] env       execution environment
726  * \param[in] dt        object on which to delete xattr
727  * \param[in] name      name of the xattr to be deleted
728  * \param[in] th        transaction handle
729  *
730  * \retval              0 if the deletion succeeds.
731  * \retval              negative errno if the deletion fails.
732  */
733 int lod_sub_xattr_del(const struct lu_env *env, struct dt_object *dt,
734                       const char *name, struct thandle *th)
735 {
736         struct thandle  *sub_th;
737         bool            record_update;
738         int             rc;
739         ENTRY;
740
741         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
742         if (IS_ERR(sub_th))
743                 RETURN(PTR_ERR(sub_th));
744
745         if (record_update) {
746                 rc = update_record_pack(xattr_del, th,
747                                         lu_object_fid(&dt->do_lu), name);
748                 if (rc < 0)
749                         RETURN(rc);
750         }
751
752         rc = dt_xattr_del(env, dt, name, sub_th);
753
754         RETURN(rc);
755 }
756
757 /**
758  * Declare buffer write
759  *
760  * Get transaction of next layer and declare buffer write.
761  *
762  * \param[in] env       execution environment
763  * \param[in] dt        object to be written
764  * \param[in] buf       buffer to write which includes an embedded size field
765  * \param[in] pos       offet in the object to start writing at
766  * \param[in] th        transaction handle
767  *
768  * \retval              0 if the insertion succeeds.
769  * \retval              negative errno if the insertion fails.
770  */
771 int lod_sub_declare_write(const struct lu_env *env, struct dt_object *dt,
772                           const struct lu_buf *buf, loff_t pos,
773                           struct thandle *th)
774 {
775         struct thandle  *sub_th;
776         bool            record_update;
777         int             rc;
778         ENTRY;
779
780         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
781         if (IS_ERR(sub_th))
782                 RETURN(PTR_ERR(sub_th));
783
784         if (record_update)
785                 update_record_size(env, write, th,
786                                    lu_object_fid(&dt->do_lu),
787                                    buf, pos);
788
789         rc = dt_declare_write(env, dt, buf, pos, sub_th);
790
791         RETURN(rc);
792 }
793
794 /**
795  * Write buffer to sub object
796  *
797  * Get transaction of next layer, records buffer write if it belongs to
798  * Cross-MDT operation, and write buffer.
799  *
800  * \param[in] env       execution environment
801  * \param[in] dt        object to be written
802  * \param[in] buf       buffer to write which includes an embedded size field
803  * \param[in] pos       offet in the object to start writing at
804  * \param[in] th        transaction handle
805  * \param[in] rq        enforcement for this write
806  *
807  * \retval              the buffer size in bytes if it succeeds.
808  * \retval              negative errno if it fails.
809  */
810 ssize_t lod_sub_write(const struct lu_env *env, struct dt_object *dt,
811                       const struct lu_buf *buf, loff_t *pos,
812                       struct thandle *th, int rq)
813 {
814         struct thandle  *sub_th;
815         bool            record_update;
816         ssize_t         rc;
817         ENTRY;
818
819         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
820         if (IS_ERR(sub_th))
821                 RETURN(PTR_ERR(sub_th));
822
823         if (record_update) {
824                 rc = update_record_pack(write, th, lu_object_fid(&dt->do_lu),
825                                         buf, *pos);
826                 if (rc < 0)
827                         RETURN(rc);
828         }
829
830         rc = dt_write(env, dt, buf, pos, sub_th, rq);
831         RETURN(rc);
832 }
833
834 /**
835  * Declare punch
836  *
837  * Get transaction of next layer and declare punch.
838  *
839  * \param[in] env       execution environment
840  * \param[in] dt        object to be written
841  * \param[in] start     start offset of punch
842  * \param[in] end       end offet of punch
843  * \param[in] th        transaction handle
844  *
845  * \retval              0 if the insertion succeeds.
846  * \retval              negative errno if the insertion fails.
847  */
848 int lod_sub_declare_punch(const struct lu_env *env, struct dt_object *dt,
849                           __u64 start, __u64 end, struct thandle *th)
850 {
851         struct thandle  *sub_th;
852         bool            record_update;
853         int             rc;
854         ENTRY;
855
856         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
857         if (IS_ERR(sub_th))
858                 RETURN(PTR_ERR(sub_th));
859
860         if (record_update)
861                 update_record_size(env, punch, th,
862                                    lu_object_fid(&dt->do_lu),
863                                    start, end);
864
865         rc = dt_declare_punch(env, dt, start, end, sub_th);
866
867         RETURN(rc);
868 }
869
870 /**
871  * Punch to sub object
872  *
873  * Get transaction of next layer, records buffer write if it belongs to
874  * Cross-MDT operation, and punch object.
875  *
876  * \param[in] env       execution environment
877  * \param[in] dt        object to be written
878  * \param[in] start     start offset of punch
879  * \param[in] end       end offset of punch
880  * \param[in] th        transaction handle
881  * \param[in] capa      capability of the write
882  *
883  * \retval              the buffer size in bytes if it succeeds.
884  * \retval              negative errno if it fails.
885  */
886 int lod_sub_punch(const struct lu_env *env, struct dt_object *dt,
887                   __u64 start, __u64 end, struct thandle *th)
888 {
889         struct thandle  *sub_th;
890         bool            record_update;
891         int             rc;
892         ENTRY;
893
894         sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
895         if (IS_ERR(sub_th))
896                 RETURN(PTR_ERR(sub_th));
897
898         if (record_update) {
899                 rc = update_record_pack(punch, th, lu_object_fid(&dt->do_lu),
900                                         start, end);
901                 if (rc < 0)
902                         RETURN(rc);
903         }
904
905         rc = dt_punch(env, dt, start, end, sub_th);
906
907         RETURN(rc);
908 }
909
910 int lod_sub_prep_llog(const struct lu_env *env, struct lod_device *lod,
911                       struct dt_device *dt, int index)
912 {
913         struct lod_thread_info  *lti = lod_env_info(env);
914         struct llog_ctxt        *ctxt;
915         struct llog_handle      *lgh;
916         struct llog_catid       *cid = &lti->lti_cid;
917         struct lu_fid           *fid = &lti->lti_fid;
918         struct obd_device       *obd;
919         int                     rc;
920         bool                    need_put = false;
921         ENTRY;
922
923         lu_update_log_fid(fid, index);
924
925         rc = lodname2mdt_index(lod2obd(lod)->obd_name, (__u32 *)&index);
926         if (rc < 0)
927                 RETURN(rc);
928
929         rc = llog_osd_get_cat_list(env, dt, index, 1, cid, fid);
930         if (rc != 0) {
931                 CERROR("%s: can't get id from catalogs: rc = %d\n",
932                        lod2obd(lod)->obd_name, rc);
933                 RETURN(rc);
934         }
935
936         obd = dt->dd_lu_dev.ld_obd;
937         ctxt = llog_get_context(obd, LLOG_UPDATELOG_ORIG_CTXT);
938         LASSERT(ctxt != NULL);
939         ctxt->loc_flags |= LLOG_CTXT_FLAG_NORMAL_FID;
940         ctxt->loc_chunk_size = LLOG_MIN_CHUNK_SIZE * 4;
941         if (likely(logid_id(&cid->lci_logid) != 0)) {
942                 rc = llog_open(env, ctxt, &lgh, &cid->lci_logid, NULL,
943                                LLOG_OPEN_EXISTS);
944
945                 /* re-create llog if it is missing */
946                 if (rc == -ENOENT)
947                         logid_set_id(&cid->lci_logid, 0);
948                 else if (rc < 0)
949                         GOTO(out_put, rc);
950         }
951
952         if (unlikely(logid_id(&cid->lci_logid) == 0)) {
953                 rc = llog_open_create(env, ctxt, &lgh, NULL, NULL);
954                 if (rc < 0)
955                         GOTO(out_put, rc);
956                 cid->lci_logid = lgh->lgh_id;
957                 need_put = true;
958         }
959
960         LASSERT(lgh != NULL);
961
962         rc = llog_init_handle(env, lgh, LLOG_F_IS_CAT, NULL);
963         if (rc != 0)
964                 GOTO(out_close, rc);
965
966         if (need_put) {
967                 rc = llog_osd_put_cat_list(env, dt, index, 1, cid, fid);
968                 if (rc != 0)
969                         GOTO(out_close, rc);
970         }
971
972         ctxt->loc_handle = lgh;
973
974         CDEBUG(D_INFO, "%s: init llog for index %d - catid "DFID":%x\n",
975                obd->obd_name, index, PFID(&cid->lci_logid.lgl_oi.oi_fid),
976                cid->lci_logid.lgl_ogen);
977 out_close:
978         if (rc != 0)
979                 llog_cat_close(env, lgh);
980 out_put:
981         llog_ctxt_put(ctxt);
982         RETURN(rc);
983 }