Whamcloud - gitweb
0c2c95b73007d3c1ba0877537c951f0f44ddaaa8
[fs/lustre-release.git] / lustre / target / out_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2014, Intel Corporation.
24  */
25 /*
26  * lustre/target/out_lib.c
27  *
28  * Author: Di Wang <di.wang@intel.com>
29  * Author: Fan, Yong <fan.yong@intel.com>
30  */
31
32 #define DEBUG_SUBSYSTEM S_CLASS
33
34 #include <lu_target.h>
35 #include <md_object.h>
36 #include <lustre_update.h>
37 #include <obd.h>
38 #include <obd_class.h>
39 #include "tgt_internal.h"
40
41 #define OUT_UPDATE_BUFFER_SIZE_ADD      4096
42 #define OUT_UPDATE_BUFFER_SIZE_MAX      (256 * 4096)  /* 1MB update size now */
43
44 const char *update_op_str(__u16 opc)
45 {
46         static const char *opc_str[] = {
47                 [OUT_START] = "start",
48                 [OUT_CREATE] = "create",
49                 [OUT_DESTROY] = "destroy",
50                 [OUT_REF_ADD] = "ref_add",
51                 [OUT_REF_DEL] = "ref_del" ,
52                 [OUT_ATTR_SET] = "attr_set",
53                 [OUT_ATTR_GET] = "attr_get",
54                 [OUT_XATTR_SET] = "xattr_set",
55                 [OUT_XATTR_GET] = "xattr_get",
56                 [OUT_INDEX_LOOKUP] = "lookup",
57                 [OUT_INDEX_INSERT] = "insert",
58                 [OUT_INDEX_DELETE] = "delete",
59                 [OUT_WRITE] = "write",
60                 [OUT_XATTR_DEL] = "xattr_del",
61                 [OUT_PUNCH] = "punch",
62                 [OUT_READ] = "read",
63         };
64
65         if (opc < ARRAY_SIZE(opc_str) && opc_str[opc] != NULL)
66                 return opc_str[opc];
67         else
68                 return "unknown";
69 }
70 EXPORT_SYMBOL(update_op_str);
71
72 /**
73  * Fill object update header
74  *
75  * Only fill the object update header, and parameters will be filled later
76  * in other functions.
77  *
78  * \params[in] env              execution environment
79  * \params[in] update           object update to be filled
80  * \params[in] max_update_size  maximum object update size, if the current
81  *                              update length equals or exceeds the size, it
82  *                              will return -E2BIG.
83  * \params[in] update_op        update type
84  * \params[in] fid              object FID of the update
85  * \params[in] params_count     the count of the update parameters
86  * \params[in] params_sizes     the length of each parameters
87  *
88  * \retval                      0 if packing succeeds.
89  * \retval                      -E2BIG if packing exceeds the maximum length.
90  */
91 int out_update_header_pack(const struct lu_env *env,
92                            struct object_update *update, size_t max_update_size,
93                            enum update_type update_op, const struct lu_fid *fid,
94                            unsigned int param_count, __u16 *params_sizes)
95 {
96         struct object_update_param      *param;
97         unsigned int                    i;
98         size_t                          update_size;
99
100         /* Check whether the packing exceeding the maxima update length */
101         update_size = sizeof(*update);
102         for (i = 0; i < param_count; i++)
103                 update_size += cfs_size_round(sizeof(*param) + params_sizes[i]);
104
105         if (unlikely(update_size >= max_update_size))
106                 return -E2BIG;
107
108         update->ou_fid = *fid;
109         update->ou_type = update_op;
110         update->ou_params_count = param_count;
111         param = &update->ou_params[0];
112         for (i = 0; i < param_count; i++) {
113                 param->oup_len = params_sizes[i];
114                 param = (struct object_update_param *)((char *)param +
115                          object_update_param_size(param));
116         }
117
118         return 0;
119 }
120
121 /**
122  * Packs one update into the update_buffer.
123  *
124  * \param[in] env       execution environment
125  * \param[in] update    update to be packed
126  * \param[in] max_update_size   *maximum size of \a update
127  * \param[in] op        update operation (enum update_type)
128  * \param[in] fid       object FID for this update
129  * \param[in] param_count       number of parameters for this update
130  * \param[in] param_sizes       array of parameters length of this update
131  * \param[in] param_bufs        parameter buffers
132  *
133  * \retval              = 0 if updates packing succeeds
134  * \retval              negative errno if updates packing fails
135  **/
136 int out_update_pack(const struct lu_env *env, struct object_update *update,
137                     size_t max_update_size, enum update_type op,
138                     const struct lu_fid *fid, unsigned int param_count,
139                     __u16 *param_sizes, const void **param_bufs)
140 {
141         struct object_update_param      *param;
142         unsigned int                    i;
143         int                             rc;
144         ENTRY;
145
146         rc = out_update_header_pack(env, update, max_update_size, op, fid,
147                                     param_count, param_sizes);
148         if (rc != 0)
149                 RETURN(rc);
150
151         param = &update->ou_params[0];
152         for (i = 0; i < param_count; i++) {
153                 memcpy(&param->oup_buf[0], param_bufs[i], param_sizes[i]);
154                 param = (struct object_update_param *)((char *)param +
155                          object_update_param_size(param));
156         }
157
158         RETURN(0);
159 }
160 EXPORT_SYMBOL(out_update_pack);
161
162 /**
163  * Pack various updates into the update_buffer.
164  *
165  * The following functions pack different updates into the update_buffer
166  * So parameters of these API is basically same as its correspondent OSD/OSP
167  * API, for detail description of these parameters see osd_handler.c or
168  * osp_md_object.c.
169  *
170  * \param[in] env       execution environment
171  * \param[in] ubuf      update buffer
172  * \param[in] fid       fid of this object for the update
173  *
174  * \retval              0 if insertion succeeds.
175  * \retval              negative errno if insertion fails.
176  */
177 int out_create_pack(const struct lu_env *env, struct object_update *update,
178                     size_t max_update_size, const struct lu_fid *fid,
179                     const struct lu_attr *attr, struct dt_allocation_hint *hint,
180                     struct dt_object_format *dof)
181 {
182         struct obdo             *obdo;
183         __u16                   sizes[2] = {sizeof(*obdo), 0};
184         int                     buf_count = 1;
185         const struct lu_fid     *parent_fid = NULL;
186         int                     rc;
187         ENTRY;
188
189         if (hint != NULL && hint->dah_parent) {
190                 parent_fid = lu_object_fid(&hint->dah_parent->do_lu);
191                 sizes[1] = sizeof(*parent_fid);
192                 buf_count++;
193         }
194
195         rc = out_update_header_pack(env, update, max_update_size, OUT_CREATE,
196                                     fid, buf_count, sizes);
197         if (rc != 0)
198                 RETURN(rc);
199
200         obdo = object_update_param_get(update, 0, NULL);
201         LASSERT(obdo != NULL);
202         obdo->o_valid = 0;
203         obdo_from_la(obdo, attr, attr->la_valid);
204         lustre_set_wire_obdo(NULL, obdo, obdo);
205
206         if (parent_fid != NULL) {
207                 struct lu_fid *tmp;
208
209                 tmp = object_update_param_get(update, 1, NULL);
210                 LASSERT(tmp != NULL);
211                 fid_cpu_to_le(tmp, parent_fid);
212         }
213
214         RETURN(0);
215 }
216 EXPORT_SYMBOL(out_create_pack);
217
218 int out_ref_del_pack(const struct lu_env *env, struct object_update *update,
219                      size_t max_update_size, const struct lu_fid *fid)
220 {
221         return out_update_pack(env, update, max_update_size, OUT_REF_DEL, fid,
222                                0, NULL, NULL);
223 }
224 EXPORT_SYMBOL(out_ref_del_pack);
225
226 int out_ref_add_pack(const struct lu_env *env, struct object_update *update,
227                      size_t max_update_size, const struct lu_fid *fid)
228 {
229         return out_update_pack(env, update, max_update_size, OUT_REF_ADD, fid,
230                                0, NULL, NULL);
231 }
232 EXPORT_SYMBOL(out_ref_add_pack);
233
234 int out_attr_set_pack(const struct lu_env *env, struct object_update *update,
235                       size_t max_update_size, const struct lu_fid *fid,
236                       const struct lu_attr *attr)
237 {
238         struct obdo             *obdo;
239         __u16                   size = sizeof(*obdo);
240         int                     rc;
241         ENTRY;
242
243         rc = out_update_header_pack(env, update, max_update_size,
244                                     OUT_ATTR_SET, fid, 1, &size);
245         if (rc != 0)
246                 RETURN(rc);
247
248         obdo = object_update_param_get(update, 0, NULL);
249         LASSERT(obdo != NULL);
250         obdo->o_valid = 0;
251         obdo_from_la(obdo, attr, attr->la_valid);
252         lustre_set_wire_obdo(NULL, obdo, obdo);
253
254         RETURN(0);
255 }
256 EXPORT_SYMBOL(out_attr_set_pack);
257
258 int out_xattr_set_pack(const struct lu_env *env, struct object_update *update,
259                        size_t max_update_size, const struct lu_fid *fid,
260                        const struct lu_buf *buf, const char *name, __u32 flag)
261 {
262         __u16   sizes[3] = {strlen(name) + 1, buf->lb_len, sizeof(flag)};
263         const void *bufs[3] = {(char *)name, (char *)buf->lb_buf,
264                                (char *)&flag};
265
266         return out_update_pack(env, update, max_update_size, OUT_XATTR_SET,
267                                fid, ARRAY_SIZE(sizes), sizes, bufs);
268 }
269 EXPORT_SYMBOL(out_xattr_set_pack);
270
271 int out_xattr_del_pack(const struct lu_env *env, struct object_update *update,
272                        size_t max_update_size, const struct lu_fid *fid,
273                        const char *name)
274 {
275         __u16   size = strlen(name) + 1;
276
277         return out_update_pack(env, update, max_update_size, OUT_XATTR_DEL,
278                                fid, 1, &size, (const void **)&name);
279 }
280 EXPORT_SYMBOL(out_xattr_del_pack);
281
282
283 int out_index_insert_pack(const struct lu_env *env,
284                           struct object_update *update,
285                           size_t max_update_size, const struct lu_fid *fid,
286                           const struct dt_rec *rec, const struct dt_key *key)
287 {
288         struct dt_insert_rec       *rec1 = (struct dt_insert_rec *)rec;
289         struct lu_fid              rec_fid;
290         __u32                       type = cpu_to_le32(rec1->rec_type);
291         __u16                       sizes[3] = { strlen((char *)key) + 1,
292                                                 sizeof(rec_fid),
293                                                 sizeof(type) };
294         const void                 *bufs[3] = { (char *)key,
295                                                 (char *)&rec_fid,
296                                                 (char *)&type };
297
298         fid_cpu_to_le(&rec_fid, rec1->rec_fid);
299
300         return out_update_pack(env, update, max_update_size, OUT_INDEX_INSERT,
301                                fid, ARRAY_SIZE(sizes), sizes, bufs);
302 }
303 EXPORT_SYMBOL(out_index_insert_pack);
304
305 int out_index_delete_pack(const struct lu_env *env,
306                           struct object_update *update,
307                           size_t max_update_size, const struct lu_fid *fid,
308                           const struct dt_key *key)
309 {
310         __u16   size = strlen((char *)key) + 1;
311         const void *buf = key;
312
313         return out_update_pack(env, update, max_update_size, OUT_INDEX_DELETE,
314                                fid, 1, &size, &buf);
315 }
316 EXPORT_SYMBOL(out_index_delete_pack);
317
318 int out_object_destroy_pack(const struct lu_env *env,
319                             struct object_update *update,
320                             size_t max_update_size, const struct lu_fid *fid)
321 {
322         return out_update_pack(env, update, max_update_size, OUT_DESTROY, fid,
323                                0, NULL, NULL);
324 }
325 EXPORT_SYMBOL(out_object_destroy_pack);
326
327 int out_write_pack(const struct lu_env *env, struct object_update *update,
328                    size_t max_update_size, const struct lu_fid *fid,
329                    const struct lu_buf *buf, __u64 pos)
330 {
331         __u16           sizes[2] = {buf->lb_len, sizeof(pos)};
332         const void      *bufs[2] = {(char *)buf->lb_buf, (char *)&pos};
333         int             rc;
334
335         pos = cpu_to_le64(pos);
336
337         rc = out_update_pack(env, update, max_update_size, OUT_WRITE, fid,
338                              ARRAY_SIZE(sizes), sizes, bufs);
339         return rc;
340 }
341 EXPORT_SYMBOL(out_write_pack);
342
343 /**
344  * Pack various readonly updates into the update_buffer.
345  *
346  * The following update funcs are only used by read-only ops, lookup,
347  * getattr etc, so it does not need transaction here. Currently they
348  * are only used by OSP.
349  *
350  * \param[in] env       execution environment
351  * \param[in] fid       fid of this object for the update
352  * \param[in] ubuf      update buffer
353  *
354  * \retval              = 0 pack succeed.
355  *                      < 0 pack failed.
356  **/
357 int out_index_lookup_pack(const struct lu_env *env,
358                           struct object_update *update,
359                           size_t max_update_size, const struct lu_fid *fid,
360                           struct dt_rec *rec, const struct dt_key *key)
361 {
362         const void      *name = key;
363         __u16           size = strlen((char *)name) + 1;
364
365         return out_update_pack(env, update, max_update_size, OUT_INDEX_LOOKUP,
366                                fid, 1, &size, &name);
367 }
368 EXPORT_SYMBOL(out_index_lookup_pack);
369
370 int out_attr_get_pack(const struct lu_env *env, struct object_update *update,
371                       size_t max_update_size, const struct lu_fid *fid)
372 {
373         return out_update_pack(env, update, max_update_size, OUT_ATTR_GET,
374                                fid, 0, NULL, NULL);
375 }
376 EXPORT_SYMBOL(out_attr_get_pack);
377
378 int out_xattr_get_pack(const struct lu_env *env, struct object_update *update,
379                        size_t max_update_size, const struct lu_fid *fid,
380                        const char *name)
381 {
382         __u16 size;
383
384         LASSERT(name != NULL);
385         size = strlen(name) + 1;
386
387         return out_update_pack(env, update, max_update_size, OUT_XATTR_GET,
388                                fid, 1, &size, (const void **)&name);
389 }
390 EXPORT_SYMBOL(out_xattr_get_pack);
391
392 int out_read_pack(const struct lu_env *env, struct object_update *update,
393                   size_t max_update_length, const struct lu_fid *fid,
394                   size_t size, loff_t pos)
395 {
396         __u16           sizes[2] = {sizeof(size), sizeof(pos)};
397         const void      *bufs[2] = {&size, &pos};
398
399         size = cpu_to_le64(size);
400         pos = cpu_to_le64(pos);
401
402         return out_update_pack(env, update, max_update_length, OUT_READ, fid,
403                                ARRAY_SIZE(sizes), sizes, bufs);
404 }
405 EXPORT_SYMBOL(out_read_pack);
406
407 static int tx_extend_args(struct thandle_exec_args *ta, int new_alloc_ta)
408 {
409         struct tx_arg   **new_ta;
410         int             i;
411         int             rc = 0;
412
413         if (ta->ta_alloc_args >= new_alloc_ta)
414                 return 0;
415
416         OBD_ALLOC(new_ta, sizeof(*new_ta) * new_alloc_ta);
417         if (new_ta == NULL)
418                 return -ENOMEM;
419
420         for (i = 0; i < new_alloc_ta; i++) {
421                 if (i < ta->ta_alloc_args) {
422                         /* copy the old args to new one */
423                         new_ta[i] = ta->ta_args[i];
424                 } else {
425                         OBD_ALLOC_PTR(new_ta[i]);
426                         if (new_ta[i] == NULL)
427                                 GOTO(out, rc = -ENOMEM);
428                 }
429         }
430
431         /* free the old args */
432         if (ta->ta_args != NULL)
433                 OBD_FREE(ta->ta_args, sizeof(ta->ta_args[0]) *
434                                       ta->ta_alloc_args);
435
436         ta->ta_args = new_ta;
437         ta->ta_alloc_args = new_alloc_ta;
438 out:
439         if (rc != 0) {
440                 for (i = 0; i < new_alloc_ta; i++) {
441                         if (new_ta[i] != NULL)
442                                 OBD_FREE_PTR(new_ta[i]);
443                 }
444                 OBD_FREE(new_ta, sizeof(*new_ta) * new_alloc_ta);
445         }
446         return rc;
447 }
448
449 #define TX_ALLOC_STEP   8
450 struct tx_arg *tx_add_exec(struct thandle_exec_args *ta,
451                            tx_exec_func_t func, tx_exec_func_t undo,
452                            const char *file, int line)
453 {
454         int rc;
455         int i;
456
457         LASSERT(ta != NULL);
458         LASSERT(func != NULL);
459
460         if (ta->ta_argno + 1 >= ta->ta_alloc_args) {
461                 rc = tx_extend_args(ta, ta->ta_alloc_args + TX_ALLOC_STEP);
462                 if (rc != 0)
463                         return ERR_PTR(rc);
464         }
465
466         i = ta->ta_argno;
467
468         ta->ta_argno++;
469
470         ta->ta_args[i]->exec_fn = func;
471         ta->ta_args[i]->undo_fn = undo;
472         ta->ta_args[i]->file    = file;
473         ta->ta_args[i]->line    = line;
474
475         return ta->ta_args[i];
476 }
477
478 static int out_obj_destroy(const struct lu_env *env, struct dt_object *dt_obj,
479                            struct thandle *th)
480 {
481         int rc;
482
483         CDEBUG(D_INFO, "%s: destroy "DFID"\n", dt_obd_name(th->th_dev),
484                PFID(lu_object_fid(&dt_obj->do_lu)));
485
486         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
487         rc = dt_destroy(env, dt_obj, th);
488         dt_write_unlock(env, dt_obj);
489
490         return rc;
491 }
492
493 /**
494  * All of the xxx_undo will be used once execution failed,
495  * But because all of the required resource has been reserved in
496  * declare phase, i.e. if declare succeed, it should make sure
497  * the following executing phase succeed in anyway, so these undo
498  * should be useless for most of the time in Phase I
499  */
500 static int out_tx_create_undo(const struct lu_env *env, struct thandle *th,
501                               struct tx_arg *arg)
502 {
503         int rc;
504
505         rc = out_obj_destroy(env, arg->object, th);
506         if (rc != 0)
507                 CERROR("%s: undo failure, we are doomed!: rc = %d\n",
508                        dt_obd_name(th->th_dev), rc);
509         return rc;
510 }
511
512 int out_tx_create_exec(const struct lu_env *env, struct thandle *th,
513                        struct tx_arg *arg)
514 {
515         struct dt_object        *dt_obj = arg->object;
516         int                      rc;
517
518         CDEBUG(D_OTHER, "%s: create "DFID": dof %u, mode %o\n",
519                dt_obd_name(th->th_dev),
520                PFID(lu_object_fid(&arg->object->do_lu)),
521                arg->u.create.dof.dof_type,
522                arg->u.create.attr.la_mode & S_IFMT);
523
524         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
525         rc = dt_create(env, dt_obj, &arg->u.create.attr,
526                        &arg->u.create.hint, &arg->u.create.dof, th);
527
528         dt_write_unlock(env, dt_obj);
529
530         CDEBUG(D_INFO, "%s: insert create reply %p index %d: rc = %d\n",
531                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
532
533         if (arg->reply != NULL)
534                 object_update_result_insert(arg->reply, NULL, 0, arg->index,
535                                             rc);
536
537         return rc;
538 }
539
540 /**
541  * Add create update to thandle
542  *
543  * Declare create updates and add the update to the thandle updates
544  * exec array.
545  *
546  * \param [in] env      execution environment
547  * \param [in] obj      object to be created
548  * \param [in] attr     attributes of the creation
549  * \param [in] parent_fid the fid of the parent
550  * \param [in] dof      dt object format of the creation
551  * \param [in] ta       thandle execuation args where all of updates
552  *                      of the transaction are stored
553  * \param [in] th       thandle for this update
554  * \param [in] reply    reply of the updates
555  * \param [in] index    index of the reply
556  * \param [in] file     the file name where the function is called,
557  *                      which is only for debugging purpose.
558  * \param [in] line     the line number where the funtion is called,
559  *                      which is only for debugging purpose.
560  *
561  * \retval              0 if updates is added successfully.
562  * \retval              negative errno if update adding fails.
563  */
564 int out_create_add_exec(const struct lu_env *env, struct dt_object *obj,
565                         struct lu_attr *attr, struct lu_fid *parent_fid,
566                         struct dt_object_format *dof,
567                         struct thandle_exec_args *ta,
568                         struct thandle  *th,
569                         struct object_update_reply *reply,
570                         int index, const char *file, int line)
571 {
572         struct tx_arg *arg;
573         int rc;
574
575         rc = dt_declare_create(env, obj, attr, NULL, dof, th);
576         if (rc != 0)
577                 return rc;
578
579         arg = tx_add_exec(ta, out_tx_create_exec, out_tx_create_undo, file,
580                           line);
581         if (IS_ERR(arg))
582                 return PTR_ERR(arg);
583
584         /* release the object in out_trans_stop */
585         lu_object_get(&obj->do_lu);
586         arg->object = obj;
587         arg->u.create.attr = *attr;
588         if (parent_fid != NULL)
589                 arg->u.create.fid = *parent_fid;
590         memset(&arg->u.create.hint, 0, sizeof(arg->u.create.hint));
591         arg->u.create.dof  = *dof;
592         arg->reply = reply;
593         arg->index = index;
594
595         return 0;
596 }
597
598 static int out_tx_attr_set_undo(const struct lu_env *env,
599                                 struct thandle *th, struct tx_arg *arg)
600 {
601         CERROR("%s: attr set undo "DFID" unimplemented yet!: rc = %d\n",
602                dt_obd_name(th->th_dev),
603                PFID(lu_object_fid(&arg->object->do_lu)), -ENOTSUPP);
604
605         return -ENOTSUPP;
606 }
607
608 static int out_tx_attr_set_exec(const struct lu_env *env, struct thandle *th,
609                                 struct tx_arg *arg)
610 {
611         struct dt_object        *dt_obj = arg->object;
612         int                     rc;
613
614         CDEBUG(D_OTHER, "%s: attr set "DFID"\n", dt_obd_name(th->th_dev),
615                PFID(lu_object_fid(&dt_obj->do_lu)));
616
617         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
618         rc = dt_attr_set(env, dt_obj, &arg->u.attr_set.attr, th);
619         dt_write_unlock(env, dt_obj);
620
621         CDEBUG(D_INFO, "%s: insert attr_set reply %p index %d: rc = %d\n",
622                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
623
624         if (arg->reply != NULL)
625                 object_update_result_insert(arg->reply, NULL, 0,
626                                             arg->index, rc);
627
628         return rc;
629 }
630
631 int out_attr_set_add_exec(const struct lu_env *env, struct dt_object *dt_obj,
632                           const struct lu_attr *attr,
633                           struct thandle_exec_args *ta,
634                           struct thandle *th, struct object_update_reply *reply,
635                           int index, const char *file, int line)
636 {
637         struct tx_arg   *arg;
638         int             rc;
639
640         rc = dt_declare_attr_set(env, dt_obj, attr, th);
641         if (rc != 0)
642                 return rc;
643
644         arg = tx_add_exec(ta, out_tx_attr_set_exec, out_tx_attr_set_undo,
645                           file, line);
646         if (IS_ERR(arg))
647                 return PTR_ERR(arg);
648
649         lu_object_get(&dt_obj->do_lu);
650         arg->object = dt_obj;
651         arg->u.attr_set.attr = *attr;
652         arg->reply = reply;
653         arg->index = index;
654         return 0;
655 }
656
657 static int out_tx_write_exec(const struct lu_env *env, struct thandle *th,
658                              struct tx_arg *arg)
659 {
660         struct dt_object *dt_obj = arg->object;
661         int rc;
662
663         CDEBUG(D_INFO, "write "DFID" pos "LPU64" buf %p, len %lu\n",
664                PFID(lu_object_fid(&dt_obj->do_lu)), arg->u.write.pos,
665                arg->u.write.buf.lb_buf, (unsigned long)arg->u.write.buf.lb_len);
666
667         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
668         rc = dt_record_write(env, dt_obj, &arg->u.write.buf,
669                              &arg->u.write.pos, th);
670         dt_write_unlock(env, dt_obj);
671
672         if (rc == 0)
673                 rc = arg->u.write.buf.lb_len;
674
675         if (arg->reply != NULL)
676                 object_update_result_insert(arg->reply, NULL, 0, arg->index,
677                                             rc);
678
679         return rc > 0 ? 0 : rc;
680 }
681
682 int out_write_add_exec(const struct lu_env *env, struct dt_object *dt_obj,
683                        const struct lu_buf *buf, loff_t pos,
684                        struct thandle_exec_args *ta, struct thandle *th,
685                        struct object_update_reply *reply, int index,
686                        const char *file, int line)
687 {
688         struct tx_arg   *arg;
689         int             rc;
690
691         rc = dt_declare_record_write(env, dt_obj, buf, pos, th);
692         if (rc != 0)
693                 return rc;
694
695         arg = tx_add_exec(ta, out_tx_write_exec, NULL, file, line);
696         if (IS_ERR(arg))
697                 return PTR_ERR(arg);
698
699         lu_object_get(&dt_obj->do_lu);
700         arg->object = dt_obj;
701         arg->u.write.buf = *buf;
702         arg->u.write.pos = pos;
703         arg->reply = reply;
704         arg->index = index;
705         return 0;
706 }
707
708 static int out_tx_xattr_set_exec(const struct lu_env *env,
709                                  struct thandle *th,
710                                  struct tx_arg *arg)
711 {
712         struct dt_object *dt_obj = arg->object;
713         int rc;
714
715         CDEBUG(D_INFO, "%s: set xattr buf %p name %s flag %d\n",
716                dt_obd_name(th->th_dev), arg->u.xattr_set.buf.lb_buf,
717                arg->u.xattr_set.name, arg->u.xattr_set.flags);
718
719         if (!lu_object_exists(&dt_obj->do_lu))
720                 GOTO(out, rc = -ENOENT);
721
722         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
723         rc = dt_xattr_set(env, dt_obj, &arg->u.xattr_set.buf,
724                           arg->u.xattr_set.name, arg->u.xattr_set.flags,
725                           th);
726         /**
727          * Ignore errors if this is LINK EA
728          **/
729         if (unlikely(rc != 0 &&
730                      strcmp(arg->u.xattr_set.name, XATTR_NAME_LINK) == 0)) {
731                 /* XXX: If the linkEA is overflow, then we need to notify the
732                  *      namespace LFSCK to skip "nlink" attribute verification
733                  *      on this object to avoid the "nlink" to be shrinked by
734                  *      wrong. It may be not good an interaction with LFSCK
735                  *      like this. We will consider to replace it with other
736                  *      mechanism in future. LU-5802. */
737                 if (rc == -ENOSPC && arg->reply != NULL) {
738                         struct lfsck_request *lr = &tgt_th_info(env)->tti_lr;
739
740                         lfsck_pack_rfa(lr, lu_object_fid(&dt_obj->do_lu),
741                                        LE_SKIP_NLINK, LFSCK_TYPE_NAMESPACE);
742                         tgt_lfsck_in_notify(env,
743                                 tgt_ses_info(env)->tsi_tgt->lut_bottom, lr, th);
744                 }
745
746                 rc = 0;
747         }
748         dt_write_unlock(env, dt_obj);
749
750 out:
751         CDEBUG(D_INFO, "%s: insert xattr set reply %p index %d: rc = %d\n",
752                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
753
754         if (arg->reply != NULL)
755                 object_update_result_insert(arg->reply, NULL, 0, arg->index,
756                                             rc);
757
758         return rc;
759 }
760
761 int out_xattr_set_add_exec(const struct lu_env *env, struct dt_object *dt_obj,
762                            const struct lu_buf *buf, const char *name,
763                            int flags, struct thandle_exec_args *ta,
764                            struct thandle *th,
765                            struct object_update_reply *reply,
766                            int index, const char *file, int line)
767 {
768         struct tx_arg   *arg;
769         int             rc;
770
771         rc = dt_declare_xattr_set(env, dt_obj, buf, name, flags, th);
772         if (rc != 0)
773                 return rc;
774
775         if (strcmp(name, XATTR_NAME_LINK) == 0 && reply != NULL) {
776                 struct lfsck_request *lr = &tgt_th_info(env)->tti_lr;
777
778                 /* XXX: If the linkEA is overflow, then we need to notify the
779                  *      namespace LFSCK to skip "nlink" attribute verification
780                  *      on this object to avoid the "nlink" to be shrinked by
781                  *      wrong. It may be not good an interaction with LFSCK
782                  *      like this. We will consider to replace it with other
783                  *      mechanism in future. LU-5802. */
784                 lfsck_pack_rfa(lr, lu_object_fid(&dt_obj->do_lu),
785                                LE_SKIP_NLINK_DECLARE, LFSCK_TYPE_NAMESPACE);
786                 rc = tgt_lfsck_in_notify(env,
787                                          tgt_ses_info(env)->tsi_tgt->lut_bottom,
788                                          lr, ta->ta_handle);
789                 if (rc != 0)
790                         return rc;
791         }
792
793         arg = tx_add_exec(ta, out_tx_xattr_set_exec, NULL, file, line);
794         if (IS_ERR(arg))
795                 return PTR_ERR(arg);
796
797         lu_object_get(&dt_obj->do_lu);
798         arg->object = dt_obj;
799         arg->u.xattr_set.name = name;
800         arg->u.xattr_set.flags = flags;
801         arg->u.xattr_set.buf = *buf;
802         arg->reply = reply;
803         arg->index = index;
804         arg->u.xattr_set.csum = 0;
805         return 0;
806 }
807
808 static int out_tx_xattr_del_exec(const struct lu_env *env, struct thandle *th,
809                                  struct tx_arg *arg)
810 {
811         struct dt_object *dt_obj = arg->object;
812         int rc;
813
814         CDEBUG(D_INFO, "%s: del xattr name '%s' on "DFID"\n",
815                dt_obd_name(th->th_dev), arg->u.xattr_set.name,
816                PFID(lu_object_fid(&dt_obj->do_lu)));
817
818         if (!lu_object_exists(&dt_obj->do_lu))
819                 GOTO(out, rc = -ENOENT);
820
821         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
822         rc = dt_xattr_del(env, dt_obj, arg->u.xattr_set.name,
823                           th);
824         dt_write_unlock(env, dt_obj);
825 out:
826         CDEBUG(D_INFO, "%s: insert xattr del reply %p index %d: rc = %d\n",
827                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
828
829         if (arg->reply != NULL)
830                 object_update_result_insert(arg->reply, NULL, 0, arg->index,
831                                             rc);
832
833         return rc;
834 }
835
836 int out_xattr_del_add_exec(const struct lu_env *env, struct dt_object *dt_obj,
837                            const char *name, struct thandle_exec_args *ta,
838                            struct thandle *th,
839                            struct object_update_reply *reply, int index,
840                            const char *file, int line)
841 {
842         struct tx_arg   *arg;
843         int             rc;
844
845         rc = dt_declare_xattr_del(env, dt_obj, name, th);
846         if (rc != 0)
847                 return rc;
848
849         arg = tx_add_exec(ta, out_tx_xattr_del_exec, NULL, file, line);
850         if (IS_ERR(arg))
851                 return PTR_ERR(arg);
852
853         lu_object_get(&dt_obj->do_lu);
854         arg->object = dt_obj;
855         arg->u.xattr_set.name = name;
856         arg->reply = reply;
857         arg->index = index;
858         return 0;
859 }
860
861 static int out_obj_ref_add(const struct lu_env *env,
862                            struct dt_object *dt_obj,
863                            struct thandle *th)
864 {
865         int rc;
866
867         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
868         rc = dt_ref_add(env, dt_obj, th);
869         dt_write_unlock(env, dt_obj);
870
871         return rc;
872 }
873
874 static int out_obj_ref_del(const struct lu_env *env,
875                            struct dt_object *dt_obj,
876                            struct thandle *th)
877 {
878         int rc;
879
880         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
881         rc = dt_ref_del(env, dt_obj, th);
882         dt_write_unlock(env, dt_obj);
883
884         return rc;
885 }
886
887 static int out_tx_ref_add_exec(const struct lu_env *env, struct thandle *th,
888                                struct tx_arg *arg)
889 {
890         struct dt_object *dt_obj = arg->object;
891         int rc;
892
893         rc = out_obj_ref_add(env, dt_obj, th);
894
895         CDEBUG(D_INFO, "%s: insert ref_add reply %p index %d: rc = %d\n",
896                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
897
898         if (arg->reply != NULL)
899                 object_update_result_insert(arg->reply, NULL, 0, arg->index,
900                                             rc);
901         return rc;
902 }
903
904 static int out_tx_ref_add_undo(const struct lu_env *env, struct thandle *th,
905                                struct tx_arg *arg)
906 {
907         return out_obj_ref_del(env, arg->object, th);
908 }
909
910 int out_ref_add_add_exec(const struct lu_env *env, struct dt_object *dt_obj,
911                          struct thandle_exec_args *ta,
912                          struct thandle *th,
913                          struct object_update_reply *reply, int index,
914                          const char *file, int line)
915 {
916         struct tx_arg   *arg;
917         int             rc;
918
919         rc = dt_declare_ref_add(env, dt_obj, th);
920         if (rc != 0)
921                 return rc;
922
923         arg = tx_add_exec(ta, out_tx_ref_add_exec, out_tx_ref_add_undo, file,
924                           line);
925         if (IS_ERR(arg))
926                 return PTR_ERR(arg);
927
928         lu_object_get(&dt_obj->do_lu);
929         arg->object = dt_obj;
930         arg->reply = reply;
931         arg->index = index;
932         return 0;
933 }
934
935 static int out_tx_ref_del_exec(const struct lu_env *env, struct thandle *th,
936                                struct tx_arg *arg)
937 {
938         struct dt_object        *dt_obj = arg->object;
939         int                      rc;
940
941         rc = out_obj_ref_del(env, dt_obj, th);
942
943         CDEBUG(D_INFO, "%s: insert ref_del reply %p index %d: rc = %d\n",
944                dt_obd_name(th->th_dev), arg->reply, arg->index, 0);
945
946         if (arg->reply != NULL)
947                 object_update_result_insert(arg->reply, NULL, 0, arg->index,
948                                             rc);
949
950         return rc;
951 }
952
953 static int out_tx_ref_del_undo(const struct lu_env *env, struct thandle *th,
954                                struct tx_arg *arg)
955 {
956         return out_obj_ref_add(env, arg->object, th);
957 }
958
959 int out_ref_del_add_exec(const struct lu_env *env, struct dt_object *dt_obj,
960                          struct thandle_exec_args *ta,
961                          struct thandle *th,
962                          struct object_update_reply *reply, int index,
963                          const char *file, int line)
964 {
965         struct tx_arg   *arg;
966         int             rc;
967
968         rc = dt_declare_ref_del(env, dt_obj, th);
969         if (rc != 0)
970                 return rc;
971
972         arg = tx_add_exec(ta, out_tx_ref_del_exec, out_tx_ref_del_undo, file,
973                           line);
974         if (IS_ERR(arg))
975                 return PTR_ERR(arg);
976
977         lu_object_get(&dt_obj->do_lu);
978         arg->object = dt_obj;
979         arg->reply = reply;
980         arg->index = index;
981         return 0;
982 }
983
984 static int out_obj_index_insert(const struct lu_env *env,
985                                 struct dt_object *dt_obj,
986                                 const struct dt_rec *rec,
987                                 const struct dt_key *key,
988                                 struct thandle *th)
989 {
990         int rc;
991
992         CDEBUG(D_INFO, "%s: index insert "DFID" name: %s fid "DFID", type %u\n",
993                dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
994                (char *)key, PFID(((struct dt_insert_rec *)rec)->rec_fid),
995                ((struct dt_insert_rec *)rec)->rec_type);
996
997         if (dt_try_as_dir(env, dt_obj) == 0)
998                 return -ENOTDIR;
999
1000         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
1001         rc = dt_insert(env, dt_obj, rec, key, th, 0);
1002         dt_write_unlock(env, dt_obj);
1003
1004         return rc;
1005 }
1006
1007 static int out_obj_index_delete(const struct lu_env *env,
1008                                 struct dt_object *dt_obj,
1009                                 const struct dt_key *key,
1010                                 struct thandle *th)
1011 {
1012         int rc;
1013
1014         CDEBUG(D_INFO, "%s: index delete "DFID" name: %s\n",
1015                dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
1016                (char *)key);
1017
1018         if (dt_try_as_dir(env, dt_obj) == 0)
1019                 return -ENOTDIR;
1020
1021         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
1022         rc = dt_delete(env, dt_obj, key, th);
1023         dt_write_unlock(env, dt_obj);
1024
1025         return rc;
1026 }
1027
1028 static int out_tx_index_insert_exec(const struct lu_env *env,
1029                                     struct thandle *th, struct tx_arg *arg)
1030 {
1031         struct dt_object *dt_obj = arg->object;
1032         int rc;
1033
1034         if (unlikely(!dt_object_exists(dt_obj)))
1035                 RETURN(-ESTALE);
1036
1037         rc = out_obj_index_insert(env, dt_obj,
1038                                   (const struct dt_rec *)&arg->u.insert.rec,
1039                                   arg->u.insert.key, th);
1040
1041         CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
1042                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
1043
1044         if (arg->reply != NULL)
1045                 object_update_result_insert(arg->reply, NULL, 0, arg->index,
1046                                             rc);
1047         return rc;
1048 }
1049
1050 static int out_tx_index_insert_undo(const struct lu_env *env,
1051                                     struct thandle *th, struct tx_arg *arg)
1052 {
1053         return out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
1054 }
1055
1056 int out_index_insert_add_exec(const struct lu_env *env,
1057                               struct dt_object *dt_obj,
1058                               const struct dt_rec *rec,
1059                               const struct dt_key *key,
1060                               struct thandle_exec_args *ta,
1061                               struct thandle *th,
1062                               struct object_update_reply *reply,
1063                               int index, const char *file, int line)
1064 {
1065         struct tx_arg   *arg;
1066         int             rc;
1067
1068         if (dt_try_as_dir(env, dt_obj) == 0) {
1069                 rc = -ENOTDIR;
1070                 return rc;
1071         }
1072
1073         rc = dt_declare_insert(env, dt_obj, rec, key, th);
1074         if (rc != 0)
1075                 return rc;
1076
1077         arg = tx_add_exec(ta, out_tx_index_insert_exec,
1078                           out_tx_index_insert_undo, file, line);
1079         if (IS_ERR(arg))
1080                 return PTR_ERR(arg);
1081
1082         lu_object_get(&dt_obj->do_lu);
1083         arg->object = dt_obj;
1084         arg->reply = reply;
1085         arg->index = index;
1086         arg->u.insert.rec = *(const struct dt_insert_rec *)rec;
1087         arg->u.insert.key = key;
1088
1089         return 0;
1090 }
1091
1092 static int out_tx_index_delete_exec(const struct lu_env *env,
1093                                     struct thandle *th,
1094                                     struct tx_arg *arg)
1095 {
1096         int rc;
1097
1098         rc = out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
1099
1100         CDEBUG(D_INFO, "%s: delete idx insert reply %p index %d: rc = %d\n",
1101                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
1102
1103         if (arg->reply != NULL)
1104                 object_update_result_insert(arg->reply, NULL, 0, arg->index,
1105                                             rc);
1106
1107         return rc;
1108 }
1109
1110 static int out_tx_index_delete_undo(const struct lu_env *env,
1111                                     struct thandle *th,
1112                                     struct tx_arg *arg)
1113 {
1114         CERROR("%s: Oops, can not rollback index_delete yet: rc = %d\n",
1115                dt_obd_name(th->th_dev), -ENOTSUPP);
1116         return -ENOTSUPP;
1117 }
1118
1119 int out_index_delete_add_exec(const struct lu_env *env,
1120                               struct dt_object *dt_obj,
1121                               const struct dt_key *key,
1122                               struct thandle_exec_args *ta,
1123                               struct thandle *th,
1124                               struct object_update_reply *reply,
1125                               int index, const char *file, int line)
1126 {
1127         struct tx_arg   *arg;
1128         int             rc;
1129
1130         if (dt_try_as_dir(env, dt_obj) == 0) {
1131                 rc = -ENOTDIR;
1132                 return rc;
1133         }
1134
1135         LASSERT(ta->ta_handle != NULL);
1136         rc = dt_declare_delete(env, dt_obj, key, th);
1137         if (rc != 0)
1138                 return rc;
1139
1140         arg = tx_add_exec(ta, out_tx_index_delete_exec,
1141                           out_tx_index_delete_undo, file, line);
1142         if (IS_ERR(arg))
1143                 return PTR_ERR(arg);
1144
1145         lu_object_get(&dt_obj->do_lu);
1146         arg->object = dt_obj;
1147         arg->reply = reply;
1148         arg->index = index;
1149         arg->u.insert.key = key;
1150         return 0;
1151 }
1152
1153 static int out_tx_destroy_exec(const struct lu_env *env, struct thandle *th,
1154                                struct tx_arg *arg)
1155 {
1156         struct dt_object *dt_obj = arg->object;
1157         int rc;
1158
1159         rc = out_obj_destroy(env, dt_obj, th);
1160
1161         CDEBUG(D_INFO, "%s: insert destroy reply %p index %d: rc = %d\n",
1162                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
1163
1164         if (arg->reply != NULL)
1165                 object_update_result_insert(arg->reply, NULL, 0, arg->index,
1166                                             rc);
1167
1168         RETURN(rc);
1169 }
1170
1171 static int out_tx_destroy_undo(const struct lu_env *env, struct thandle *th,
1172                                struct tx_arg *arg)
1173 {
1174         CERROR("%s: not support destroy undo yet!: rc = %d\n",
1175                dt_obd_name(th->th_dev), -ENOTSUPP);
1176         return -ENOTSUPP;
1177 }
1178
1179 int out_destroy_add_exec(const struct lu_env *env, struct dt_object *dt_obj,
1180                          struct thandle_exec_args *ta, struct thandle *th,
1181                          struct object_update_reply *reply,
1182                          int index, const char *file, int line)
1183 {
1184         struct tx_arg   *arg;
1185         int             rc;
1186
1187         rc = dt_declare_destroy(env, dt_obj, th);
1188         if (rc != 0)
1189                 return rc;
1190
1191         arg = tx_add_exec(ta, out_tx_destroy_exec, out_tx_destroy_undo,
1192                           file, line);
1193         if (IS_ERR(arg))
1194                 return PTR_ERR(arg);
1195
1196         lu_object_get(&dt_obj->do_lu);
1197         arg->object = dt_obj;
1198         arg->reply = reply;
1199         arg->index = index;
1200         return 0;
1201 }