Whamcloud - gitweb
LU-2675 mdt: refactor mdt_getattr_name_lock()
[fs/lustre-release.git] / lustre / target / out_handler.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  *
25  * lustre/target/out_handler.c
26  *
27  * Object update handler between targets.
28  *
29  * Author: di.wang <di.wang@intel.com>
30  */
31
32 #define DEBUG_SUBSYSTEM S_CLASS
33
34 #include <obd_class.h>
35 #include <md_object.h>
36 #include "tgt_internal.h"
37 #include <lustre_update.h>
38
39 static int tx_extend_args(struct thandle_exec_args *ta, int new_alloc_ta)
40 {
41         struct tx_arg   **new_ta;
42         int             i;
43         int             rc = 0;
44
45         if (ta->ta_alloc_args >= new_alloc_ta)
46                 return 0;
47
48         OBD_ALLOC(new_ta, sizeof(*new_ta) * new_alloc_ta);
49         if (new_ta == NULL)
50                 return -ENOMEM;
51
52         for (i = 0; i < new_alloc_ta; i++) {
53                 if (i < ta->ta_alloc_args) {
54                         /* copy the old args to new one */
55                         new_ta[i] = ta->ta_args[i];
56                 } else {
57                         OBD_ALLOC_PTR(new_ta[i]);
58                         if (new_ta[i] == NULL)
59                                 GOTO(out, rc = -ENOMEM);
60                 }
61         }
62
63         /* free the old args */
64         if (ta->ta_args != NULL)
65                 OBD_FREE(ta->ta_args, sizeof(ta->ta_args[0]) *
66                                       ta->ta_alloc_args);
67
68         ta->ta_args = new_ta;
69         ta->ta_alloc_args = new_alloc_ta;
70 out:
71         if (rc != 0) {
72                 for (i = 0; i < new_alloc_ta; i++) {
73                         if (new_ta[i] != NULL)
74                                 OBD_FREE_PTR(new_ta[i]);
75                 }
76                 OBD_FREE(new_ta, sizeof(*new_ta) * new_alloc_ta);
77         }
78         return rc;
79 }
80
81 #define TX_ALLOC_STEP   8
82 static struct tx_arg *tx_add_exec(struct thandle_exec_args *ta,
83                                   tx_exec_func_t func, tx_exec_func_t undo,
84                                   char *file, int line)
85 {
86         int rc;
87         int i;
88
89         LASSERT(ta != NULL);
90         LASSERT(func != NULL);
91
92         if (ta->ta_argno + 1 >= ta->ta_alloc_args) {
93                 rc = tx_extend_args(ta, ta->ta_alloc_args + TX_ALLOC_STEP);
94                 if (rc != 0)
95                         return ERR_PTR(rc);
96         }
97
98         i = ta->ta_argno;
99
100         ta->ta_argno++;
101
102         ta->ta_args[i]->exec_fn = func;
103         ta->ta_args[i]->undo_fn = undo;
104         ta->ta_args[i]->file    = file;
105         ta->ta_args[i]->line    = line;
106
107         return ta->ta_args[i];
108 }
109
110 static void out_reconstruct(const struct lu_env *env, struct dt_device *dt,
111                             struct dt_object *obj,
112                             struct object_update_reply *reply,
113                             int index)
114 {
115         CDEBUG(D_INFO, "%s: fork reply reply %p index %d: rc = %d\n",
116                dt_obd_name(dt), reply, index, 0);
117
118         object_update_result_insert(reply, NULL, 0, index, 0);
119         return;
120 }
121
122 typedef void (*out_reconstruct_t)(const struct lu_env *env,
123                                   struct dt_device *dt,
124                                   struct dt_object *obj,
125                                   struct object_update_reply *reply,
126                                   int index);
127
128 static inline int out_check_resent(const struct lu_env *env,
129                                    struct dt_device *dt,
130                                    struct dt_object *obj,
131                                    struct ptlrpc_request *req,
132                                    out_reconstruct_t reconstruct,
133                                    struct object_update_reply *reply,
134                                    int index)
135 {
136         if (likely(!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)))
137                 return 0;
138
139         if (req_xid_is_last(req)) {
140                 reconstruct(env, dt, obj, reply, index);
141                 return 1;
142         }
143         DEBUG_REQ(D_HA, req, "no reply for RESENT req (have "LPD64")",
144                  req->rq_export->exp_target_data.ted_lcd->lcd_last_xid);
145         return 0;
146 }
147
148 static int out_obj_destroy(const struct lu_env *env, struct dt_object *dt_obj,
149                            struct thandle *th)
150 {
151         int rc;
152
153         CDEBUG(D_INFO, "%s: destroy "DFID"\n", dt_obd_name(th->th_dev),
154                PFID(lu_object_fid(&dt_obj->do_lu)));
155
156         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
157         rc = dt_destroy(env, dt_obj, th);
158         dt_write_unlock(env, dt_obj);
159
160         return rc;
161 }
162
163 /**
164  * All of the xxx_undo will be used once execution failed,
165  * But because all of the required resource has been reserved in
166  * declare phase, i.e. if declare succeed, it should make sure
167  * the following executing phase succeed in anyway, so these undo
168  * should be useless for most of the time in Phase I
169  */
170 int out_tx_create_undo(const struct lu_env *env, struct thandle *th,
171                        struct tx_arg *arg)
172 {
173         int rc;
174
175         rc = out_obj_destroy(env, arg->object, th);
176         if (rc != 0)
177                 CERROR("%s: undo failure, we are doomed!: rc = %d\n",
178                        dt_obd_name(th->th_dev), rc);
179         return rc;
180 }
181
182 int out_tx_create_exec(const struct lu_env *env, struct thandle *th,
183                        struct tx_arg *arg)
184 {
185         struct dt_object        *dt_obj = arg->object;
186         int                      rc;
187
188         CDEBUG(D_OTHER, "%s: create "DFID": dof %u, mode %o\n",
189                dt_obd_name(th->th_dev),
190                PFID(lu_object_fid(&arg->object->do_lu)),
191                arg->u.create.dof.dof_type,
192                arg->u.create.attr.la_mode & S_IFMT);
193
194         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
195         rc = dt_create(env, dt_obj, &arg->u.create.attr,
196                        &arg->u.create.hint, &arg->u.create.dof, th);
197
198         dt_write_unlock(env, dt_obj);
199
200         CDEBUG(D_INFO, "%s: insert create reply %p index %d: rc = %d\n",
201                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
202
203         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
204
205         return rc;
206 }
207
208 static int __out_tx_create(const struct lu_env *env, struct dt_object *obj,
209                            struct lu_attr *attr, struct lu_fid *parent_fid,
210                            struct dt_object_format *dof,
211                            struct thandle_exec_args *ta,
212                            struct object_update_reply *reply,
213                            int index, char *file, int line)
214 {
215         struct tx_arg *arg;
216         int rc;
217
218         LASSERT(ta->ta_handle != NULL);
219         rc = dt_declare_create(env, obj, attr, NULL, dof,
220                                        ta->ta_handle);
221         if (rc != 0)
222                 return rc;
223
224         arg = tx_add_exec(ta, out_tx_create_exec, out_tx_create_undo, file,
225                           line);
226         if (IS_ERR(arg))
227                 return PTR_ERR(arg);
228
229         /* release the object in out_trans_stop */
230         lu_object_get(&obj->do_lu);
231         arg->object = obj;
232         arg->u.create.attr = *attr;
233         if (parent_fid != NULL)
234                 arg->u.create.fid = *parent_fid;
235         memset(&arg->u.create.hint, 0, sizeof(arg->u.create.hint));
236         arg->u.create.dof  = *dof;
237         arg->reply = reply;
238         arg->index = index;
239
240         return 0;
241 }
242
243 static int out_create(struct tgt_session_info *tsi)
244 {
245         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
246         struct object_update    *update = tti->tti_u.update.tti_update;
247         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
248         struct dt_object_format *dof = &tti->tti_u.update.tti_update_dof;
249         struct obdo             *lobdo = &tti->tti_u.update.tti_obdo;
250         struct lu_attr          *attr = &tti->tti_attr;
251         struct lu_fid           *fid = NULL;
252         struct obdo             *wobdo;
253         int                     size;
254         int                     rc;
255
256         ENTRY;
257
258         wobdo = object_update_param_get(update, 0, &size);
259         if (wobdo == NULL || size != sizeof(*wobdo)) {
260                 CERROR("%s: obdo is NULL, invalid RPC: rc = %d\n",
261                        tgt_name(tsi->tsi_tgt), -EPROTO);
262                 RETURN(err_serious(-EPROTO));
263         }
264
265         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
266                 lustre_swab_obdo(wobdo);
267         lustre_get_wire_obdo(NULL, lobdo, wobdo);
268         la_from_obdo(attr, lobdo, lobdo->o_valid);
269
270         dof->dof_type = dt_mode_to_dft(attr->la_mode);
271         if (update->ou_params_count > 1) {
272                 int size;
273
274                 fid = object_update_param_get(update, 1, &size);
275                 if (fid == NULL || size != sizeof(*fid)) {
276                         CERROR("%s: invalid fid: rc = %d\n",
277                                tgt_name(tsi->tsi_tgt), -EPROTO);
278                         RETURN(err_serious(-EPROTO));
279                 }
280                 if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
281                         lustre_swab_lu_fid(fid);
282                 if (!fid_is_sane(fid)) {
283                         CERROR("%s: invalid fid "DFID": rc = %d\n",
284                                tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
285                         RETURN(err_serious(-EPROTO));
286                 }
287         }
288
289         if (lu_object_exists(&obj->do_lu))
290                 RETURN(-EEXIST);
291
292         rc = out_tx_create(tsi->tsi_env, obj, attr, fid, dof,
293                            &tti->tti_tea,
294                            tti->tti_u.update.tti_update_reply,
295                            tti->tti_u.update.tti_update_reply_index);
296
297         RETURN(rc);
298 }
299
300 static int out_tx_attr_set_undo(const struct lu_env *env,
301                                 struct thandle *th, struct tx_arg *arg)
302 {
303         CERROR("%s: attr set undo "DFID" unimplemented yet!: rc = %d\n",
304                dt_obd_name(th->th_dev),
305                PFID(lu_object_fid(&arg->object->do_lu)), -ENOTSUPP);
306
307         return -ENOTSUPP;
308 }
309
310 static int out_tx_attr_set_exec(const struct lu_env *env, struct thandle *th,
311                                 struct tx_arg *arg)
312 {
313         struct dt_object        *dt_obj = arg->object;
314         int                     rc;
315
316         CDEBUG(D_OTHER, "%s: attr set "DFID"\n", dt_obd_name(th->th_dev),
317                PFID(lu_object_fid(&dt_obj->do_lu)));
318
319         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
320         rc = dt_attr_set(env, dt_obj, &arg->u.attr_set.attr, th, NULL);
321         dt_write_unlock(env, dt_obj);
322
323         CDEBUG(D_INFO, "%s: insert attr_set reply %p index %d: rc = %d\n",
324                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
325
326         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
327
328         return rc;
329 }
330
331 static int __out_tx_attr_set(const struct lu_env *env,
332                              struct dt_object *dt_obj,
333                              const struct lu_attr *attr,
334                              struct thandle_exec_args *th,
335                              struct object_update_reply *reply,
336                              int index, char *file, int line)
337 {
338         struct tx_arg   *arg;
339         int             rc;
340
341         LASSERT(th->ta_handle != NULL);
342         rc = dt_declare_attr_set(env, dt_obj, attr, th->ta_handle);
343         if (rc != 0)
344                 return rc;
345
346         arg = tx_add_exec(th, out_tx_attr_set_exec, out_tx_attr_set_undo,
347                           file, line);
348         if (IS_ERR(arg))
349                 return PTR_ERR(arg);
350
351         lu_object_get(&dt_obj->do_lu);
352         arg->object = dt_obj;
353         arg->u.attr_set.attr = *attr;
354         arg->reply = reply;
355         arg->index = index;
356         return 0;
357 }
358
359 static int out_attr_set(struct tgt_session_info *tsi)
360 {
361         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
362         struct object_update    *update = tti->tti_u.update.tti_update;
363         struct lu_attr          *attr = &tti->tti_attr;
364         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
365         struct obdo             *lobdo = &tti->tti_u.update.tti_obdo;
366         struct obdo             *wobdo;
367         int                      size;
368         int                      rc;
369
370         ENTRY;
371
372         wobdo = object_update_param_get(update, 0, &size);
373         if (wobdo == NULL || size != sizeof(*wobdo)) {
374                 CERROR("%s: empty obdo in the update: rc = %d\n",
375                        tgt_name(tsi->tsi_tgt), -EPROTO);
376                 RETURN(err_serious(-EPROTO));
377         }
378
379         attr->la_valid = 0;
380         attr->la_valid = 0;
381
382         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
383                 lustre_swab_obdo(wobdo);
384         lustre_get_wire_obdo(NULL, lobdo, wobdo);
385         la_from_obdo(attr, lobdo, lobdo->o_valid);
386
387         rc = out_tx_attr_set(tsi->tsi_env, obj, attr, &tti->tti_tea,
388                              tti->tti_u.update.tti_update_reply,
389                              tti->tti_u.update.tti_update_reply_index);
390
391         RETURN(rc);
392 }
393
394 static int out_attr_get(struct tgt_session_info *tsi)
395 {
396         const struct lu_env     *env = tsi->tsi_env;
397         struct tgt_thread_info  *tti = tgt_th_info(env);
398         struct obdo             *obdo = &tti->tti_u.update.tti_obdo;
399         struct lu_attr          *la = &tti->tti_attr;
400         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
401         int                     idx = tti->tti_u.update.tti_update_reply_index;
402         int                     rc;
403
404         ENTRY;
405
406         if (!lu_object_exists(&obj->do_lu)) {
407                 /* Usually, this will be called when the master MDT try
408                  * to init a remote object(see osp_object_init), so if
409                  * the object does not exist on slave, we need set BANSHEE flag,
410                  * so the object can be removed from the cache immediately */
411                 set_bit(LU_OBJECT_HEARD_BANSHEE,
412                         &obj->do_lu.lo_header->loh_flags);
413                 RETURN(-ENOENT);
414         }
415
416         dt_read_lock(env, obj, MOR_TGT_CHILD);
417         rc = dt_attr_get(env, obj, la, NULL);
418         if (rc)
419                 GOTO(out_unlock, rc);
420         /*
421          * If it is a directory, we will also check whether the
422          * directory is empty.
423          * la_flags = 0 : Empty.
424          *          = 1 : Not empty.
425          */
426         la->la_flags = 0;
427         if (S_ISDIR(la->la_mode)) {
428                 struct dt_it            *it;
429                 const struct dt_it_ops  *iops;
430
431                 if (!dt_try_as_dir(env, obj))
432                         GOTO(out_unlock, rc = -ENOTDIR);
433
434                 iops = &obj->do_index_ops->dio_it;
435                 it = iops->init(env, obj, LUDA_64BITHASH, BYPASS_CAPA);
436                 if (!IS_ERR(it)) {
437                         int  result;
438                         result = iops->get(env, it, (const void *)"");
439                         if (result > 0) {
440                                 int i;
441                                 for (result = 0, i = 0; result == 0 && i < 3;
442                                      ++i)
443                                         result = iops->next(env, it);
444                                 if (result == 0)
445                                         la->la_flags = 1;
446                         } else if (result == 0)
447                                 /*
448                                  * Huh? Index contains no zero key?
449                                  */
450                                 rc = -EIO;
451
452                         iops->put(env, it);
453                         iops->fini(env, it);
454                 }
455         }
456
457         obdo->o_valid = 0;
458         obdo_from_la(obdo, la, la->la_valid);
459         lustre_set_wire_obdo(NULL, obdo, obdo);
460
461 out_unlock:
462         dt_read_unlock(env, obj);
463
464         CDEBUG(D_INFO, "%s: insert attr get reply %p index %d: rc = %d\n",
465                tgt_name(tsi->tsi_tgt), tti->tti_u.update.tti_update_reply,
466                0, rc);
467
468         object_update_result_insert(tti->tti_u.update.tti_update_reply, obdo,
469                                     sizeof(*obdo), idx, rc);
470
471         RETURN(rc);
472 }
473
474 static int out_xattr_get(struct tgt_session_info *tsi)
475 {
476         const struct lu_env        *env = tsi->tsi_env;
477         struct tgt_thread_info     *tti = tgt_th_info(env);
478         struct object_update       *update = tti->tti_u.update.tti_update;
479         struct lu_buf              *lbuf = &tti->tti_buf;
480         struct object_update_reply *reply = tti->tti_u.update.tti_update_reply;
481         struct dt_object           *obj = tti->tti_u.update.tti_dt_object;
482         char                       *name;
483         struct object_update_result *update_result;
484         int                     idx = tti->tti_u.update.tti_update_reply_index;
485         int                        rc;
486
487         ENTRY;
488
489         if (!lu_object_exists(&obj->do_lu)) {
490                 set_bit(LU_OBJECT_HEARD_BANSHEE,
491                         &obj->do_lu.lo_header->loh_flags);
492                 RETURN(-ENOENT);
493         }
494
495         name = object_update_param_get(update, 0, NULL);
496         if (name == NULL) {
497                 CERROR("%s: empty name for xattr get: rc = %d\n",
498                        tgt_name(tsi->tsi_tgt), -EPROTO);
499                 RETURN(err_serious(-EPROTO));
500         }
501
502         update_result = object_update_result_get(reply, 0, NULL);
503         if (update_result == NULL) {
504                 CERROR("%s: empty name for xattr get: rc = %d\n",
505                        tgt_name(tsi->tsi_tgt), -EPROTO);
506                 RETURN(err_serious(-EPROTO));
507         }
508
509         lbuf->lb_buf = update_result->our_data;
510         lbuf->lb_len = OUT_UPDATE_REPLY_SIZE -
511                        cfs_size_round((unsigned long)update_result->our_data -
512                                       (unsigned long)update_result);
513         dt_read_lock(env, obj, MOR_TGT_CHILD);
514         rc = dt_xattr_get(env, obj, lbuf, name, NULL);
515         dt_read_unlock(env, obj);
516         if (rc < 0) {
517                 lbuf->lb_len = 0;
518                 GOTO(out, rc);
519         }
520         if (rc == 0) {
521                 lbuf->lb_len = 0;
522                 GOTO(out, rc = -ENOENT);
523         }
524         lbuf->lb_len = rc;
525         rc = 0;
526         CDEBUG(D_INFO, "%s: "DFID" get xattr %s len %d\n",
527                tgt_name(tsi->tsi_tgt), PFID(lu_object_fid(&obj->do_lu)),
528                name, (int)lbuf->lb_len);
529
530         GOTO(out, rc);
531
532 out:
533         object_update_result_insert(reply, lbuf->lb_buf, lbuf->lb_len, idx, rc);
534         RETURN(rc);
535 }
536
537 static int out_index_lookup(struct tgt_session_info *tsi)
538 {
539         const struct lu_env     *env = tsi->tsi_env;
540         struct tgt_thread_info  *tti = tgt_th_info(env);
541         struct object_update    *update = tti->tti_u.update.tti_update;
542         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
543         char                    *name;
544         int                      rc;
545
546         ENTRY;
547
548         if (!lu_object_exists(&obj->do_lu))
549                 RETURN(-ENOENT);
550
551         name = object_update_param_get(update, 0, NULL);
552         if (name == NULL) {
553                 CERROR("%s: empty name for lookup: rc = %d\n",
554                        tgt_name(tsi->tsi_tgt), -EPROTO);
555                 RETURN(err_serious(-EPROTO));
556         }
557
558         dt_read_lock(env, obj, MOR_TGT_CHILD);
559         if (!dt_try_as_dir(env, obj))
560                 GOTO(out_unlock, rc = -ENOTDIR);
561
562         rc = dt_lookup(env, obj, (struct dt_rec *)&tti->tti_fid1,
563                 (struct dt_key *)name, NULL);
564
565         if (rc < 0)
566                 GOTO(out_unlock, rc);
567
568         if (rc == 0)
569                 rc += 1;
570
571 out_unlock:
572         dt_read_unlock(env, obj);
573
574         CDEBUG(D_INFO, "lookup "DFID" %s get "DFID" rc %d\n",
575                PFID(lu_object_fid(&obj->do_lu)), name,
576                PFID(&tti->tti_fid1), rc);
577
578         CDEBUG(D_INFO, "%s: insert lookup reply %p index %d: rc = %d\n",
579                tgt_name(tsi->tsi_tgt), tti->tti_u.update.tti_update_reply,
580                0, rc);
581
582         object_update_result_insert(tti->tti_u.update.tti_update_reply,
583                             &tti->tti_fid1, sizeof(tti->tti_fid1),
584                             tti->tti_u.update.tti_update_reply_index, rc);
585         RETURN(rc);
586 }
587
588 static int out_tx_xattr_set_exec(const struct lu_env *env,
589                                  struct thandle *th,
590                                  struct tx_arg *arg)
591 {
592         struct dt_object *dt_obj = arg->object;
593         int rc;
594
595         CDEBUG(D_INFO, "%s: set xattr buf %p name %s flag %d\n",
596                dt_obd_name(th->th_dev), arg->u.xattr_set.buf.lb_buf,
597                arg->u.xattr_set.name, arg->u.xattr_set.flags);
598
599         if (!lu_object_exists(&dt_obj->do_lu))
600                 GOTO(out, rc = -ENOENT);
601
602         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
603         rc = dt_xattr_set(env, dt_obj, &arg->u.xattr_set.buf,
604                           arg->u.xattr_set.name, arg->u.xattr_set.flags,
605                           th, NULL);
606         dt_write_unlock(env, dt_obj);
607         /**
608          * Ignore errors if this is LINK EA
609          **/
610         if (unlikely(rc && !strcmp(arg->u.xattr_set.name, XATTR_NAME_LINK)))
611                 rc = 0;
612 out:
613         CDEBUG(D_INFO, "%s: insert xattr set reply %p index %d: rc = %d\n",
614                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
615
616         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
617
618         return rc;
619 }
620
621 static int __out_tx_xattr_set(const struct lu_env *env,
622                               struct dt_object *dt_obj,
623                               const struct lu_buf *buf,
624                               const char *name, int flags,
625                               struct thandle_exec_args *ta,
626                               struct object_update_reply *reply,
627                               int index, char *file, int line)
628 {
629         struct tx_arg   *arg;
630         int             rc;
631
632         LASSERT(ta->ta_handle != NULL);
633         rc = dt_declare_xattr_set(env, dt_obj, buf, name, flags, ta->ta_handle);
634         if (rc != 0)
635                 return rc;
636
637         arg = tx_add_exec(ta, out_tx_xattr_set_exec, NULL, file, line);
638         if (IS_ERR(arg))
639                 return PTR_ERR(arg);
640
641         lu_object_get(&dt_obj->do_lu);
642         arg->object = dt_obj;
643         arg->u.xattr_set.name = name;
644         arg->u.xattr_set.flags = flags;
645         arg->u.xattr_set.buf = *buf;
646         arg->reply = reply;
647         arg->index = index;
648         arg->u.xattr_set.csum = 0;
649         return 0;
650 }
651
652 static int out_xattr_set(struct tgt_session_info *tsi)
653 {
654         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
655         struct object_update    *update = tti->tti_u.update.tti_update;
656         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
657         struct lu_buf           *lbuf = &tti->tti_buf;
658         char                    *name;
659         char                    *buf;
660         __u32                   *tmp;
661         int                      buf_len = 0;
662         int                      flag;
663         int                      size = 0;
664         int                      rc;
665         ENTRY;
666
667         name = object_update_param_get(update, 0, NULL);
668         if (name == NULL) {
669                 CERROR("%s: empty name for xattr set: rc = %d\n",
670                        tgt_name(tsi->tsi_tgt), -EPROTO);
671                 RETURN(err_serious(-EPROTO));
672         }
673
674         buf = object_update_param_get(update, 1, &buf_len);
675         if (buf == NULL || buf_len == 0) {
676                 CERROR("%s: empty buf for xattr set: rc = %d\n",
677                        tgt_name(tsi->tsi_tgt), -EPROTO);
678                 RETURN(err_serious(-EPROTO));
679         }
680
681         lbuf->lb_buf = buf;
682         lbuf->lb_len = buf_len;
683
684         tmp = object_update_param_get(update, 2, &size);
685         if (tmp == NULL || size != sizeof(*tmp)) {
686                 CERROR("%s: emptry or wrong size %d flag: rc = %d\n",
687                        tgt_name(tsi->tsi_tgt), size, -EPROTO);
688                 RETURN(err_serious(-EPROTO));
689         }
690
691         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
692                 __swab32s(tmp);
693         flag = *tmp;
694
695         rc = out_tx_xattr_set(tsi->tsi_env, obj, lbuf, name, flag,
696                               &tti->tti_tea,
697                               tti->tti_u.update.tti_update_reply,
698                               tti->tti_u.update.tti_update_reply_index);
699         RETURN(rc);
700 }
701
702 static int out_obj_ref_add(const struct lu_env *env,
703                            struct dt_object *dt_obj,
704                            struct thandle *th)
705 {
706         int rc;
707
708         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
709         rc = dt_ref_add(env, dt_obj, th);
710         dt_write_unlock(env, dt_obj);
711
712         return rc;
713 }
714
715 static int out_obj_ref_del(const struct lu_env *env,
716                            struct dt_object *dt_obj,
717                            struct thandle *th)
718 {
719         int rc;
720
721         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
722         rc = dt_ref_del(env, dt_obj, th);
723         dt_write_unlock(env, dt_obj);
724
725         return rc;
726 }
727
728 static int out_tx_ref_add_exec(const struct lu_env *env, struct thandle *th,
729                                struct tx_arg *arg)
730 {
731         struct dt_object *dt_obj = arg->object;
732         int rc;
733
734         rc = out_obj_ref_add(env, dt_obj, th);
735
736         CDEBUG(D_INFO, "%s: insert ref_add reply %p index %d: rc = %d\n",
737                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
738
739         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
740         return rc;
741 }
742
743 static int out_tx_ref_add_undo(const struct lu_env *env, struct thandle *th,
744                                struct tx_arg *arg)
745 {
746         return out_obj_ref_del(env, arg->object, th);
747 }
748
749 static int __out_tx_ref_add(const struct lu_env *env,
750                             struct dt_object *dt_obj,
751                             struct thandle_exec_args *ta,
752                             struct object_update_reply *reply,
753                             int index, char *file, int line)
754 {
755         struct tx_arg   *arg;
756         int             rc;
757
758         LASSERT(ta->ta_handle != NULL);
759         rc = dt_declare_ref_add(env, dt_obj, ta->ta_handle);
760         if (rc != 0)
761                 return rc;
762
763         arg = tx_add_exec(ta, out_tx_ref_add_exec, out_tx_ref_add_undo, file,
764                           line);
765         if (IS_ERR(arg))
766                 return PTR_ERR(arg);
767
768         lu_object_get(&dt_obj->do_lu);
769         arg->object = dt_obj;
770         arg->reply = reply;
771         arg->index = index;
772         return 0;
773 }
774
775 /**
776  * increase ref of the object
777  **/
778 static int out_ref_add(struct tgt_session_info *tsi)
779 {
780         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
781         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
782         int                      rc;
783
784         ENTRY;
785
786         rc = out_tx_ref_add(tsi->tsi_env, obj, &tti->tti_tea,
787                             tti->tti_u.update.tti_update_reply,
788                             tti->tti_u.update.tti_update_reply_index);
789         RETURN(rc);
790 }
791
792 static int out_tx_ref_del_exec(const struct lu_env *env, struct thandle *th,
793                                struct tx_arg *arg)
794 {
795         struct dt_object        *dt_obj = arg->object;
796         int                      rc;
797
798         rc = out_obj_ref_del(env, dt_obj, th);
799
800         CDEBUG(D_INFO, "%s: insert ref_del reply %p index %d: rc = %d\n",
801                dt_obd_name(th->th_dev), arg->reply, arg->index, 0);
802
803         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
804
805         return rc;
806 }
807
808 static int out_tx_ref_del_undo(const struct lu_env *env, struct thandle *th,
809                                struct tx_arg *arg)
810 {
811         return out_obj_ref_add(env, arg->object, th);
812 }
813
814 static int __out_tx_ref_del(const struct lu_env *env,
815                             struct dt_object *dt_obj,
816                             struct thandle_exec_args *ta,
817                             struct object_update_reply *reply,
818                             int index, char *file, int line)
819 {
820         struct tx_arg   *arg;
821         int             rc;
822
823         LASSERT(ta->ta_handle != NULL);
824         rc = dt_declare_ref_del(env, dt_obj, ta->ta_handle);
825         if (rc != 0)
826                 return rc;
827
828         arg = tx_add_exec(ta, out_tx_ref_del_exec, out_tx_ref_del_undo, file,
829                           line);
830         if (IS_ERR(arg))
831                 return PTR_ERR(arg);
832
833         lu_object_get(&dt_obj->do_lu);
834         arg->object = dt_obj;
835         arg->reply = reply;
836         arg->index = index;
837         return 0;
838 }
839
840 static int out_ref_del(struct tgt_session_info *tsi)
841 {
842         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
843         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
844         int                      rc;
845
846         ENTRY;
847
848         if (!lu_object_exists(&obj->do_lu))
849                 RETURN(-ENOENT);
850
851         rc = out_tx_ref_del(tsi->tsi_env, obj, &tti->tti_tea,
852                             tti->tti_u.update.tti_update_reply,
853                             tti->tti_u.update.tti_update_reply_index);
854         RETURN(rc);
855 }
856
857 static int out_obj_index_insert(const struct lu_env *env,
858                                 struct dt_object *dt_obj,
859                                 const struct dt_rec *rec,
860                                 const struct dt_key *key,
861                                 struct thandle *th)
862 {
863         int rc;
864
865         CDEBUG(D_INFO, "%s: index insert "DFID" name: %s fid "DFID"\n",
866                dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
867                (char *)key, PFID((struct lu_fid *)rec));
868
869         if (dt_try_as_dir(env, dt_obj) == 0)
870                 return -ENOTDIR;
871
872         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
873         rc = dt_insert(env, dt_obj, rec, key, th, NULL, 0);
874         dt_write_unlock(env, dt_obj);
875
876         return rc;
877 }
878
879 static int out_obj_index_delete(const struct lu_env *env,
880                                 struct dt_object *dt_obj,
881                                 const struct dt_key *key,
882                                 struct thandle *th)
883 {
884         int rc;
885
886         CDEBUG(D_INFO, "%s: index delete "DFID" name: %s\n",
887                dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
888                (char *)key);
889
890         if (dt_try_as_dir(env, dt_obj) == 0)
891                 return -ENOTDIR;
892
893         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
894         rc = dt_delete(env, dt_obj, key, th, NULL);
895         dt_write_unlock(env, dt_obj);
896
897         return rc;
898 }
899
900 static int out_tx_index_insert_exec(const struct lu_env *env,
901                                     struct thandle *th, struct tx_arg *arg)
902 {
903         struct dt_object *dt_obj = arg->object;
904         int rc;
905
906         rc = out_obj_index_insert(env, dt_obj, arg->u.insert.rec,
907                                   arg->u.insert.key, th);
908
909         CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
910                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
911
912         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
913
914         return rc;
915 }
916
917 static int out_tx_index_insert_undo(const struct lu_env *env,
918                                     struct thandle *th, struct tx_arg *arg)
919 {
920         return out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
921 }
922
923 static int __out_tx_index_insert(const struct lu_env *env,
924                                  struct dt_object *dt_obj,
925                                  char *name, struct lu_fid *fid,
926                                  struct thandle_exec_args *ta,
927                                  struct object_update_reply *reply,
928                                  int index, char *file, int line)
929 {
930         struct tx_arg   *arg;
931         int             rc;
932
933         LASSERT(ta->ta_handle != NULL);
934         if (dt_try_as_dir(env, dt_obj) == 0) {
935                 rc = -ENOTDIR;
936                 return rc;
937         }
938
939         rc = dt_declare_insert(env, dt_obj, (struct dt_rec *)fid,
940                                (struct dt_key *)name, ta->ta_handle);
941         if (rc != 0)
942                 return rc;
943
944         arg = tx_add_exec(ta, out_tx_index_insert_exec,
945                           out_tx_index_insert_undo, file, line);
946         if (IS_ERR(arg))
947                 return PTR_ERR(arg);
948
949         lu_object_get(&dt_obj->do_lu);
950         arg->object = dt_obj;
951         arg->reply = reply;
952         arg->index = index;
953         arg->u.insert.rec = (struct dt_rec *)fid;
954         arg->u.insert.key = (struct dt_key *)name;
955
956         return 0;
957 }
958
959 static int out_index_insert(struct tgt_session_info *tsi)
960 {
961         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
962         struct object_update    *update = tti->tti_u.update.tti_update;
963         struct dt_object  *obj = tti->tti_u.update.tti_dt_object;
964         struct lu_fid     *fid;
965         char              *name;
966         int                rc = 0;
967         int                size;
968
969         ENTRY;
970
971         name = object_update_param_get(update, 0, NULL);
972         if (name == NULL) {
973                 CERROR("%s: empty name for index insert: rc = %d\n",
974                        tgt_name(tsi->tsi_tgt), -EPROTO);
975                 RETURN(err_serious(-EPROTO));
976         }
977
978         fid = object_update_param_get(update, 1, &size);
979         if (fid == NULL || size != sizeof(*fid)) {
980                 CERROR("%s: invalid fid: rc = %d\n",
981                        tgt_name(tsi->tsi_tgt), -EPROTO);
982                        RETURN(err_serious(-EPROTO));
983         }
984
985         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
986                 lustre_swab_lu_fid(fid);
987
988         if (!fid_is_sane(fid)) {
989                 CERROR("%s: invalid FID "DFID": rc = %d\n",
990                        tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
991                 RETURN(err_serious(-EPROTO));
992         }
993
994         rc = out_tx_index_insert(tsi->tsi_env, obj, name, fid,
995                                  &tti->tti_tea,
996                                  tti->tti_u.update.tti_update_reply,
997                                  tti->tti_u.update.tti_update_reply_index);
998         RETURN(rc);
999 }
1000
1001 static int out_tx_index_delete_exec(const struct lu_env *env,
1002                                     struct thandle *th,
1003                                     struct tx_arg *arg)
1004 {
1005         int rc;
1006
1007         rc = out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
1008
1009         CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
1010                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
1011
1012         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
1013
1014         return rc;
1015 }
1016
1017 static int out_tx_index_delete_undo(const struct lu_env *env,
1018                                     struct thandle *th,
1019                                     struct tx_arg *arg)
1020 {
1021         CERROR("%s: Oops, can not rollback index_delete yet: rc = %d\n",
1022                dt_obd_name(th->th_dev), -ENOTSUPP);
1023         return -ENOTSUPP;
1024 }
1025
1026 static int __out_tx_index_delete(const struct lu_env *env,
1027                                  struct dt_object *dt_obj, char *name,
1028                                  struct thandle_exec_args *ta,
1029                                  struct object_update_reply *reply,
1030                                  int index, char *file, int line)
1031 {
1032         struct tx_arg   *arg;
1033         int             rc;
1034
1035         if (dt_try_as_dir(env, dt_obj) == 0) {
1036                 rc = -ENOTDIR;
1037                 return rc;
1038         }
1039
1040         LASSERT(ta->ta_handle != NULL);
1041         rc = dt_declare_delete(env, dt_obj, (struct dt_key *)name,
1042                                ta->ta_handle);
1043         if (rc != 0)
1044                 return rc;
1045
1046         arg = tx_add_exec(ta, out_tx_index_delete_exec,
1047                           out_tx_index_delete_undo, file, line);
1048         if (IS_ERR(arg))
1049                 return PTR_ERR(arg);
1050
1051         lu_object_get(&dt_obj->do_lu);
1052         arg->object = dt_obj;
1053         arg->reply = reply;
1054         arg->index = index;
1055         arg->u.insert.key = (struct dt_key *)name;
1056         return 0;
1057 }
1058
1059 static int out_index_delete(struct tgt_session_info *tsi)
1060 {
1061         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
1062         struct object_update    *update = tti->tti_u.update.tti_update;
1063         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
1064         char                    *name;
1065         int                      rc = 0;
1066
1067         if (!lu_object_exists(&obj->do_lu))
1068                 RETURN(-ENOENT);
1069
1070         name = object_update_param_get(update, 0, NULL);
1071         if (name == NULL) {
1072                 CERROR("%s: empty name for index delete: rc = %d\n",
1073                        tgt_name(tsi->tsi_tgt), -EPROTO);
1074                 RETURN(err_serious(-EPROTO));
1075         }
1076
1077         rc = out_tx_index_delete(tsi->tsi_env, obj, name, &tti->tti_tea,
1078                                  tti->tti_u.update.tti_update_reply,
1079                                  tti->tti_u.update.tti_update_reply_index);
1080         RETURN(rc);
1081 }
1082
1083 static int out_tx_destroy_exec(const struct lu_env *env, struct thandle *th,
1084                                struct tx_arg *arg)
1085 {
1086         struct dt_object *dt_obj = arg->object;
1087         int rc;
1088
1089         rc = out_obj_destroy(env, dt_obj, th);
1090
1091         CDEBUG(D_INFO, "%s: insert destroy reply %p index %d: rc = %d\n",
1092                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
1093
1094         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
1095
1096         RETURN(rc);
1097 }
1098
1099 static int out_tx_destroy_undo(const struct lu_env *env, struct thandle *th,
1100                                struct tx_arg *arg)
1101 {
1102         CERROR("%s: not support destroy undo yet!: rc = %d\n",
1103                dt_obd_name(th->th_dev), -ENOTSUPP);
1104         return -ENOTSUPP;
1105 }
1106
1107 static int __out_tx_destroy(const struct lu_env *env, struct dt_object *dt_obj,
1108                              struct thandle_exec_args *ta,
1109                              struct object_update_reply *reply,
1110                              int index, char *file, int line)
1111 {
1112         struct tx_arg   *arg;
1113         int             rc;
1114
1115         LASSERT(ta->ta_handle != NULL);
1116         rc = dt_declare_destroy(env, dt_obj, ta->ta_handle);
1117         if (rc != 0)
1118                 return rc;
1119
1120         arg = tx_add_exec(ta, out_tx_destroy_exec, out_tx_destroy_undo,
1121                           file, line);
1122         if (IS_ERR(arg))
1123                 return PTR_ERR(arg);
1124
1125         lu_object_get(&dt_obj->do_lu);
1126         arg->object = dt_obj;
1127         arg->reply = reply;
1128         arg->index = index;
1129         return 0;
1130 }
1131
1132 static int out_destroy(struct tgt_session_info *tsi)
1133 {
1134         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
1135         struct object_update    *update = tti->tti_u.update.tti_update;
1136         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
1137         struct lu_fid           *fid;
1138         int                      rc;
1139         ENTRY;
1140
1141         fid = &update->ou_fid;
1142         if (!fid_is_sane(fid)) {
1143                 CERROR("%s: invalid FID "DFID": rc = %d\n",
1144                        tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
1145                 RETURN(err_serious(-EPROTO));
1146         }
1147
1148         if (!lu_object_exists(&obj->do_lu))
1149                 RETURN(-ENOENT);
1150
1151         rc = out_tx_destroy(tsi->tsi_env, obj, &tti->tti_tea,
1152                             tti->tti_u.update.tti_update_reply,
1153                             tti->tti_u.update.tti_update_reply_index);
1154
1155         RETURN(rc);
1156 }
1157
1158 static int out_tx_write_exec(const struct lu_env *env, struct thandle *th,
1159                              struct tx_arg *arg)
1160 {
1161         struct dt_object *dt_obj = arg->object;
1162         int rc;
1163
1164         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
1165         rc = dt_record_write(env, dt_obj, &arg->u.write.buf,
1166                              &arg->u.write.pos, th);
1167         dt_write_unlock(env, dt_obj);
1168
1169         if (rc == 0)
1170                 rc = arg->u.write.buf.lb_len;
1171
1172         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
1173
1174         return rc > 0 ? 0 : rc;
1175 }
1176
1177 static int __out_tx_write(const struct lu_env *env,
1178                           struct dt_object *dt_obj,
1179                           const struct lu_buf *buf,
1180                           loff_t pos, struct thandle_exec_args *ta,
1181                           struct object_update_reply *reply,
1182                           int index, char *file, int line)
1183 {
1184         struct tx_arg   *arg;
1185         int             rc;
1186
1187         LASSERT(ta->ta_handle != NULL);
1188         rc = dt_declare_record_write(env, dt_obj, buf, pos, ta->ta_handle);
1189         if (rc != 0)
1190                 return rc;
1191
1192         arg = tx_add_exec(ta, out_tx_write_exec, NULL, file, line);
1193         if (IS_ERR(arg))
1194                 return PTR_ERR(arg);
1195
1196         lu_object_get(&dt_obj->do_lu);
1197         arg->object = dt_obj;
1198         arg->u.write.buf = *buf;
1199         arg->u.write.pos = pos;
1200         arg->reply = reply;
1201         arg->index = index;
1202         return 0;
1203 }
1204
1205 static int out_write(struct tgt_session_info *tsi)
1206 {
1207         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
1208         struct object_update    *update = tti->tti_u.update.tti_update;
1209         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
1210         struct lu_buf           *lbuf = &tti->tti_buf;
1211         char                    *buf;
1212         __u64                   *tmp;
1213         int                     size = 0;
1214         int                     buf_len = 0;
1215         loff_t                  pos;
1216         int                      rc;
1217         ENTRY;
1218
1219         buf = object_update_param_get(update, 0, &buf_len);
1220         if (buf == NULL || buf_len == 0) {
1221                 CERROR("%s: empty buf for xattr set: rc = %d\n",
1222                        tgt_name(tsi->tsi_tgt), -EPROTO);
1223                 RETURN(err_serious(-EPROTO));
1224         }
1225         lbuf->lb_buf = buf;
1226         lbuf->lb_len = buf_len;
1227
1228         tmp = object_update_param_get(update, 1, &size);
1229         if (tmp == NULL || size != sizeof(*tmp)) {
1230                 CERROR("%s: empty or wrong size %d pos: rc = %d\n",
1231                        tgt_name(tsi->tsi_tgt), size, -EPROTO);
1232                 RETURN(err_serious(-EPROTO));
1233         }
1234
1235         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
1236                 __swab64s(tmp);
1237         pos = *tmp;
1238
1239         rc = out_tx_write(tsi->tsi_env, obj, lbuf, pos,
1240                           &tti->tti_tea,
1241                           tti->tti_u.update.tti_update_reply,
1242                           tti->tti_u.update.tti_update_reply_index);
1243         RETURN(rc);
1244 }
1245
1246 #define DEF_OUT_HNDL(opc, name, flags, fn)     \
1247 [opc - OUT_CREATE] = {                                  \
1248         .th_name    = name,                             \
1249         .th_fail_id = 0,                                \
1250         .th_opc     = opc,                              \
1251         .th_flags   = flags,                            \
1252         .th_act     = fn,                               \
1253         .th_fmt     = NULL,                             \
1254         .th_version = 0,                                \
1255 }
1256
1257 static struct tgt_handler out_update_ops[] = {
1258         DEF_OUT_HNDL(OUT_CREATE, "out_create", MUTABOR | HABEO_REFERO,
1259                      out_create),
1260         DEF_OUT_HNDL(OUT_DESTROY, "out_create", MUTABOR | HABEO_REFERO,
1261                      out_destroy),
1262         DEF_OUT_HNDL(OUT_REF_ADD, "out_ref_add", MUTABOR | HABEO_REFERO,
1263                      out_ref_add),
1264         DEF_OUT_HNDL(OUT_REF_DEL, "out_ref_del", MUTABOR | HABEO_REFERO,
1265                      out_ref_del),
1266         DEF_OUT_HNDL(OUT_ATTR_SET, "out_attr_set",  MUTABOR | HABEO_REFERO,
1267                      out_attr_set),
1268         DEF_OUT_HNDL(OUT_ATTR_GET, "out_attr_get",  HABEO_REFERO,
1269                      out_attr_get),
1270         DEF_OUT_HNDL(OUT_XATTR_SET, "out_xattr_set", MUTABOR | HABEO_REFERO,
1271                      out_xattr_set),
1272         DEF_OUT_HNDL(OUT_XATTR_GET, "out_xattr_get", HABEO_REFERO,
1273                      out_xattr_get),
1274         DEF_OUT_HNDL(OUT_INDEX_LOOKUP, "out_index_lookup", HABEO_REFERO,
1275                      out_index_lookup),
1276         DEF_OUT_HNDL(OUT_INDEX_INSERT, "out_index_insert",
1277                      MUTABOR | HABEO_REFERO, out_index_insert),
1278         DEF_OUT_HNDL(OUT_INDEX_DELETE, "out_index_delete",
1279                      MUTABOR | HABEO_REFERO, out_index_delete),
1280         DEF_OUT_HNDL(OUT_WRITE, "out_write", MUTABOR | HABEO_REFERO, out_write),
1281 };
1282
1283 struct tgt_handler *out_handler_find(__u32 opc)
1284 {
1285         struct tgt_handler *h;
1286
1287         h = NULL;
1288         if (OUT_CREATE <= opc && opc < OUT_LAST) {
1289                 h = &out_update_ops[opc - OUT_CREATE];
1290                 LASSERTF(h->th_opc == opc, "opcode mismatch %d != %d\n",
1291                          h->th_opc, opc);
1292         } else {
1293                 h = NULL; /* unsupported opc */
1294         }
1295         return h;
1296 }
1297
1298 static int out_tx_start(const struct lu_env *env, struct dt_device *dt,
1299                         struct thandle_exec_args *ta, struct obd_export *exp)
1300 {
1301         ta->ta_argno = 0;
1302         ta->ta_handle = dt_trans_create(env, dt);
1303         if (IS_ERR(ta->ta_handle)) {
1304                 int rc;
1305
1306                 rc = PTR_ERR(ta->ta_handle);
1307                 ta->ta_handle = NULL;
1308                 CERROR("%s: start handle error: rc = %d\n", dt_obd_name(dt),
1309                        rc);
1310                 return rc;
1311         }
1312         if (exp->exp_need_sync)
1313                 ta->ta_handle->th_sync = 1;
1314
1315         return 0;
1316 }
1317
1318 static int out_trans_start(const struct lu_env *env,
1319                            struct thandle_exec_args *ta)
1320 {
1321         return dt_trans_start(env, ta->ta_handle->th_dev, ta->ta_handle);
1322 }
1323
1324 static int out_trans_stop(const struct lu_env *env,
1325                           struct thandle_exec_args *ta, int err)
1326 {
1327         int i;
1328         int rc;
1329
1330         ta->ta_handle->th_result = err;
1331         rc = dt_trans_stop(env, ta->ta_handle->th_dev, ta->ta_handle);
1332         for (i = 0; i < ta->ta_argno; i++) {
1333                 if (ta->ta_args[i]->object != NULL) {
1334                         struct dt_object *obj = ta->ta_args[i]->object;
1335
1336                         /* If the object is being created during this
1337                          * transaction, we need to remove them from the
1338                          * cache immediately, because a few layers are
1339                          * missing in OUT handler, i.e. the object might
1340                          * not be initialized in all layers */
1341                         if (ta->ta_args[i]->exec_fn == out_tx_create_exec)
1342                                 set_bit(LU_OBJECT_HEARD_BANSHEE,
1343                                         &obj->do_lu.lo_header->loh_flags);
1344                         lu_object_put(env, &ta->ta_args[i]->object->do_lu);
1345                         ta->ta_args[i]->object = NULL;
1346                 }
1347         }
1348         ta->ta_handle = NULL;
1349         ta->ta_argno = 0;
1350
1351         return rc;
1352 }
1353
1354 int out_tx_end(const struct lu_env *env, struct thandle_exec_args *ta,
1355                int declare_ret)
1356 {
1357         struct tgt_session_info *tsi = tgt_ses_info(env);
1358         int                     i;
1359         int                     rc;
1360         int                     rc1;
1361         ENTRY;
1362
1363         if (ta->ta_handle == NULL)
1364                 RETURN(0);
1365
1366         if (declare_ret != 0 || ta->ta_argno == 0)
1367                 GOTO(stop, rc = declare_ret);
1368
1369         LASSERT(ta->ta_handle->th_dev != NULL);
1370         rc = out_trans_start(env, ta);
1371         if (unlikely(rc != 0))
1372                 GOTO(stop, rc);
1373
1374         for (i = 0; i < ta->ta_argno; i++) {
1375                 rc = ta->ta_args[i]->exec_fn(env, ta->ta_handle,
1376                                              ta->ta_args[i]);
1377                 if (unlikely(rc != 0)) {
1378                         CDEBUG(D_INFO, "error during execution of #%u from"
1379                                " %s:%d: rc = %d\n", i, ta->ta_args[i]->file,
1380                                ta->ta_args[i]->line, rc);
1381                         while (--i >= 0) {
1382                                 if (ta->ta_args[i]->undo_fn != NULL)
1383                                         ta->ta_args[i]->undo_fn(env,
1384                                                                ta->ta_handle,
1385                                                                ta->ta_args[i]);
1386                                 else
1387                                         CERROR("%s: undo for %s:%d: rc = %d\n",
1388                                              dt_obd_name(ta->ta_handle->th_dev),
1389                                                ta->ta_args[i]->file,
1390                                                ta->ta_args[i]->line, -ENOTSUPP);
1391                         }
1392                         break;
1393                 }
1394                 CDEBUG(D_INFO, "%s: executed %u/%u: rc = %d\n",
1395                        dt_obd_name(ta->ta_handle->th_dev), i, ta->ta_argno, rc);
1396         }
1397
1398         /* Only fail for real update */
1399         tsi->tsi_reply_fail_id = OBD_FAIL_OUT_UPDATE_NET_REP;
1400 stop:
1401         rc1 = out_trans_stop(env, ta, rc);
1402         if (rc == 0)
1403                 rc = rc1;
1404
1405         ta->ta_handle = NULL;
1406         ta->ta_argno = 0;
1407
1408         RETURN(rc);
1409 }
1410
1411 /**
1412  * Object updates between Targets. Because all the updates has been
1413  * dis-assemblied into object updates at sender side, so OUT will
1414  * call OSD API directly to execute these updates.
1415  *
1416  * In DNE phase I all of the updates in the request need to be executed
1417  * in one transaction, and the transaction has to be synchronously.
1418  *
1419  * Please refer to lustre/include/lustre/lustre_idl.h for req/reply
1420  * format.
1421  */
1422 int out_handle(struct tgt_session_info *tsi)
1423 {
1424         const struct lu_env             *env = tsi->tsi_env;
1425         struct tgt_thread_info          *tti = tgt_th_info(env);
1426         struct thandle_exec_args        *ta = &tti->tti_tea;
1427         struct req_capsule              *pill = tsi->tsi_pill;
1428         struct dt_device                *dt = tsi->tsi_tgt->lut_bottom;
1429         struct object_update_request    *ureq;
1430         struct object_update            *update;
1431         struct object_update_reply      *reply;
1432         int                              bufsize;
1433         int                              count;
1434         int                              current_batchid = -1;
1435         int                              i;
1436         int                              rc = 0;
1437         int                              rc1 = 0;
1438
1439         ENTRY;
1440
1441         req_capsule_set(pill, &RQF_OUT_UPDATE);
1442         ureq = req_capsule_client_get(pill, &RMF_OUT_UPDATE);
1443         if (ureq == NULL) {
1444                 CERROR("%s: No buf!: rc = %d\n", tgt_name(tsi->tsi_tgt),
1445                        -EPROTO);
1446                 RETURN(err_serious(-EPROTO));
1447         }
1448
1449         bufsize = req_capsule_get_size(pill, &RMF_OUT_UPDATE, RCL_CLIENT);
1450         if (bufsize != object_update_request_size(ureq)) {
1451                 CERROR("%s: invalid bufsize %d: rc = %d\n",
1452                        tgt_name(tsi->tsi_tgt), bufsize, -EPROTO);
1453                 RETURN(err_serious(-EPROTO));
1454         }
1455
1456         if (ureq->ourq_magic != UPDATE_REQUEST_MAGIC) {
1457                 CERROR("%s: invalid update buffer magic %x expect %x: "
1458                        "rc = %d\n", tgt_name(tsi->tsi_tgt), ureq->ourq_magic,
1459                        UPDATE_REQUEST_MAGIC, -EPROTO);
1460                 RETURN(err_serious(-EPROTO));
1461         }
1462
1463         count = ureq->ourq_count;
1464         if (count <= 0) {
1465                 CERROR("%s: empty update: rc = %d\n", tgt_name(tsi->tsi_tgt),
1466                        -EPROTO);
1467                 RETURN(err_serious(-EPROTO));
1468         }
1469
1470         req_capsule_set_size(pill, &RMF_OUT_UPDATE_REPLY, RCL_SERVER,
1471                              OUT_UPDATE_REPLY_SIZE);
1472         rc = req_capsule_server_pack(pill);
1473         if (rc != 0) {
1474                 CERROR("%s: Can't pack response: rc = %d\n",
1475                        tgt_name(tsi->tsi_tgt), rc);
1476                 RETURN(rc);
1477         }
1478
1479         /* Prepare the update reply buffer */
1480         reply = req_capsule_server_get(pill, &RMF_OUT_UPDATE_REPLY);
1481         if (reply == NULL)
1482                 RETURN(err_serious(-EPROTO));
1483         object_update_reply_init(reply, count);
1484         tti->tti_u.update.tti_update_reply = reply;
1485         tti->tti_mult_trans = !req_is_replay(tgt_ses_req(tsi));
1486
1487         /* Walk through updates in the request to execute them synchronously */
1488         for (i = 0; i < count; i++) {
1489                 struct tgt_handler      *h;
1490                 struct dt_object        *dt_obj;
1491
1492                 update = object_update_request_get(ureq, i, NULL);
1493                 if (update == NULL)
1494                         GOTO(out, rc = -EPROTO);
1495
1496                 if (ptlrpc_req_need_swab(pill->rc_req))
1497                         lustre_swab_object_update(update);
1498
1499                 if (!fid_is_sane(&update->ou_fid)) {
1500                         CERROR("%s: invalid FID "DFID": rc = %d\n",
1501                                tgt_name(tsi->tsi_tgt), PFID(&update->ou_fid),
1502                                -EPROTO);
1503                         GOTO(out, rc = err_serious(-EPROTO));
1504                 }
1505
1506                 dt_obj = dt_locate(env, dt, &update->ou_fid);
1507                 if (IS_ERR(dt_obj))
1508                         GOTO(out, rc = PTR_ERR(dt_obj));
1509
1510                 if (dt->dd_record_fid_accessed) {
1511                         lfsck_pack_rfa(&tti->tti_lr,
1512                                        lu_object_fid(&dt_obj->do_lu));
1513                         tgt_lfsck_in_notify(env, dt, &tti->tti_lr);
1514                 }
1515
1516                 tti->tti_u.update.tti_dt_object = dt_obj;
1517                 tti->tti_u.update.tti_update = update;
1518                 tti->tti_u.update.tti_update_reply_index = i;
1519
1520                 h = out_handler_find(update->ou_type);
1521                 if (unlikely(h == NULL)) {
1522                         CERROR("%s: unsupported opc: 0x%x\n",
1523                                tgt_name(tsi->tsi_tgt), update->ou_type);
1524                         GOTO(next, rc = -ENOTSUPP);
1525                 }
1526
1527                 /* Check resend case only for modifying RPC */
1528                 if (h->th_flags & MUTABOR) {
1529                         struct ptlrpc_request *req = tgt_ses_req(tsi);
1530
1531                         if (out_check_resent(env, dt, dt_obj, req,
1532                                              out_reconstruct, reply, i))
1533                                 GOTO(next, rc = 0);
1534                 }
1535
1536                 /* start transaction for modification RPC only */
1537                 if (h->th_flags & MUTABOR && current_batchid == -1) {
1538                         current_batchid = update->ou_batchid;
1539                         rc = out_tx_start(env, dt, ta, tsi->tsi_exp);
1540                         if (rc != 0)
1541                                 GOTO(next, rc);
1542                 }
1543
1544                 /* Stop the current update transaction, if the update has
1545                  * different batchid, or read-only update */
1546                 if (((current_batchid != update->ou_batchid) ||
1547                      !(h->th_flags & MUTABOR)) && ta->ta_handle != NULL) {
1548                         rc = out_tx_end(env, ta, rc);
1549                         current_batchid = -1;
1550                         if (rc != 0)
1551                                 GOTO(next, rc);
1552
1553                         /* start a new transaction if needed */
1554                         if (h->th_flags & MUTABOR) {
1555                                 rc = out_tx_start(env, dt, ta, tsi->tsi_exp);
1556                                 if (rc != 0)
1557                                         GOTO(next, rc);
1558
1559                                 current_batchid = update->ou_batchid;
1560                         }
1561                 }
1562
1563                 rc = h->th_act(tsi);
1564 next:
1565                 lu_object_put(env, &dt_obj->do_lu);
1566                 if (rc < 0)
1567                         GOTO(out, rc);
1568         }
1569 out:
1570         if (current_batchid != -1) {
1571                 rc1 = out_tx_end(env, ta, rc);
1572                 if (rc == 0)
1573                         rc = rc1;
1574         }
1575
1576         RETURN(rc);
1577 }
1578
1579 struct tgt_handler tgt_out_handlers[] = {
1580 TGT_UPDATE_HDL(MUTABOR, OUT_UPDATE,     out_handle),
1581 };
1582 EXPORT_SYMBOL(tgt_out_handlers);
1583