Whamcloud - gitweb
1b89ff6553f2f55c6fb72e1ddf34b2af2cb966a7
[fs/lustre-release.git] / lustre / target / out_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2014, 2015, Intel Corporation.
24  */
25 /*
26  * lustre/target/out_lib.c
27  *
28  * Author: Di Wang <di.wang@intel.com>
29  * Author: Fan, Yong <fan.yong@intel.com>
30  */
31
32 #define DEBUG_SUBSYSTEM S_CLASS
33
34 #include <lu_target.h>
35 #include <lustre_obdo.h>
36 #include <lustre_update.h>
37 #include <md_object.h>
38 #include <obd.h>
39 #include <obd_class.h>
40
41 #include "tgt_internal.h"
42
43 const char *update_op_str(__u16 opc)
44 {
45         static const char *opc_str[] = {
46                 [OUT_START] = "start",
47                 [OUT_CREATE] = "create",
48                 [OUT_DESTROY] = "destroy",
49                 [OUT_REF_ADD] = "ref_add",
50                 [OUT_REF_DEL] = "ref_del" ,
51                 [OUT_ATTR_SET] = "attr_set",
52                 [OUT_ATTR_GET] = "attr_get",
53                 [OUT_XATTR_SET] = "xattr_set",
54                 [OUT_XATTR_GET] = "xattr_get",
55                 [OUT_INDEX_LOOKUP] = "lookup",
56                 [OUT_INDEX_INSERT] = "insert",
57                 [OUT_INDEX_DELETE] = "delete",
58                 [OUT_WRITE] = "write",
59                 [OUT_XATTR_DEL] = "xattr_del",
60                 [OUT_PUNCH] = "punch",
61                 [OUT_READ] = "read",
62                 [OUT_NOOP] = "noop",
63         };
64
65         if (opc < ARRAY_SIZE(opc_str) && opc_str[opc] != NULL)
66                 return opc_str[opc];
67         else
68                 return "unknown";
69 }
70 EXPORT_SYMBOL(update_op_str);
71
72 /**
73  * Fill object update header
74  *
75  * Only fill the object update header, and parameters will be filled later
76  * in other functions.
77  *
78  * \params[in] env              execution environment
79  * \params[in] update           object update to be filled
80  * \params[in,out] max_update_size      maximum object update size, if the
81  *                                      current update length equals or
82  *                                      exceeds the size, it will return -E2BIG.
83  * \params[in] update_op        update type
84  * \params[in] fid              object FID of the update
85  * \params[in] param_count      the count of the update parameters
86  * \params[in] param_sizes      the length of each parameters
87  *
88  * \retval                      0 if packing succeeds.
89  * \retval                      -E2BIG if packing exceeds the maximum length.
90  */
91 int out_update_header_pack(const struct lu_env *env,
92                            struct object_update *update,
93                            size_t *max_update_size,
94                            enum update_type update_op,
95                            const struct lu_fid *fid,
96                            unsigned int param_count,
97                            __u16 *param_sizes,
98                            __u32 reply_size)
99 {
100         struct object_update_param      *param;
101         unsigned int                    i;
102         size_t                          update_size;
103
104         if (((reply_size + 7) >> 3) >= 1ULL << 16)
105                 return -EINVAL;
106
107         /* Check whether the packing exceeding the maxima update length */
108         update_size = sizeof(*update);
109         for (i = 0; i < param_count; i++)
110                 update_size += cfs_size_round(sizeof(*param) + param_sizes[i]);
111
112         if (unlikely(update_size >= *max_update_size)) {
113                 *max_update_size = update_size;
114                 return -E2BIG;
115         }
116
117         update->ou_fid = *fid;
118         update->ou_type = update_op;
119         update->ou_params_count = param_count;
120         update->ou_result_size = reply_size;
121         param = &update->ou_params[0];
122         for (i = 0; i < param_count; i++) {
123                 param->oup_len = param_sizes[i];
124                 param = (struct object_update_param *)((char *)param +
125                          object_update_param_size(param));
126         }
127
128         return 0;
129 }
130
131 /**
132  * Packs one update into the update_buffer.
133  *
134  * \param[in] env       execution environment
135  * \param[in] update    update to be packed
136  * \param[in] max_update_size   *maximum size of \a update
137  * \param[in] op        update operation (enum update_type)
138  * \param[in] fid       object FID for this update
139  * \param[in] param_count       number of parameters for this update
140  * \param[in] param_sizes       array of parameters length of this update
141  * \param[in] param_bufs        parameter buffers
142  *
143  * \retval              = 0 if updates packing succeeds
144  * \retval              negative errno if updates packing fails
145  **/
146 int out_update_pack(const struct lu_env *env, struct object_update *update,
147                     size_t *max_update_size, enum update_type op,
148                     const struct lu_fid *fid, unsigned int param_count,
149                     __u16 *param_sizes, const void **param_bufs,
150                     __u32 reply_size)
151 {
152         struct object_update_param      *param;
153         unsigned int                    i;
154         int                             rc;
155         ENTRY;
156
157         rc = out_update_header_pack(env, update, max_update_size, op, fid,
158                                     param_count, param_sizes, reply_size);
159         if (rc != 0)
160                 RETURN(rc);
161
162         param = &update->ou_params[0];
163         for (i = 0; i < param_count; i++) {
164                 memcpy(&param->oup_buf[0], param_bufs[i], param_sizes[i]);
165                 param = (struct object_update_param *)((char *)param +
166                          object_update_param_size(param));
167         }
168
169         RETURN(0);
170 }
171 EXPORT_SYMBOL(out_update_pack);
172
173 /**
174  * Pack various updates into the update_buffer.
175  *
176  * The following functions pack different updates into the update_buffer
177  * So parameters of these API is basically same as its correspondent OSD/OSP
178  * API, for detail description of these parameters see osd_handler.c or
179  * osp_md_object.c.
180  *
181  * \param[in] env       execution environment
182  * \param[in] ubuf      update buffer
183  * \param[in] fid       fid of this object for the update
184  *
185  * \retval              0 if insertion succeeds.
186  * \retval              negative errno if insertion fails.
187  */
188 int out_create_pack(const struct lu_env *env, struct object_update *update,
189                     size_t *max_update_size, const struct lu_fid *fid,
190                     const struct lu_attr *attr, struct dt_allocation_hint *hint,
191                     struct dt_object_format *dof)
192 {
193         struct obdo             *obdo;
194         __u16                   sizes[2] = {sizeof(*obdo), 0};
195         int                     buf_count = 1;
196         const struct lu_fid     *parent_fid = NULL;
197         int                     rc;
198         ENTRY;
199
200         if (hint != NULL && hint->dah_parent) {
201                 parent_fid = lu_object_fid(&hint->dah_parent->do_lu);
202                 sizes[1] = sizeof(*parent_fid);
203                 buf_count++;
204         }
205
206         rc = out_update_header_pack(env, update, max_update_size, OUT_CREATE,
207                                     fid, buf_count, sizes, 0);
208         if (rc != 0)
209                 RETURN(rc);
210
211         obdo = object_update_param_get(update, 0, NULL);
212         if (IS_ERR(obdo))
213                 RETURN(PTR_ERR(obdo));
214
215         obdo->o_valid = 0;
216         obdo_from_la(obdo, attr, attr->la_valid);
217         lustre_set_wire_obdo(NULL, obdo, obdo);
218
219         if (parent_fid != NULL) {
220                 struct lu_fid *tmp;
221
222                 tmp = object_update_param_get(update, 1, NULL);
223                 if (IS_ERR(tmp))
224                         RETURN(PTR_ERR(tmp));
225
226                 fid_cpu_to_le(tmp, parent_fid);
227         }
228
229         RETURN(0);
230 }
231 EXPORT_SYMBOL(out_create_pack);
232
233 int out_ref_del_pack(const struct lu_env *env, struct object_update *update,
234                      size_t *max_update_size, const struct lu_fid *fid)
235 {
236         return out_update_pack(env, update, max_update_size, OUT_REF_DEL, fid,
237                                0, NULL, NULL, 0);
238 }
239 EXPORT_SYMBOL(out_ref_del_pack);
240
241 int out_ref_add_pack(const struct lu_env *env, struct object_update *update,
242                      size_t *max_update_size, const struct lu_fid *fid)
243 {
244         return out_update_pack(env, update, max_update_size, OUT_REF_ADD, fid,
245                                0, NULL, NULL, 0);
246 }
247 EXPORT_SYMBOL(out_ref_add_pack);
248
249 int out_attr_set_pack(const struct lu_env *env, struct object_update *update,
250                       size_t *max_update_size, const struct lu_fid *fid,
251                       const struct lu_attr *attr)
252 {
253         struct obdo             *obdo;
254         __u16                   size = sizeof(*obdo);
255         int                     rc;
256         ENTRY;
257
258         rc = out_update_header_pack(env, update, max_update_size,
259                                     OUT_ATTR_SET, fid, 1, &size, 0);
260         if (rc != 0)
261                 RETURN(rc);
262
263         obdo = object_update_param_get(update, 0, NULL);
264         if (IS_ERR(obdo))
265                 RETURN(PTR_ERR(obdo));
266
267         obdo->o_valid = 0;
268         obdo_from_la(obdo, attr, attr->la_valid);
269         lustre_set_wire_obdo(NULL, obdo, obdo);
270
271         RETURN(0);
272 }
273 EXPORT_SYMBOL(out_attr_set_pack);
274
275 int out_xattr_set_pack(const struct lu_env *env, struct object_update *update,
276                        size_t *max_update_size, const struct lu_fid *fid,
277                        const struct lu_buf *buf, const char *name, __u32 flag)
278 {
279         __u16   sizes[3] = {strlen(name) + 1, buf->lb_len, sizeof(flag)};
280         const void *bufs[3] = {(char *)name, (char *)buf->lb_buf,
281                                (char *)&flag};
282
283         return out_update_pack(env, update, max_update_size, OUT_XATTR_SET,
284                                fid, ARRAY_SIZE(sizes), sizes, bufs, 0);
285 }
286 EXPORT_SYMBOL(out_xattr_set_pack);
287
288 int out_xattr_del_pack(const struct lu_env *env, struct object_update *update,
289                        size_t *max_update_size, const struct lu_fid *fid,
290                        const char *name)
291 {
292         __u16   size = strlen(name) + 1;
293
294         return out_update_pack(env, update, max_update_size, OUT_XATTR_DEL,
295                                fid, 1, &size, (const void **)&name, 0);
296 }
297 EXPORT_SYMBOL(out_xattr_del_pack);
298
299 int out_index_insert_pack(const struct lu_env *env,
300                           struct object_update *update,
301                           size_t *max_update_size, const struct lu_fid *fid,
302                           const struct dt_rec *rec, const struct dt_key *key)
303 {
304         struct dt_insert_rec       *rec1 = (struct dt_insert_rec *)rec;
305         struct lu_fid              rec_fid;
306         __u32                       type = cpu_to_le32(rec1->rec_type);
307         __u16                       sizes[3] = { strlen((char *)key) + 1,
308                                                 sizeof(rec_fid),
309                                                 sizeof(type) };
310         const void                 *bufs[3] = { (char *)key,
311                                                 (char *)&rec_fid,
312                                                 (char *)&type };
313
314         fid_cpu_to_le(&rec_fid, rec1->rec_fid);
315
316         return out_update_pack(env, update, max_update_size, OUT_INDEX_INSERT,
317                                fid, ARRAY_SIZE(sizes), sizes, bufs, 0);
318 }
319 EXPORT_SYMBOL(out_index_insert_pack);
320
321 int out_index_delete_pack(const struct lu_env *env,
322                           struct object_update *update,
323                           size_t *max_update_size, const struct lu_fid *fid,
324                           const struct dt_key *key)
325 {
326         __u16   size = strlen((char *)key) + 1;
327         const void *buf = key;
328
329         return out_update_pack(env, update, max_update_size, OUT_INDEX_DELETE,
330                                fid, 1, &size, &buf, 0);
331 }
332 EXPORT_SYMBOL(out_index_delete_pack);
333
334 int out_object_destroy_pack(const struct lu_env *env,
335                             struct object_update *update,
336                             size_t *max_update_size, const struct lu_fid *fid)
337 {
338         return out_update_pack(env, update, max_update_size, OUT_DESTROY, fid,
339                                0, NULL, NULL, 0);
340 }
341 EXPORT_SYMBOL(out_object_destroy_pack);
342
343 int out_write_pack(const struct lu_env *env, struct object_update *update,
344                    size_t *max_update_size, const struct lu_fid *fid,
345                    const struct lu_buf *buf, __u64 pos)
346 {
347         __u16           sizes[2] = {buf->lb_len, sizeof(pos)};
348         const void      *bufs[2] = {(char *)buf->lb_buf, (char *)&pos};
349         int             rc;
350
351         pos = cpu_to_le64(pos);
352
353         rc = out_update_pack(env, update, max_update_size, OUT_WRITE, fid,
354                              ARRAY_SIZE(sizes), sizes, bufs, 0);
355         return rc;
356 }
357 EXPORT_SYMBOL(out_write_pack);
358
359 /**
360  * Pack various readonly updates into the update_buffer.
361  *
362  * The following update funcs are only used by read-only ops, lookup,
363  * getattr etc, so it does not need transaction here. Currently they
364  * are only used by OSP.
365  *
366  * \param[in] env       execution environment
367  * \param[in] fid       fid of this object for the update
368  * \param[in] ubuf      update buffer
369  *
370  * \retval              = 0 pack succeed.
371  *                      < 0 pack failed.
372  **/
373 int out_index_lookup_pack(const struct lu_env *env,
374                           struct object_update *update,
375                           size_t *max_update_size, const struct lu_fid *fid,
376                           struct dt_rec *rec, const struct dt_key *key)
377 {
378         const void      *name = key;
379         __u16           size = strlen((char *)name) + 1;
380
381         /* XXX: this shouldn't be hardcoded */
382         return out_update_pack(env, update, max_update_size, OUT_INDEX_LOOKUP,
383                                fid, 1, &size, &name, 256);
384 }
385 EXPORT_SYMBOL(out_index_lookup_pack);
386
387 int out_attr_get_pack(const struct lu_env *env, struct object_update *update,
388                       size_t *max_update_size, const struct lu_fid *fid)
389 {
390         return out_update_pack(env, update, max_update_size, OUT_ATTR_GET,
391                                fid, 0, NULL, NULL, sizeof(struct obdo));
392 }
393 EXPORT_SYMBOL(out_attr_get_pack);
394
395 int out_xattr_get_pack(const struct lu_env *env, struct object_update *update,
396                        size_t *max_update_size, const struct lu_fid *fid,
397                        const char *name, const int bufsize)
398 {
399         __u16 size;
400
401         LASSERT(name != NULL);
402         size = strlen(name) + 1;
403
404         return out_update_pack(env, update, max_update_size, OUT_XATTR_GET,
405                                fid, 1, &size, (const void **)&name, bufsize);
406 }
407 EXPORT_SYMBOL(out_xattr_get_pack);
408
409 int out_read_pack(const struct lu_env *env, struct object_update *update,
410                   size_t *max_update_size, const struct lu_fid *fid,
411                   size_t size, loff_t pos)
412 {
413         __u16           sizes[2] = {sizeof(size), sizeof(pos)};
414         const void      *bufs[2] = {&size, &pos};
415
416         LASSERT(size > 0);
417         size = cpu_to_le64(size);
418         pos = cpu_to_le64(pos);
419
420         return out_update_pack(env, update, max_update_size, OUT_READ, fid,
421                                ARRAY_SIZE(sizes), sizes, bufs, size);
422 }
423 EXPORT_SYMBOL(out_read_pack);
424
425 static int tx_extend_args(struct thandle_exec_args *ta, int new_alloc_ta)
426 {
427         struct tx_arg   **new_ta;
428         int             i;
429         int             rc = 0;
430
431         if (ta->ta_alloc_args >= new_alloc_ta)
432                 return 0;
433
434         OBD_ALLOC(new_ta, sizeof(*new_ta) * new_alloc_ta);
435         if (new_ta == NULL)
436                 return -ENOMEM;
437
438         for (i = 0; i < new_alloc_ta; i++) {
439                 if (i < ta->ta_alloc_args) {
440                         /* copy the old args to new one */
441                         new_ta[i] = ta->ta_args[i];
442                 } else {
443                         OBD_ALLOC_PTR(new_ta[i]);
444                         if (new_ta[i] == NULL)
445                                 GOTO(out, rc = -ENOMEM);
446                 }
447         }
448
449         /* free the old args */
450         if (ta->ta_args != NULL)
451                 OBD_FREE(ta->ta_args, sizeof(ta->ta_args[0]) *
452                                       ta->ta_alloc_args);
453
454         ta->ta_args = new_ta;
455         ta->ta_alloc_args = new_alloc_ta;
456 out:
457         if (rc != 0) {
458                 for (i = 0; i < new_alloc_ta; i++) {
459                         if (new_ta[i] != NULL)
460                                 OBD_FREE_PTR(new_ta[i]);
461                 }
462                 OBD_FREE(new_ta, sizeof(*new_ta) * new_alloc_ta);
463         }
464         return rc;
465 }
466
467 #define TX_ALLOC_STEP   8
468 struct tx_arg *tx_add_exec(struct thandle_exec_args *ta,
469                            tx_exec_func_t func, tx_exec_func_t undo,
470                            const char *file, int line)
471 {
472         int rc;
473         int i;
474
475         LASSERT(ta != NULL);
476         LASSERT(func != NULL);
477
478         if (ta->ta_argno + 1 >= ta->ta_alloc_args) {
479                 rc = tx_extend_args(ta, ta->ta_alloc_args + TX_ALLOC_STEP);
480                 if (rc != 0)
481                         return ERR_PTR(rc);
482         }
483
484         i = ta->ta_argno;
485
486         ta->ta_argno++;
487
488         ta->ta_args[i]->exec_fn = func;
489         ta->ta_args[i]->undo_fn = undo;
490         ta->ta_args[i]->file    = file;
491         ta->ta_args[i]->line    = line;
492
493         return ta->ta_args[i];
494 }
495
496 static int out_obj_destroy(const struct lu_env *env, struct dt_object *dt_obj,
497                            struct thandle *th)
498 {
499         int rc;
500
501         CDEBUG(D_INFO, "%s: destroy "DFID"\n", dt_obd_name(th->th_dev),
502                PFID(lu_object_fid(&dt_obj->do_lu)));
503
504         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
505         rc = dt_destroy(env, dt_obj, th);
506         dt_write_unlock(env, dt_obj);
507
508         return rc;
509 }
510
511 /**
512  * All of the xxx_undo will be used once execution failed,
513  * But because all of the required resource has been reserved in
514  * declare phase, i.e. if declare succeed, it should make sure
515  * the following executing phase succeed in anyway, so these undo
516  * should be useless for most of the time in Phase I
517  */
518 static int out_tx_create_undo(const struct lu_env *env, struct thandle *th,
519                               struct tx_arg *arg)
520 {
521         int rc;
522
523         rc = out_obj_destroy(env, arg->object, th);
524         if (rc != 0)
525                 CERROR("%s: undo failure, we are doomed!: rc = %d\n",
526                        dt_obd_name(th->th_dev), rc);
527         return rc;
528 }
529
530 int out_tx_create_exec(const struct lu_env *env, struct thandle *th,
531                        struct tx_arg *arg)
532 {
533         struct dt_object        *dt_obj = arg->object;
534         int                      rc;
535
536         CDEBUG(D_OTHER, "%s: create "DFID": dof %u, mode %o\n",
537                dt_obd_name(th->th_dev),
538                PFID(lu_object_fid(&arg->object->do_lu)),
539                arg->u.create.dof.dof_type,
540                arg->u.create.attr.la_mode & S_IFMT);
541
542         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
543         rc = dt_create(env, dt_obj, &arg->u.create.attr,
544                        &arg->u.create.hint, &arg->u.create.dof, th);
545
546         dt_write_unlock(env, dt_obj);
547
548         CDEBUG(D_INFO, "%s: insert create reply %p index %d: rc = %d\n",
549                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
550
551         if (arg->reply != NULL)
552                 object_update_result_insert(arg->reply, NULL, 0, arg->index,
553                                             rc);
554
555         return rc;
556 }
557
558 /**
559  * Add create update to thandle
560  *
561  * Declare create updates and add the update to the thandle updates
562  * exec array.
563  *
564  * \param [in] env      execution environment
565  * \param [in] obj      object to be created
566  * \param [in] attr     attributes of the creation
567  * \param [in] parent_fid the fid of the parent
568  * \param [in] dof      dt object format of the creation
569  * \param [in] ta       thandle execuation args where all of updates
570  *                      of the transaction are stored
571  * \param [in] th       thandle for this update
572  * \param [in] reply    reply of the updates
573  * \param [in] index    index of the reply
574  * \param [in] file     the file name where the function is called,
575  *                      which is only for debugging purpose.
576  * \param [in] line     the line number where the funtion is called,
577  *                      which is only for debugging purpose.
578  *
579  * \retval              0 if updates is added successfully.
580  * \retval              negative errno if update adding fails.
581  */
582 int out_create_add_exec(const struct lu_env *env, struct dt_object *obj,
583                         struct lu_attr *attr, struct lu_fid *parent_fid,
584                         struct dt_object_format *dof,
585                         struct thandle_exec_args *ta,
586                         struct thandle  *th,
587                         struct object_update_reply *reply,
588                         int index, const char *file, int line)
589 {
590         struct tx_arg *arg;
591         int rc;
592
593         rc = dt_declare_create(env, obj, attr, NULL, dof, th);
594         if (rc != 0)
595                 return rc;
596
597         arg = tx_add_exec(ta, out_tx_create_exec, out_tx_create_undo, file,
598                           line);
599         if (IS_ERR(arg))
600                 return PTR_ERR(arg);
601
602         /* release the object in out_trans_stop */
603         lu_object_get(&obj->do_lu);
604         arg->object = obj;
605         arg->u.create.attr = *attr;
606         if (parent_fid != NULL)
607                 arg->u.create.fid = *parent_fid;
608         memset(&arg->u.create.hint, 0, sizeof(arg->u.create.hint));
609         arg->u.create.dof  = *dof;
610         arg->reply = reply;
611         arg->index = index;
612
613         return 0;
614 }
615
616 static int out_tx_attr_set_undo(const struct lu_env *env,
617                                 struct thandle *th, struct tx_arg *arg)
618 {
619         CERROR("%s: attr set undo "DFID" unimplemented yet!: rc = %d\n",
620                dt_obd_name(th->th_dev),
621                PFID(lu_object_fid(&arg->object->do_lu)), -ENOTSUPP);
622
623         return -ENOTSUPP;
624 }
625
626 static int out_tx_attr_set_exec(const struct lu_env *env, struct thandle *th,
627                                 struct tx_arg *arg)
628 {
629         struct dt_object        *dt_obj = arg->object;
630         int                     rc;
631
632         CDEBUG(D_OTHER, "%s: attr set "DFID"\n", dt_obd_name(th->th_dev),
633                PFID(lu_object_fid(&dt_obj->do_lu)));
634
635         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
636         rc = dt_attr_set(env, dt_obj, &arg->u.attr_set.attr, th);
637         dt_write_unlock(env, dt_obj);
638
639         CDEBUG(D_INFO, "%s: insert attr_set reply %p index %d: rc = %d\n",
640                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
641
642         if (arg->reply != NULL)
643                 object_update_result_insert(arg->reply, NULL, 0,
644                                             arg->index, rc);
645
646         return rc;
647 }
648
649 int out_attr_set_add_exec(const struct lu_env *env, struct dt_object *dt_obj,
650                           const struct lu_attr *attr,
651                           struct thandle_exec_args *ta,
652                           struct thandle *th, struct object_update_reply *reply,
653                           int index, const char *file, int line)
654 {
655         struct tx_arg   *arg;
656         int             rc;
657
658         rc = dt_declare_attr_set(env, dt_obj, attr, th);
659         if (rc != 0)
660                 return rc;
661
662         arg = tx_add_exec(ta, out_tx_attr_set_exec, out_tx_attr_set_undo,
663                           file, line);
664         if (IS_ERR(arg))
665                 return PTR_ERR(arg);
666
667         lu_object_get(&dt_obj->do_lu);
668         arg->object = dt_obj;
669         arg->u.attr_set.attr = *attr;
670         arg->reply = reply;
671         arg->index = index;
672         return 0;
673 }
674
675 static int out_tx_write_exec(const struct lu_env *env, struct thandle *th,
676                              struct tx_arg *arg)
677 {
678         struct dt_object *dt_obj = arg->object;
679         int rc;
680
681         CDEBUG(D_INFO, "write "DFID" pos "LPU64" buf %p, len %lu\n",
682                PFID(lu_object_fid(&dt_obj->do_lu)), arg->u.write.pos,
683                arg->u.write.buf.lb_buf, (unsigned long)arg->u.write.buf.lb_len);
684
685         if (OBD_FAIL_CHECK(OBD_FAIL_OUT_ENOSPC)) {
686                 rc = -ENOSPC;
687         } else {
688                 dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
689                 rc = dt_record_write(env, dt_obj, &arg->u.write.buf,
690                                      &arg->u.write.pos, th);
691                 dt_write_unlock(env, dt_obj);
692
693                 if (rc == 0)
694                         rc = arg->u.write.buf.lb_len;
695         }
696
697         if (arg->reply != NULL)
698                 object_update_result_insert(arg->reply, NULL, 0, arg->index,
699                                             rc);
700
701         return rc > 0 ? 0 : rc;
702 }
703
704 int out_write_add_exec(const struct lu_env *env, struct dt_object *dt_obj,
705                        const struct lu_buf *buf, loff_t pos,
706                        struct thandle_exec_args *ta, struct thandle *th,
707                        struct object_update_reply *reply, int index,
708                        const char *file, int line)
709 {
710         struct tx_arg   *arg;
711         int             rc;
712
713         rc = dt_declare_record_write(env, dt_obj, buf, pos, th);
714         if (rc != 0)
715                 return rc;
716
717         arg = tx_add_exec(ta, out_tx_write_exec, NULL, file, line);
718         if (IS_ERR(arg))
719                 return PTR_ERR(arg);
720
721         lu_object_get(&dt_obj->do_lu);
722         arg->object = dt_obj;
723         arg->u.write.buf = *buf;
724         arg->u.write.pos = pos;
725         arg->reply = reply;
726         arg->index = index;
727         return 0;
728 }
729
730 static int out_tx_xattr_set_exec(const struct lu_env *env,
731                                  struct thandle *th,
732                                  struct tx_arg *arg)
733 {
734         struct dt_object *dt_obj = arg->object;
735         int rc;
736
737         CDEBUG(D_INFO, "%s: set xattr buf %p name %s flag %d\n",
738                dt_obd_name(th->th_dev), arg->u.xattr_set.buf.lb_buf,
739                arg->u.xattr_set.name, arg->u.xattr_set.flags);
740
741         if (!lu_object_exists(&dt_obj->do_lu))
742                 GOTO(out, rc = -ENOENT);
743
744         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
745         rc = dt_xattr_set(env, dt_obj, &arg->u.xattr_set.buf,
746                           arg->u.xattr_set.name, arg->u.xattr_set.flags,
747                           th);
748         /**
749          * Ignore errors if this is LINK EA
750          **/
751         if (unlikely(rc != 0 &&
752                      strcmp(arg->u.xattr_set.name, XATTR_NAME_LINK) == 0)) {
753                 /* XXX: If the linkEA is overflow, then we need to notify the
754                  *      namespace LFSCK to skip "nlink" attribute verification
755                  *      on this object to avoid the "nlink" to be shrinked by
756                  *      wrong. It may be not good an interaction with LFSCK
757                  *      like this. We will consider to replace it with other
758                  *      mechanism in future. LU-5802. */
759                 if (rc == -ENOSPC && arg->reply != NULL) {
760                         struct lfsck_request *lr = &tgt_th_info(env)->tti_lr;
761
762                         lfsck_pack_rfa(lr, lu_object_fid(&dt_obj->do_lu),
763                                        LE_SKIP_NLINK, LFSCK_TYPE_NAMESPACE);
764                         tgt_lfsck_in_notify(env,
765                                 tgt_ses_info(env)->tsi_tgt->lut_bottom, lr, th);
766                 }
767
768                 rc = 0;
769         }
770         dt_write_unlock(env, dt_obj);
771
772 out:
773         CDEBUG(D_INFO, "%s: insert xattr set reply %p index %d: rc = %d\n",
774                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
775
776         if (arg->reply != NULL)
777                 object_update_result_insert(arg->reply, NULL, 0, arg->index,
778                                             rc);
779
780         return rc;
781 }
782
783 int out_xattr_set_add_exec(const struct lu_env *env, struct dt_object *dt_obj,
784                            const struct lu_buf *buf, const char *name,
785                            int flags, struct thandle_exec_args *ta,
786                            struct thandle *th,
787                            struct object_update_reply *reply,
788                            int index, const char *file, int line)
789 {
790         struct tx_arg   *arg;
791         int             rc;
792
793         rc = dt_declare_xattr_set(env, dt_obj, buf, name, flags, th);
794         if (rc != 0)
795                 return rc;
796
797         if (strcmp(name, XATTR_NAME_LINK) == 0 && reply != NULL) {
798                 struct lfsck_request *lr = &tgt_th_info(env)->tti_lr;
799
800                 /* XXX: If the linkEA is overflow, then we need to notify the
801                  *      namespace LFSCK to skip "nlink" attribute verification
802                  *      on this object to avoid the "nlink" to be shrinked by
803                  *      wrong. It may be not good an interaction with LFSCK
804                  *      like this. We will consider to replace it with other
805                  *      mechanism in future. LU-5802. */
806                 lfsck_pack_rfa(lr, lu_object_fid(&dt_obj->do_lu),
807                                LE_SKIP_NLINK_DECLARE, LFSCK_TYPE_NAMESPACE);
808                 rc = tgt_lfsck_in_notify(env,
809                                          tgt_ses_info(env)->tsi_tgt->lut_bottom,
810                                          lr, ta->ta_handle);
811                 if (rc != 0)
812                         return rc;
813         }
814
815         arg = tx_add_exec(ta, out_tx_xattr_set_exec, NULL, file, line);
816         if (IS_ERR(arg))
817                 return PTR_ERR(arg);
818
819         lu_object_get(&dt_obj->do_lu);
820         arg->object = dt_obj;
821         arg->u.xattr_set.name = name;
822         arg->u.xattr_set.flags = flags;
823         arg->u.xattr_set.buf = *buf;
824         arg->reply = reply;
825         arg->index = index;
826         arg->u.xattr_set.csum = 0;
827         return 0;
828 }
829
830 static int out_tx_xattr_del_exec(const struct lu_env *env, struct thandle *th,
831                                  struct tx_arg *arg)
832 {
833         struct dt_object *dt_obj = arg->object;
834         int rc;
835
836         CDEBUG(D_INFO, "%s: del xattr name '%s' on "DFID"\n",
837                dt_obd_name(th->th_dev), arg->u.xattr_set.name,
838                PFID(lu_object_fid(&dt_obj->do_lu)));
839
840         if (!lu_object_exists(&dt_obj->do_lu))
841                 GOTO(out, rc = -ENOENT);
842
843         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
844         rc = dt_xattr_del(env, dt_obj, arg->u.xattr_set.name,
845                           th);
846         dt_write_unlock(env, dt_obj);
847 out:
848         CDEBUG(D_INFO, "%s: insert xattr del reply %p index %d: rc = %d\n",
849                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
850
851         if (arg->reply != NULL)
852                 object_update_result_insert(arg->reply, NULL, 0, arg->index,
853                                             rc);
854
855         return rc;
856 }
857
858 int out_xattr_del_add_exec(const struct lu_env *env, struct dt_object *dt_obj,
859                            const char *name, struct thandle_exec_args *ta,
860                            struct thandle *th,
861                            struct object_update_reply *reply, int index,
862                            const char *file, int line)
863 {
864         struct tx_arg   *arg;
865         int             rc;
866
867         rc = dt_declare_xattr_del(env, dt_obj, name, th);
868         if (rc != 0)
869                 return rc;
870
871         arg = tx_add_exec(ta, out_tx_xattr_del_exec, NULL, file, line);
872         if (IS_ERR(arg))
873                 return PTR_ERR(arg);
874
875         lu_object_get(&dt_obj->do_lu);
876         arg->object = dt_obj;
877         arg->u.xattr_set.name = name;
878         arg->reply = reply;
879         arg->index = index;
880         return 0;
881 }
882
883 static int out_obj_ref_add(const struct lu_env *env,
884                            struct dt_object *dt_obj,
885                            struct thandle *th)
886 {
887         int rc;
888
889         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
890         rc = dt_ref_add(env, dt_obj, th);
891         dt_write_unlock(env, dt_obj);
892
893         return rc;
894 }
895
896 static int out_obj_ref_del(const struct lu_env *env,
897                            struct dt_object *dt_obj,
898                            struct thandle *th)
899 {
900         int rc;
901
902         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
903         rc = dt_ref_del(env, dt_obj, th);
904         dt_write_unlock(env, dt_obj);
905
906         return rc;
907 }
908
909 static int out_tx_ref_add_exec(const struct lu_env *env, struct thandle *th,
910                                struct tx_arg *arg)
911 {
912         struct dt_object *dt_obj = arg->object;
913         int rc;
914
915         rc = out_obj_ref_add(env, dt_obj, th);
916
917         CDEBUG(D_INFO, "%s: insert ref_add reply %p index %d: rc = %d\n",
918                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
919
920         if (arg->reply != NULL)
921                 object_update_result_insert(arg->reply, NULL, 0, arg->index,
922                                             rc);
923         return rc;
924 }
925
926 static int out_tx_ref_add_undo(const struct lu_env *env, struct thandle *th,
927                                struct tx_arg *arg)
928 {
929         return out_obj_ref_del(env, arg->object, th);
930 }
931
932 int out_ref_add_add_exec(const struct lu_env *env, struct dt_object *dt_obj,
933                          struct thandle_exec_args *ta,
934                          struct thandle *th,
935                          struct object_update_reply *reply, int index,
936                          const char *file, int line)
937 {
938         struct tx_arg   *arg;
939         int             rc;
940
941         rc = dt_declare_ref_add(env, dt_obj, th);
942         if (rc != 0)
943                 return rc;
944
945         arg = tx_add_exec(ta, out_tx_ref_add_exec, out_tx_ref_add_undo, file,
946                           line);
947         if (IS_ERR(arg))
948                 return PTR_ERR(arg);
949
950         lu_object_get(&dt_obj->do_lu);
951         arg->object = dt_obj;
952         arg->reply = reply;
953         arg->index = index;
954         return 0;
955 }
956
957 static int out_tx_ref_del_exec(const struct lu_env *env, struct thandle *th,
958                                struct tx_arg *arg)
959 {
960         struct dt_object        *dt_obj = arg->object;
961         int                      rc;
962
963         rc = out_obj_ref_del(env, dt_obj, th);
964
965         CDEBUG(D_INFO, "%s: insert ref_del reply %p index %d: rc = %d\n",
966                dt_obd_name(th->th_dev), arg->reply, arg->index, 0);
967
968         if (arg->reply != NULL)
969                 object_update_result_insert(arg->reply, NULL, 0, arg->index,
970                                             rc);
971
972         return rc;
973 }
974
975 static int out_tx_ref_del_undo(const struct lu_env *env, struct thandle *th,
976                                struct tx_arg *arg)
977 {
978         return out_obj_ref_add(env, arg->object, th);
979 }
980
981 int out_ref_del_add_exec(const struct lu_env *env, struct dt_object *dt_obj,
982                          struct thandle_exec_args *ta,
983                          struct thandle *th,
984                          struct object_update_reply *reply, int index,
985                          const char *file, int line)
986 {
987         struct tx_arg   *arg;
988         int             rc;
989
990         rc = dt_declare_ref_del(env, dt_obj, th);
991         if (rc != 0)
992                 return rc;
993
994         arg = tx_add_exec(ta, out_tx_ref_del_exec, out_tx_ref_del_undo, file,
995                           line);
996         if (IS_ERR(arg))
997                 return PTR_ERR(arg);
998
999         lu_object_get(&dt_obj->do_lu);
1000         arg->object = dt_obj;
1001         arg->reply = reply;
1002         arg->index = index;
1003         return 0;
1004 }
1005
1006 static int out_obj_index_insert(const struct lu_env *env,
1007                                 struct dt_object *dt_obj,
1008                                 const struct dt_rec *rec,
1009                                 const struct dt_key *key,
1010                                 struct thandle *th)
1011 {
1012         int rc;
1013
1014         CDEBUG(D_INFO, "%s: index insert "DFID" name: %s fid "DFID", type %u\n",
1015                dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
1016                (char *)key, PFID(((struct dt_insert_rec *)rec)->rec_fid),
1017                ((struct dt_insert_rec *)rec)->rec_type);
1018
1019         if (dt_try_as_dir(env, dt_obj) == 0)
1020                 return -ENOTDIR;
1021
1022         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
1023         rc = dt_insert(env, dt_obj, rec, key, th, 0);
1024         dt_write_unlock(env, dt_obj);
1025
1026         return rc;
1027 }
1028
1029 static int out_obj_index_delete(const struct lu_env *env,
1030                                 struct dt_object *dt_obj,
1031                                 const struct dt_key *key,
1032                                 struct thandle *th)
1033 {
1034         int rc;
1035
1036         CDEBUG(D_INFO, "%s: index delete "DFID" name: %s\n",
1037                dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
1038                (char *)key);
1039
1040         if (dt_try_as_dir(env, dt_obj) == 0)
1041                 return -ENOTDIR;
1042
1043         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
1044         rc = dt_delete(env, dt_obj, key, th);
1045         dt_write_unlock(env, dt_obj);
1046
1047         return rc;
1048 }
1049
1050 static int out_tx_index_insert_exec(const struct lu_env *env,
1051                                     struct thandle *th, struct tx_arg *arg)
1052 {
1053         struct dt_object *dt_obj = arg->object;
1054         int rc;
1055
1056         if (unlikely(!dt_object_exists(dt_obj)))
1057                 RETURN(-ESTALE);
1058
1059         rc = out_obj_index_insert(env, dt_obj,
1060                                   (const struct dt_rec *)&arg->u.insert.rec,
1061                                   arg->u.insert.key, th);
1062
1063         CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
1064                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
1065
1066         if (arg->reply != NULL)
1067                 object_update_result_insert(arg->reply, NULL, 0, arg->index,
1068                                             rc);
1069         return rc;
1070 }
1071
1072 static int out_tx_index_insert_undo(const struct lu_env *env,
1073                                     struct thandle *th, struct tx_arg *arg)
1074 {
1075         return out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
1076 }
1077
1078 int out_index_insert_add_exec(const struct lu_env *env,
1079                               struct dt_object *dt_obj,
1080                               const struct dt_rec *rec,
1081                               const struct dt_key *key,
1082                               struct thandle_exec_args *ta,
1083                               struct thandle *th,
1084                               struct object_update_reply *reply,
1085                               int index, const char *file, int line)
1086 {
1087         struct tx_arg   *arg;
1088         int             rc;
1089
1090         if (dt_try_as_dir(env, dt_obj) == 0) {
1091                 rc = -ENOTDIR;
1092                 return rc;
1093         }
1094
1095         rc = dt_declare_insert(env, dt_obj, rec, key, th);
1096         if (rc != 0)
1097                 return rc;
1098
1099         arg = tx_add_exec(ta, out_tx_index_insert_exec,
1100                           out_tx_index_insert_undo, file, line);
1101         if (IS_ERR(arg))
1102                 return PTR_ERR(arg);
1103
1104         lu_object_get(&dt_obj->do_lu);
1105         arg->object = dt_obj;
1106         arg->reply = reply;
1107         arg->index = index;
1108         arg->u.insert.rec = *(const struct dt_insert_rec *)rec;
1109         arg->u.insert.key = key;
1110
1111         return 0;
1112 }
1113
1114 static int out_tx_index_delete_exec(const struct lu_env *env,
1115                                     struct thandle *th,
1116                                     struct tx_arg *arg)
1117 {
1118         int rc;
1119
1120         rc = out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
1121
1122         CDEBUG(D_INFO, "%s: delete idx insert reply %p index %d: rc = %d\n",
1123                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
1124
1125         if (arg->reply != NULL)
1126                 object_update_result_insert(arg->reply, NULL, 0, arg->index,
1127                                             rc);
1128
1129         return rc;
1130 }
1131
1132 static int out_tx_index_delete_undo(const struct lu_env *env,
1133                                     struct thandle *th,
1134                                     struct tx_arg *arg)
1135 {
1136         CERROR("%s: Oops, can not rollback index_delete yet: rc = %d\n",
1137                dt_obd_name(th->th_dev), -ENOTSUPP);
1138         return -ENOTSUPP;
1139 }
1140
1141 int out_index_delete_add_exec(const struct lu_env *env,
1142                               struct dt_object *dt_obj,
1143                               const struct dt_key *key,
1144                               struct thandle_exec_args *ta,
1145                               struct thandle *th,
1146                               struct object_update_reply *reply,
1147                               int index, const char *file, int line)
1148 {
1149         struct tx_arg   *arg;
1150         int             rc;
1151
1152         if (dt_try_as_dir(env, dt_obj) == 0) {
1153                 rc = -ENOTDIR;
1154                 return rc;
1155         }
1156
1157         LASSERT(ta->ta_handle != NULL);
1158         rc = dt_declare_delete(env, dt_obj, key, th);
1159         if (rc != 0)
1160                 return rc;
1161
1162         arg = tx_add_exec(ta, out_tx_index_delete_exec,
1163                           out_tx_index_delete_undo, file, line);
1164         if (IS_ERR(arg))
1165                 return PTR_ERR(arg);
1166
1167         lu_object_get(&dt_obj->do_lu);
1168         arg->object = dt_obj;
1169         arg->reply = reply;
1170         arg->index = index;
1171         arg->u.insert.key = key;
1172         return 0;
1173 }
1174
1175 static int out_tx_destroy_exec(const struct lu_env *env, struct thandle *th,
1176                                struct tx_arg *arg)
1177 {
1178         struct dt_object *dt_obj = arg->object;
1179         int rc;
1180
1181         rc = out_obj_destroy(env, dt_obj, th);
1182
1183         CDEBUG(D_INFO, "%s: insert destroy reply %p index %d: rc = %d\n",
1184                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
1185
1186         if (arg->reply != NULL)
1187                 object_update_result_insert(arg->reply, NULL, 0, arg->index,
1188                                             rc);
1189
1190         RETURN(rc);
1191 }
1192
1193 static int out_tx_destroy_undo(const struct lu_env *env, struct thandle *th,
1194                                struct tx_arg *arg)
1195 {
1196         CERROR("%s: not support destroy undo yet!: rc = %d\n",
1197                dt_obd_name(th->th_dev), -ENOTSUPP);
1198         return -ENOTSUPP;
1199 }
1200
1201 int out_destroy_add_exec(const struct lu_env *env, struct dt_object *dt_obj,
1202                          struct thandle_exec_args *ta, struct thandle *th,
1203                          struct object_update_reply *reply,
1204                          int index, const char *file, int line)
1205 {
1206         struct tx_arg   *arg;
1207         int             rc;
1208
1209         rc = dt_declare_destroy(env, dt_obj, th);
1210         if (rc != 0)
1211                 return rc;
1212
1213         arg = tx_add_exec(ta, out_tx_destroy_exec, out_tx_destroy_undo,
1214                           file, line);
1215         if (IS_ERR(arg))
1216                 return PTR_ERR(arg);
1217
1218         lu_object_get(&dt_obj->do_lu);
1219         arg->object = dt_obj;
1220         arg->reply = reply;
1221         arg->index = index;
1222         return 0;
1223 }