Whamcloud - gitweb
74e8f47c9c6918de742bfca5b6cf25455708e2c9
[fs/lustre-release.git] / lustre / target / out_handler.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  *
25  * lustre/mdt/out_handler.c
26  *
27  * Object update handler between targets.
28  *
29  * Author: di.wang <di.wang@intel.com>
30  */
31
32 #define DEBUG_SUBSYSTEM S_CLASS
33
34 #include <obd_class.h>
35 #include <md_object.h>
36 #include "tgt_internal.h"
37 #include <lustre_update.h>
38
39 struct tx_arg *tx_add_exec(struct thandle_exec_args *ta, tx_exec_func_t func,
40                            tx_exec_func_t undo, char *file, int line)
41 {
42         int i;
43
44         LASSERT(ta);
45         LASSERT(func);
46
47         LASSERTF(ta->ta_argno + 1 <= TX_MAX_OPS,
48                  "Too many updates(%d) in one trans\n", ta->ta_argno);
49         i = ta->ta_argno;
50
51         ta->ta_argno++;
52
53         ta->ta_args[i].exec_fn = func;
54         ta->ta_args[i].undo_fn = undo;
55         ta->ta_args[i].file    = file;
56         ta->ta_args[i].line    = line;
57
58         return &ta->ta_args[i];
59 }
60
61 static void out_reconstruct(const struct lu_env *env, struct dt_device *dt,
62                             struct dt_object *obj,
63                             struct object_update_reply *reply,
64                             int index)
65 {
66         CDEBUG(D_INFO, "%s: fork reply reply %p index %d: rc = %d\n",
67                dt_obd_name(dt), reply, index, 0);
68
69         object_update_result_insert(reply, NULL, 0, index, 0);
70         return;
71 }
72
73 typedef void (*out_reconstruct_t)(const struct lu_env *env,
74                                   struct dt_device *dt,
75                                   struct dt_object *obj,
76                                   struct object_update_reply *reply,
77                                   int index);
78
79 static inline int out_check_resent(const struct lu_env *env,
80                                    struct dt_device *dt,
81                                    struct dt_object *obj,
82                                    struct ptlrpc_request *req,
83                                    out_reconstruct_t reconstruct,
84                                    struct object_update_reply *reply,
85                                    int index)
86 {
87         if (likely(!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)))
88                 return 0;
89
90         if (req_xid_is_last(req)) {
91                 reconstruct(env, dt, obj, reply, index);
92                 return 1;
93         }
94         DEBUG_REQ(D_HA, req, "no reply for RESENT req (have "LPD64")",
95                  req->rq_export->exp_target_data.ted_lcd->lcd_last_xid);
96         return 0;
97 }
98
99 static int out_obj_destroy(const struct lu_env *env, struct dt_object *dt_obj,
100                            struct thandle *th)
101 {
102         int rc;
103
104         CDEBUG(D_INFO, "%s: destroy "DFID"\n", dt_obd_name(th->th_dev),
105                PFID(lu_object_fid(&dt_obj->do_lu)));
106
107         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
108         rc = dt_destroy(env, dt_obj, th);
109         dt_write_unlock(env, dt_obj);
110
111         return rc;
112 }
113
114 /**
115  * All of the xxx_undo will be used once execution failed,
116  * But because all of the required resource has been reserved in
117  * declare phase, i.e. if declare succeed, it should make sure
118  * the following executing phase succeed in anyway, so these undo
119  * should be useless for most of the time in Phase I
120  */
121 int out_tx_create_undo(const struct lu_env *env, struct thandle *th,
122                        struct tx_arg *arg)
123 {
124         int rc;
125
126         rc = out_obj_destroy(env, arg->object, th);
127         if (rc != 0)
128                 CERROR("%s: undo failure, we are doomed!: rc = %d\n",
129                        dt_obd_name(th->th_dev), rc);
130         return rc;
131 }
132
133 int out_tx_create_exec(const struct lu_env *env, struct thandle *th,
134                        struct tx_arg *arg)
135 {
136         struct dt_object        *dt_obj = arg->object;
137         int                      rc;
138
139         CDEBUG(D_OTHER, "%s: create "DFID": dof %u, mode %o\n",
140                dt_obd_name(th->th_dev),
141                PFID(lu_object_fid(&arg->object->do_lu)),
142                arg->u.create.dof.dof_type,
143                arg->u.create.attr.la_mode & S_IFMT);
144
145         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
146         rc = dt_create(env, dt_obj, &arg->u.create.attr,
147                        &arg->u.create.hint, &arg->u.create.dof, th);
148
149         dt_write_unlock(env, dt_obj);
150
151         CDEBUG(D_INFO, "%s: insert create reply %p index %d: rc = %d\n",
152                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
153
154         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
155
156         return rc;
157 }
158
159 static int __out_tx_create(const struct lu_env *env, struct dt_object *obj,
160                            struct lu_attr *attr, struct lu_fid *parent_fid,
161                            struct dt_object_format *dof,
162                            struct thandle_exec_args *ta,
163                            struct object_update_reply *reply,
164                            int index, char *file, int line)
165 {
166         struct tx_arg *arg;
167
168         LASSERT(ta->ta_handle != NULL);
169         ta->ta_err = dt_declare_create(env, obj, attr, NULL, dof,
170                                        ta->ta_handle);
171         if (ta->ta_err != 0)
172                 return ta->ta_err;
173
174         arg = tx_add_exec(ta, out_tx_create_exec, out_tx_create_undo, file,
175                           line);
176         LASSERT(arg);
177
178         /* release the object in out_trans_stop */
179         lu_object_get(&obj->do_lu);
180         arg->object = obj;
181         arg->u.create.attr = *attr;
182         if (parent_fid != NULL)
183                 arg->u.create.fid = *parent_fid;
184         memset(&arg->u.create.hint, 0, sizeof(arg->u.create.hint));
185         arg->u.create.dof  = *dof;
186         arg->reply = reply;
187         arg->index = index;
188
189         return 0;
190 }
191
192 static int out_create(struct tgt_session_info *tsi)
193 {
194         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
195         struct object_update    *update = tti->tti_u.update.tti_update;
196         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
197         struct dt_object_format *dof = &tti->tti_u.update.tti_update_dof;
198         struct obdo             *lobdo = &tti->tti_u.update.tti_obdo;
199         struct lu_attr          *attr = &tti->tti_attr;
200         struct lu_fid           *fid = NULL;
201         struct obdo             *wobdo;
202         int                     size;
203         int                     rc;
204
205         ENTRY;
206
207         wobdo = object_update_param_get(update, 0, &size);
208         if (wobdo == NULL || size != sizeof(*wobdo)) {
209                 CERROR("%s: obdo is NULL, invalid RPC: rc = %d\n",
210                        tgt_name(tsi->tsi_tgt), -EPROTO);
211                 RETURN(err_serious(-EPROTO));
212         }
213
214         obdo_le_to_cpu(wobdo, wobdo);
215         lustre_get_wire_obdo(NULL, lobdo, wobdo);
216         la_from_obdo(attr, lobdo, lobdo->o_valid);
217
218         dof->dof_type = dt_mode_to_dft(attr->la_mode);
219         if (update->ou_params_count > 1) {
220                 int size;
221
222                 fid = object_update_param_get(update, 1, &size);
223                 if (fid == NULL || size != sizeof(*fid)) {
224                         CERROR("%s: invalid fid: rc = %d\n",
225                                tgt_name(tsi->tsi_tgt), -EPROTO);
226                         RETURN(err_serious(-EPROTO));
227                 }
228                 if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
229                         lustre_swab_lu_fid(fid);
230                 if (!fid_is_sane(fid)) {
231                         CERROR("%s: invalid fid "DFID": rc = %d\n",
232                                tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
233                         RETURN(err_serious(-EPROTO));
234                 }
235         }
236
237         if (lu_object_exists(&obj->do_lu))
238                 RETURN(-EEXIST);
239
240         rc = out_tx_create(tsi->tsi_env, obj, attr, fid, dof,
241                            &tti->tti_tea,
242                            tti->tti_u.update.tti_update_reply,
243                            tti->tti_u.update.tti_update_reply_index);
244
245         RETURN(rc);
246 }
247
248 static int out_tx_attr_set_undo(const struct lu_env *env,
249                                 struct thandle *th, struct tx_arg *arg)
250 {
251         CERROR("%s: attr set undo "DFID" unimplemented yet!: rc = %d\n",
252                dt_obd_name(th->th_dev),
253                PFID(lu_object_fid(&arg->object->do_lu)), -ENOTSUPP);
254
255         return -ENOTSUPP;
256 }
257
258 static int out_tx_attr_set_exec(const struct lu_env *env, struct thandle *th,
259                                 struct tx_arg *arg)
260 {
261         struct dt_object        *dt_obj = arg->object;
262         int                     rc;
263
264         CDEBUG(D_OTHER, "%s: attr set "DFID"\n", dt_obd_name(th->th_dev),
265                PFID(lu_object_fid(&dt_obj->do_lu)));
266
267         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
268         rc = dt_attr_set(env, dt_obj, &arg->u.attr_set.attr, th, NULL);
269         dt_write_unlock(env, dt_obj);
270
271         CDEBUG(D_INFO, "%s: insert attr_set reply %p index %d: rc = %d\n",
272                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
273
274         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
275
276         return rc;
277 }
278
279 static int __out_tx_attr_set(const struct lu_env *env,
280                              struct dt_object *dt_obj,
281                              const struct lu_attr *attr,
282                              struct thandle_exec_args *th,
283                              struct object_update_reply *reply,
284                              int index, char *file, int line)
285 {
286         struct tx_arg           *arg;
287
288         LASSERT(th->ta_handle != NULL);
289         th->ta_err = dt_declare_attr_set(env, dt_obj, attr, th->ta_handle);
290         if (th->ta_err != 0)
291                 return th->ta_err;
292
293         arg = tx_add_exec(th, out_tx_attr_set_exec, out_tx_attr_set_undo,
294                           file, line);
295         LASSERT(arg);
296         lu_object_get(&dt_obj->do_lu);
297         arg->object = dt_obj;
298         arg->u.attr_set.attr = *attr;
299         arg->reply = reply;
300         arg->index = index;
301         return 0;
302 }
303
304 static int out_attr_set(struct tgt_session_info *tsi)
305 {
306         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
307         struct object_update    *update = tti->tti_u.update.tti_update;
308         struct lu_attr          *attr = &tti->tti_attr;
309         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
310         struct obdo             *lobdo = &tti->tti_u.update.tti_obdo;
311         struct obdo             *wobdo;
312         int                      size;
313         int                      rc;
314
315         ENTRY;
316
317         wobdo = object_update_param_get(update, 0, &size);
318         if (wobdo == NULL || size != sizeof(*wobdo)) {
319                 CERROR("%s: empty obdo in the update: rc = %d\n",
320                        tgt_name(tsi->tsi_tgt), -EPROTO);
321                 RETURN(err_serious(-EPROTO));
322         }
323
324         attr->la_valid = 0;
325         attr->la_valid = 0;
326         obdo_le_to_cpu(wobdo, wobdo);
327         lustre_get_wire_obdo(NULL, lobdo, wobdo);
328         la_from_obdo(attr, lobdo, lobdo->o_valid);
329
330         rc = out_tx_attr_set(tsi->tsi_env, obj, attr, &tti->tti_tea,
331                              tti->tti_u.update.tti_update_reply,
332                              tti->tti_u.update.tti_update_reply_index);
333
334         RETURN(rc);
335 }
336
337 static int out_attr_get(struct tgt_session_info *tsi)
338 {
339         const struct lu_env     *env = tsi->tsi_env;
340         struct tgt_thread_info  *tti = tgt_th_info(env);
341         struct obdo             *obdo = &tti->tti_u.update.tti_obdo;
342         struct lu_attr          *la = &tti->tti_attr;
343         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
344         int                     idx = tti->tti_u.update.tti_update_reply_index;
345         int                     rc;
346
347         ENTRY;
348
349         if (!lu_object_exists(&obj->do_lu)) {
350                 /* Usually, this will be called when the master MDT try
351                  * to init a remote object(see osp_object_init), so if
352                  * the object does not exist on slave, we need set BANSHEE flag,
353                  * so the object can be removed from the cache immediately */
354                 set_bit(LU_OBJECT_HEARD_BANSHEE,
355                         &obj->do_lu.lo_header->loh_flags);
356                 RETURN(-ENOENT);
357         }
358
359         dt_read_lock(env, obj, MOR_TGT_CHILD);
360         rc = dt_attr_get(env, obj, la, NULL);
361         if (rc)
362                 GOTO(out_unlock, rc);
363         /*
364          * If it is a directory, we will also check whether the
365          * directory is empty.
366          * la_flags = 0 : Empty.
367          *          = 1 : Not empty.
368          */
369         la->la_flags = 0;
370         if (S_ISDIR(la->la_mode)) {
371                 struct dt_it            *it;
372                 const struct dt_it_ops  *iops;
373
374                 if (!dt_try_as_dir(env, obj))
375                         GOTO(out_unlock, rc = -ENOTDIR);
376
377                 iops = &obj->do_index_ops->dio_it;
378                 it = iops->init(env, obj, LUDA_64BITHASH, BYPASS_CAPA);
379                 if (!IS_ERR(it)) {
380                         int  result;
381                         result = iops->get(env, it, (const void *)"");
382                         if (result > 0) {
383                                 int i;
384                                 for (result = 0, i = 0; result == 0 && i < 3;
385                                      ++i)
386                                         result = iops->next(env, it);
387                                 if (result == 0)
388                                         la->la_flags = 1;
389                         } else if (result == 0)
390                                 /*
391                                  * Huh? Index contains no zero key?
392                                  */
393                                 rc = -EIO;
394
395                         iops->put(env, it);
396                         iops->fini(env, it);
397                 }
398         }
399
400         obdo->o_valid = 0;
401         obdo_from_la(obdo, la, la->la_valid);
402         obdo_cpu_to_le(obdo, obdo);
403         lustre_set_wire_obdo(NULL, obdo, obdo);
404
405 out_unlock:
406         dt_read_unlock(env, obj);
407
408         CDEBUG(D_INFO, "%s: insert attr get reply %p index %d: rc = %d\n",
409                tgt_name(tsi->tsi_tgt), tti->tti_u.update.tti_update_reply,
410                0, rc);
411
412         object_update_result_insert(tti->tti_u.update.tti_update_reply, obdo,
413                                     sizeof(*obdo), idx, rc);
414
415         RETURN(rc);
416 }
417
418 static int out_xattr_get(struct tgt_session_info *tsi)
419 {
420         const struct lu_env        *env = tsi->tsi_env;
421         struct tgt_thread_info     *tti = tgt_th_info(env);
422         struct object_update       *update = tti->tti_u.update.tti_update;
423         struct lu_buf              *lbuf = &tti->tti_buf;
424         struct object_update_reply *reply = tti->tti_u.update.tti_update_reply;
425         struct dt_object           *obj = tti->tti_u.update.tti_dt_object;
426         char                       *name;
427         struct object_update_result *update_result;
428         int                     idx = tti->tti_u.update.tti_update_reply_index;
429         int                        rc;
430
431         ENTRY;
432
433         name = object_update_param_get(update, 0, NULL);
434         if (name == NULL) {
435                 CERROR("%s: empty name for xattr get: rc = %d\n",
436                        tgt_name(tsi->tsi_tgt), -EPROTO);
437                 RETURN(err_serious(-EPROTO));
438         }
439
440         update_result = object_update_result_get(reply, 0, NULL);
441         if (update_result == NULL) {
442                 CERROR("%s: empty name for xattr get: rc = %d\n",
443                        tgt_name(tsi->tsi_tgt), -EPROTO);
444                 RETURN(err_serious(-EPROTO));
445         }
446
447         lbuf->lb_buf = update_result->our_data;
448         lbuf->lb_len = OUT_UPDATE_REPLY_SIZE -
449                        cfs_size_round((unsigned long)update_result->our_data -
450                                       (unsigned long)update_result);
451         dt_read_lock(env, obj, MOR_TGT_CHILD);
452         rc = dt_xattr_get(env, obj, lbuf, name, NULL);
453         dt_read_unlock(env, obj);
454         if (rc < 0) {
455                 lbuf->lb_len = 0;
456                 GOTO(out, rc);
457         }
458         if (rc == 0) {
459                 lbuf->lb_len = 0;
460                 GOTO(out, rc = -ENOENT);
461         }
462         lbuf->lb_len = rc;
463         rc = 0;
464         CDEBUG(D_INFO, "%s: "DFID" get xattr %s len %d\n",
465                tgt_name(tsi->tsi_tgt), PFID(lu_object_fid(&obj->do_lu)),
466                name, (int)lbuf->lb_len);
467
468         GOTO(out, rc);
469
470 out:
471         object_update_result_insert(reply, lbuf->lb_buf, lbuf->lb_len, idx, rc);
472         RETURN(rc);
473 }
474
475 static int out_index_lookup(struct tgt_session_info *tsi)
476 {
477         const struct lu_env     *env = tsi->tsi_env;
478         struct tgt_thread_info  *tti = tgt_th_info(env);
479         struct object_update    *update = tti->tti_u.update.tti_update;
480         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
481         char                    *name;
482         int                      rc;
483
484         ENTRY;
485
486         if (!lu_object_exists(&obj->do_lu))
487                 RETURN(-ENOENT);
488
489         name = object_update_param_get(update, 0, NULL);
490         if (name == NULL) {
491                 CERROR("%s: empty name for lookup: rc = %d\n",
492                        tgt_name(tsi->tsi_tgt), -EPROTO);
493                 RETURN(err_serious(-EPROTO));
494         }
495
496         dt_read_lock(env, obj, MOR_TGT_CHILD);
497         if (!dt_try_as_dir(env, obj))
498                 GOTO(out_unlock, rc = -ENOTDIR);
499
500         rc = dt_lookup(env, obj, (struct dt_rec *)&tti->tti_fid1,
501                 (struct dt_key *)name, NULL);
502
503         if (rc < 0)
504                 GOTO(out_unlock, rc);
505
506         if (rc == 0)
507                 rc += 1;
508
509         CDEBUG(D_INFO, "lookup "DFID" %s get "DFID" rc %d\n",
510                PFID(lu_object_fid(&obj->do_lu)), name,
511                PFID(&tti->tti_fid1), rc);
512
513 out_unlock:
514         dt_read_unlock(env, obj);
515
516         CDEBUG(D_INFO, "%s: insert lookup reply %p index %d: rc = %d\n",
517                tgt_name(tsi->tsi_tgt), tti->tti_u.update.tti_update_reply,
518                0, rc);
519
520         object_update_result_insert(tti->tti_u.update.tti_update_reply,
521                             &tti->tti_fid1, sizeof(tti->tti_fid1),
522                             tti->tti_u.update.tti_update_reply_index, rc);
523         RETURN(rc);
524 }
525
526 static int out_tx_xattr_set_exec(const struct lu_env *env,
527                                  struct thandle *th,
528                                  struct tx_arg *arg)
529 {
530         struct dt_object *dt_obj = arg->object;
531         int rc;
532
533         CDEBUG(D_INFO, "%s: set xattr buf %p name %s flag %d\n",
534                dt_obd_name(th->th_dev), arg->u.xattr_set.buf.lb_buf,
535                arg->u.xattr_set.name, arg->u.xattr_set.flags);
536
537         if (!lu_object_exists(&dt_obj->do_lu))
538                 GOTO(out, rc = -ENOENT);
539
540         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
541         rc = dt_xattr_set(env, dt_obj, &arg->u.xattr_set.buf,
542                           arg->u.xattr_set.name, arg->u.xattr_set.flags,
543                           th, NULL);
544         dt_write_unlock(env, dt_obj);
545         /**
546          * Ignore errors if this is LINK EA
547          **/
548         if (unlikely(rc && !strcmp(arg->u.xattr_set.name, XATTR_NAME_LINK)))
549                 rc = 0;
550 out:
551         CDEBUG(D_INFO, "%s: insert xattr set reply %p index %d: rc = %d\n",
552                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
553
554         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
555
556         return rc;
557 }
558
559 static int __out_tx_xattr_set(const struct lu_env *env,
560                               struct dt_object *dt_obj,
561                               const struct lu_buf *buf,
562                               const char *name, int flags,
563                               struct thandle_exec_args *ta,
564                               struct object_update_reply *reply,
565                               int index, char *file, int line)
566 {
567         struct tx_arg           *arg;
568
569         LASSERT(ta->ta_handle != NULL);
570         ta->ta_err = dt_declare_xattr_set(env, dt_obj, buf, name,
571                                           flags, ta->ta_handle);
572         if (ta->ta_err != 0)
573                 return ta->ta_err;
574
575         arg = tx_add_exec(ta, out_tx_xattr_set_exec, NULL, file, line);
576         LASSERT(arg);
577         lu_object_get(&dt_obj->do_lu);
578         arg->object = dt_obj;
579         arg->u.xattr_set.name = name;
580         arg->u.xattr_set.flags = flags;
581         arg->u.xattr_set.buf = *buf;
582         arg->reply = reply;
583         arg->index = index;
584         arg->u.xattr_set.csum = 0;
585         return 0;
586 }
587
588 static int out_xattr_set(struct tgt_session_info *tsi)
589 {
590         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
591         struct object_update    *update = tti->tti_u.update.tti_update;
592         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
593         struct lu_buf           *lbuf = &tti->tti_buf;
594         char                    *name;
595         char                    *buf;
596         char                    *tmp;
597         int                      buf_len = 0;
598         int                      flag;
599         int                      rc;
600         ENTRY;
601
602         name = object_update_param_get(update, 0, NULL);
603         if (name == NULL) {
604                 CERROR("%s: empty name for xattr set: rc = %d\n",
605                        tgt_name(tsi->tsi_tgt), -EPROTO);
606                 RETURN(err_serious(-EPROTO));
607         }
608
609         buf = object_update_param_get(update, 1, &buf_len);
610         if (buf == NULL || buf_len == 0) {
611                 CERROR("%s: empty buf for xattr set: rc = %d\n",
612                        tgt_name(tsi->tsi_tgt), -EPROTO);
613                 RETURN(err_serious(-EPROTO));
614         }
615
616         lbuf->lb_buf = buf;
617         lbuf->lb_len = buf_len;
618
619         tmp = (char *)object_update_param_get(update, 2, NULL);
620         if (tmp == NULL) {
621                 CERROR("%s: empty flag for xattr set: rc = %d\n",
622                        tgt_name(tsi->tsi_tgt), -EPROTO);
623                 RETURN(err_serious(-EPROTO));
624         }
625
626         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
627                 __swab32s((__u32 *)tmp);
628         flag = *(int *)tmp;
629
630         rc = out_tx_xattr_set(tsi->tsi_env, obj, lbuf, name, flag,
631                               &tti->tti_tea,
632                               tti->tti_u.update.tti_update_reply,
633                               tti->tti_u.update.tti_update_reply_index);
634         RETURN(rc);
635 }
636
637 static int out_obj_ref_add(const struct lu_env *env,
638                            struct dt_object *dt_obj,
639                            struct thandle *th)
640 {
641         int rc;
642
643         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
644         rc = dt_ref_add(env, dt_obj, th);
645         dt_write_unlock(env, dt_obj);
646
647         return rc;
648 }
649
650 static int out_obj_ref_del(const struct lu_env *env,
651                            struct dt_object *dt_obj,
652                            struct thandle *th)
653 {
654         int rc;
655
656         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
657         rc = dt_ref_del(env, dt_obj, th);
658         dt_write_unlock(env, dt_obj);
659
660         return rc;
661 }
662
663 static int out_tx_ref_add_exec(const struct lu_env *env, struct thandle *th,
664                                struct tx_arg *arg)
665 {
666         struct dt_object *dt_obj = arg->object;
667         int rc;
668
669         rc = out_obj_ref_add(env, dt_obj, th);
670
671         CDEBUG(D_INFO, "%s: insert ref_add reply %p index %d: rc = %d\n",
672                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
673
674         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
675         return rc;
676 }
677
678 static int out_tx_ref_add_undo(const struct lu_env *env, struct thandle *th,
679                                struct tx_arg *arg)
680 {
681         return out_obj_ref_del(env, arg->object, th);
682 }
683
684 static int __out_tx_ref_add(const struct lu_env *env,
685                             struct dt_object *dt_obj,
686                             struct thandle_exec_args *ta,
687                             struct object_update_reply *reply,
688                             int index, char *file, int line)
689 {
690         struct tx_arg   *arg;
691
692         LASSERT(ta->ta_handle != NULL);
693         ta->ta_err = dt_declare_ref_add(env, dt_obj, ta->ta_handle);
694         if (ta->ta_err != 0)
695                 return ta->ta_err;
696
697         arg = tx_add_exec(ta, out_tx_ref_add_exec, out_tx_ref_add_undo, file,
698                           line);
699         LASSERT(arg);
700         lu_object_get(&dt_obj->do_lu);
701         arg->object = dt_obj;
702         arg->reply = reply;
703         arg->index = index;
704         return 0;
705 }
706
707 /**
708  * increase ref of the object
709  **/
710 static int out_ref_add(struct tgt_session_info *tsi)
711 {
712         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
713         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
714         int                      rc;
715
716         ENTRY;
717
718         rc = out_tx_ref_add(tsi->tsi_env, obj, &tti->tti_tea,
719                             tti->tti_u.update.tti_update_reply,
720                             tti->tti_u.update.tti_update_reply_index);
721         RETURN(rc);
722 }
723
724 static int out_tx_ref_del_exec(const struct lu_env *env, struct thandle *th,
725                                struct tx_arg *arg)
726 {
727         struct dt_object        *dt_obj = arg->object;
728         int                      rc;
729
730         rc = out_obj_ref_del(env, dt_obj, th);
731
732         CDEBUG(D_INFO, "%s: insert ref_del reply %p index %d: rc = %d\n",
733                dt_obd_name(th->th_dev), arg->reply, arg->index, 0);
734
735         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
736
737         return rc;
738 }
739
740 static int out_tx_ref_del_undo(const struct lu_env *env, struct thandle *th,
741                                struct tx_arg *arg)
742 {
743         return out_obj_ref_add(env, arg->object, th);
744 }
745
746 static int __out_tx_ref_del(const struct lu_env *env,
747                             struct dt_object *dt_obj,
748                             struct thandle_exec_args *ta,
749                             struct object_update_reply *reply,
750                             int index, char *file, int line)
751 {
752         struct tx_arg   *arg;
753
754         LASSERT(ta->ta_handle != NULL);
755         ta->ta_err = dt_declare_ref_del(env, dt_obj, ta->ta_handle);
756         if (ta->ta_err != 0)
757                 return ta->ta_err;
758
759         arg = tx_add_exec(ta, out_tx_ref_del_exec, out_tx_ref_del_undo, file,
760                           line);
761         LASSERT(arg);
762         lu_object_get(&dt_obj->do_lu);
763         arg->object = dt_obj;
764         arg->reply = reply;
765         arg->index = index;
766         return 0;
767 }
768
769 static int out_ref_del(struct tgt_session_info *tsi)
770 {
771         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
772         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
773         int                      rc;
774
775         ENTRY;
776
777         if (!lu_object_exists(&obj->do_lu))
778                 RETURN(-ENOENT);
779
780         rc = out_tx_ref_del(tsi->tsi_env, obj, &tti->tti_tea,
781                             tti->tti_u.update.tti_update_reply,
782                             tti->tti_u.update.tti_update_reply_index);
783         RETURN(rc);
784 }
785
786 static int out_obj_index_insert(const struct lu_env *env,
787                                 struct dt_object *dt_obj,
788                                 const struct dt_rec *rec,
789                                 const struct dt_key *key,
790                                 struct thandle *th)
791 {
792         int rc;
793
794         CDEBUG(D_INFO, "%s: index insert "DFID" name: %s fid "DFID"\n",
795                dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
796                (char *)key, PFID((struct lu_fid *)rec));
797
798         if (dt_try_as_dir(env, dt_obj) == 0)
799                 return -ENOTDIR;
800
801         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
802         rc = dt_insert(env, dt_obj, rec, key, th, NULL, 0);
803         dt_write_unlock(env, dt_obj);
804
805         return rc;
806 }
807
808 static int out_obj_index_delete(const struct lu_env *env,
809                                 struct dt_object *dt_obj,
810                                 const struct dt_key *key,
811                                 struct thandle *th)
812 {
813         int rc;
814
815         CDEBUG(D_INFO, "%s: index delete "DFID" name: %s\n",
816                dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
817                (char *)key);
818
819         if (dt_try_as_dir(env, dt_obj) == 0)
820                 return -ENOTDIR;
821
822         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
823         rc = dt_delete(env, dt_obj, key, th, NULL);
824         dt_write_unlock(env, dt_obj);
825
826         return rc;
827 }
828
829 static int out_tx_index_insert_exec(const struct lu_env *env,
830                                     struct thandle *th, struct tx_arg *arg)
831 {
832         struct dt_object *dt_obj = arg->object;
833         int rc;
834
835         rc = out_obj_index_insert(env, dt_obj, arg->u.insert.rec,
836                                   arg->u.insert.key, th);
837
838         CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
839                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
840
841         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
842
843         return rc;
844 }
845
846 static int out_tx_index_insert_undo(const struct lu_env *env,
847                                     struct thandle *th, struct tx_arg *arg)
848 {
849         return out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
850 }
851
852 static int __out_tx_index_insert(const struct lu_env *env,
853                                  struct dt_object *dt_obj,
854                                  char *name, struct lu_fid *fid,
855                                  struct thandle_exec_args *ta,
856                                  struct object_update_reply *reply,
857                                  int index, char *file, int line)
858 {
859         struct tx_arg *arg;
860
861         LASSERT(ta->ta_handle != NULL);
862
863         if (dt_try_as_dir(env, dt_obj) == 0) {
864                 ta->ta_err = -ENOTDIR;
865                 return ta->ta_err;
866         }
867
868         ta->ta_err = dt_declare_insert(env, dt_obj,
869                                        (struct dt_rec *)fid,
870                                        (struct dt_key *)name,
871                                        ta->ta_handle);
872
873         if (ta->ta_err != 0)
874                 return ta->ta_err;
875
876         arg = tx_add_exec(ta, out_tx_index_insert_exec,
877                           out_tx_index_insert_undo, file,
878                           line);
879         LASSERT(arg);
880         lu_object_get(&dt_obj->do_lu);
881         arg->object = dt_obj;
882         arg->reply = reply;
883         arg->index = index;
884         arg->u.insert.rec = (struct dt_rec *)fid;
885         arg->u.insert.key = (struct dt_key *)name;
886
887         return 0;
888 }
889
890 static int out_index_insert(struct tgt_session_info *tsi)
891 {
892         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
893         struct object_update    *update = tti->tti_u.update.tti_update;
894         struct dt_object  *obj = tti->tti_u.update.tti_dt_object;
895         struct lu_fid     *fid;
896         char              *name;
897         int                rc = 0;
898         int                size;
899
900         ENTRY;
901
902         name = object_update_param_get(update, 0, NULL);
903         if (name == NULL) {
904                 CERROR("%s: empty name for index insert: rc = %d\n",
905                        tgt_name(tsi->tsi_tgt), -EPROTO);
906                 RETURN(err_serious(-EPROTO));
907         }
908
909         fid = object_update_param_get(update, 1, &size);
910         if (fid == NULL || size != sizeof(*fid)) {
911                 CERROR("%s: invalid fid: rc = %d\n",
912                        tgt_name(tsi->tsi_tgt), -EPROTO);
913                        RETURN(err_serious(-EPROTO));
914         }
915
916         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
917                 lustre_swab_lu_fid(fid);
918
919         if (!fid_is_sane(fid)) {
920                 CERROR("%s: invalid FID "DFID": rc = %d\n",
921                        tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
922                 RETURN(err_serious(-EPROTO));
923         }
924
925         rc = out_tx_index_insert(tsi->tsi_env, obj, name, fid,
926                                  &tti->tti_tea,
927                                  tti->tti_u.update.tti_update_reply,
928                                  tti->tti_u.update.tti_update_reply_index);
929         RETURN(rc);
930 }
931
932 static int out_tx_index_delete_exec(const struct lu_env *env,
933                                     struct thandle *th,
934                                     struct tx_arg *arg)
935 {
936         int rc;
937
938         rc = out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
939
940         CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
941                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
942
943         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
944
945         return rc;
946 }
947
948 static int out_tx_index_delete_undo(const struct lu_env *env,
949                                     struct thandle *th,
950                                     struct tx_arg *arg)
951 {
952         CERROR("%s: Oops, can not rollback index_delete yet: rc = %d\n",
953                dt_obd_name(th->th_dev), -ENOTSUPP);
954         return -ENOTSUPP;
955 }
956
957 static int __out_tx_index_delete(const struct lu_env *env,
958                                  struct dt_object *dt_obj, char *name,
959                                  struct thandle_exec_args *ta,
960                                  struct object_update_reply *reply,
961                                  int index, char *file, int line)
962 {
963         struct tx_arg *arg;
964
965         if (dt_try_as_dir(env, dt_obj) == 0) {
966                 ta->ta_err = -ENOTDIR;
967                 return ta->ta_err;
968         }
969
970         LASSERT(ta->ta_handle != NULL);
971         ta->ta_err = dt_declare_delete(env, dt_obj,
972                                        (struct dt_key *)name,
973                                        ta->ta_handle);
974         if (ta->ta_err != 0)
975                 return ta->ta_err;
976
977         arg = tx_add_exec(ta, out_tx_index_delete_exec,
978                           out_tx_index_delete_undo, file,
979                           line);
980         LASSERT(arg);
981         lu_object_get(&dt_obj->do_lu);
982         arg->object = dt_obj;
983         arg->reply = reply;
984         arg->index = index;
985         arg->u.insert.key = (struct dt_key *)name;
986         return 0;
987 }
988
989 static int out_index_delete(struct tgt_session_info *tsi)
990 {
991         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
992         struct object_update    *update = tti->tti_u.update.tti_update;
993         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
994         char                    *name;
995         int                      rc = 0;
996
997         if (!lu_object_exists(&obj->do_lu))
998                 RETURN(-ENOENT);
999
1000         name = object_update_param_get(update, 0, NULL);
1001         if (name == NULL) {
1002                 CERROR("%s: empty name for index delete: rc = %d\n",
1003                        tgt_name(tsi->tsi_tgt), -EPROTO);
1004                 RETURN(err_serious(-EPROTO));
1005         }
1006
1007         rc = out_tx_index_delete(tsi->tsi_env, obj, name, &tti->tti_tea,
1008                                  tti->tti_u.update.tti_update_reply,
1009                                  tti->tti_u.update.tti_update_reply_index);
1010         RETURN(rc);
1011 }
1012
1013 static int out_tx_destroy_exec(const struct lu_env *env, struct thandle *th,
1014                                struct tx_arg *arg)
1015 {
1016         struct dt_object *dt_obj = arg->object;
1017         int rc;
1018
1019         rc = out_obj_destroy(env, dt_obj, th);
1020
1021         CDEBUG(D_INFO, "%s: insert destroy reply %p index %d: rc = %d\n",
1022                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
1023
1024         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
1025
1026         RETURN(rc);
1027 }
1028
1029 static int out_tx_destroy_undo(const struct lu_env *env, struct thandle *th,
1030                                struct tx_arg *arg)
1031 {
1032         CERROR("%s: not support destroy undo yet!: rc = %d\n",
1033                dt_obd_name(th->th_dev), -ENOTSUPP);
1034         return -ENOTSUPP;
1035 }
1036
1037 static int __out_tx_destroy(const struct lu_env *env, struct dt_object *dt_obj,
1038                              struct thandle_exec_args *ta,
1039                              struct object_update_reply *reply,
1040                              int index, char *file, int line)
1041 {
1042         struct tx_arg *arg;
1043
1044         LASSERT(ta->ta_handle != NULL);
1045         ta->ta_err = dt_declare_destroy(env, dt_obj, ta->ta_handle);
1046         if (ta->ta_err)
1047                 return ta->ta_err;
1048
1049         arg = tx_add_exec(ta, out_tx_destroy_exec, out_tx_destroy_undo,
1050                           file, line);
1051         LASSERT(arg);
1052         lu_object_get(&dt_obj->do_lu);
1053         arg->object = dt_obj;
1054         arg->reply = reply;
1055         arg->index = index;
1056         return 0;
1057 }
1058
1059 static int out_destroy(struct tgt_session_info *tsi)
1060 {
1061         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
1062         struct object_update    *update = tti->tti_u.update.tti_update;
1063         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
1064         struct lu_fid           *fid;
1065         int                      rc;
1066         ENTRY;
1067
1068         fid = &update->ou_fid;
1069         if (!fid_is_sane(fid)) {
1070                 CERROR("%s: invalid FID "DFID": rc = %d\n",
1071                        tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
1072                 RETURN(err_serious(-EPROTO));
1073         }
1074
1075         if (!lu_object_exists(&obj->do_lu))
1076                 RETURN(-ENOENT);
1077
1078         rc = out_tx_destroy(tsi->tsi_env, obj, &tti->tti_tea,
1079                             tti->tti_u.update.tti_update_reply,
1080                             tti->tti_u.update.tti_update_reply_index);
1081
1082         RETURN(rc);
1083 }
1084
1085 static int out_tx_write_exec(const struct lu_env *env, struct thandle *th,
1086                              struct tx_arg *arg)
1087 {
1088         struct dt_object *dt_obj = arg->object;
1089         int rc;
1090
1091         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
1092         rc = dt_record_write(env, dt_obj, &arg->u.write.buf,
1093                              &arg->u.write.pos, th);
1094         dt_write_unlock(env, dt_obj);
1095
1096         if (rc == 0)
1097                 rc = arg->u.write.buf.lb_len;
1098
1099         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
1100
1101         return rc > 0 ? 0 : rc;
1102 }
1103
1104 static int __out_tx_write(const struct lu_env *env,
1105                           struct dt_object *dt_obj,
1106                           const struct lu_buf *buf,
1107                           loff_t pos, struct thandle_exec_args *ta,
1108                           struct object_update_reply *reply,
1109                           int index, char *file, int line)
1110 {
1111         struct tx_arg   *arg;
1112
1113         LASSERT(ta->ta_handle != NULL);
1114         ta->ta_err = dt_declare_record_write(env, dt_obj, buf, pos,
1115                                              ta->ta_handle);
1116         if (ta->ta_err != 0)
1117                 return ta->ta_err;
1118
1119         arg = tx_add_exec(ta, out_tx_write_exec, NULL, file, line);
1120         LASSERT(arg);
1121         lu_object_get(&dt_obj->do_lu);
1122         arg->object = dt_obj;
1123         arg->u.write.buf = *buf;
1124         arg->u.write.pos = pos;
1125         arg->reply = reply;
1126         arg->index = index;
1127         return 0;
1128 }
1129
1130 static int out_write(struct tgt_session_info *tsi)
1131 {
1132         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
1133         struct object_update    *update = tti->tti_u.update.tti_update;
1134         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
1135         struct lu_buf           *lbuf = &tti->tti_buf;
1136         char                    *buf;
1137         char                    *tmp;
1138         int                     buf_len = 0;
1139         loff_t                  pos;
1140         int                      rc;
1141         ENTRY;
1142
1143         buf = object_update_param_get(update, 0, &buf_len);
1144         if (buf == NULL || buf_len == 0) {
1145                 CERROR("%s: empty buf for xattr set: rc = %d\n",
1146                        tgt_name(tsi->tsi_tgt), -EPROTO);
1147                 RETURN(err_serious(-EPROTO));
1148         }
1149         lbuf->lb_buf = buf;
1150         lbuf->lb_len = buf_len;
1151
1152         tmp = (char *)object_update_param_get(update, 1, NULL);
1153         if (tmp == NULL) {
1154                 CERROR("%s: empty flag for xattr set: rc = %d\n",
1155                        tgt_name(tsi->tsi_tgt), -EPROTO);
1156                 RETURN(err_serious(-EPROTO));
1157         }
1158
1159         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
1160                 __swab64s((__u64 *)tmp);
1161         pos = *(loff_t *)tmp;
1162
1163         rc = out_tx_write(tsi->tsi_env, obj, lbuf, pos,
1164                           &tti->tti_tea,
1165                           tti->tti_u.update.tti_update_reply,
1166                           tti->tti_u.update.tti_update_reply_index);
1167         RETURN(rc);
1168 }
1169
1170 #define DEF_OUT_HNDL(opc, name, flags, fn)     \
1171 [opc - OUT_CREATE] = {                                  \
1172         .th_name    = name,                             \
1173         .th_fail_id = 0,                                \
1174         .th_opc     = opc,                              \
1175         .th_flags   = flags,                            \
1176         .th_act     = fn,                               \
1177         .th_fmt     = NULL,                             \
1178         .th_version = 0,                                \
1179 }
1180
1181 #define out_handler mdt_handler
1182 static struct tgt_handler out_update_ops[] = {
1183         DEF_OUT_HNDL(OUT_CREATE, "out_create", MUTABOR | HABEO_REFERO,
1184                      out_create),
1185         DEF_OUT_HNDL(OUT_DESTROY, "out_create", MUTABOR | HABEO_REFERO,
1186                      out_destroy),
1187         DEF_OUT_HNDL(OUT_REF_ADD, "out_ref_add", MUTABOR | HABEO_REFERO,
1188                      out_ref_add),
1189         DEF_OUT_HNDL(OUT_REF_DEL, "out_ref_del", MUTABOR | HABEO_REFERO,
1190                      out_ref_del),
1191         DEF_OUT_HNDL(OUT_ATTR_SET, "out_attr_set",  MUTABOR | HABEO_REFERO,
1192                      out_attr_set),
1193         DEF_OUT_HNDL(OUT_ATTR_GET, "out_attr_get",  HABEO_REFERO,
1194                      out_attr_get),
1195         DEF_OUT_HNDL(OUT_XATTR_SET, "out_xattr_set", MUTABOR | HABEO_REFERO,
1196                      out_xattr_set),
1197         DEF_OUT_HNDL(OUT_XATTR_GET, "out_xattr_get", HABEO_REFERO,
1198                      out_xattr_get),
1199         DEF_OUT_HNDL(OUT_INDEX_LOOKUP, "out_index_lookup", HABEO_REFERO,
1200                      out_index_lookup),
1201         DEF_OUT_HNDL(OUT_INDEX_INSERT, "out_index_insert",
1202                      MUTABOR | HABEO_REFERO, out_index_insert),
1203         DEF_OUT_HNDL(OUT_INDEX_DELETE, "out_index_delete",
1204                      MUTABOR | HABEO_REFERO, out_index_delete),
1205         DEF_OUT_HNDL(OUT_WRITE, "out_write", MUTABOR | HABEO_REFERO, out_write),
1206 };
1207
1208 struct tgt_handler *out_handler_find(__u32 opc)
1209 {
1210         struct tgt_handler *h;
1211
1212         h = NULL;
1213         if (OUT_CREATE <= opc && opc < OUT_LAST) {
1214                 h = &out_update_ops[opc - OUT_CREATE];
1215                 LASSERTF(h->th_opc == opc, "opcode mismatch %d != %d\n",
1216                          h->th_opc, opc);
1217         } else {
1218                 h = NULL; /* unsupported opc */
1219         }
1220         return h;
1221 }
1222
1223 static int out_tx_start(const struct lu_env *env, struct dt_device *dt,
1224                         struct thandle_exec_args *ta, struct obd_export *exp)
1225 {
1226         memset(ta, 0, sizeof(*ta));
1227         ta->ta_handle = dt_trans_create(env, dt);
1228         if (IS_ERR(ta->ta_handle)) {
1229                 int rc;
1230
1231                 rc = PTR_ERR(ta->ta_handle);
1232                 ta->ta_handle = NULL;
1233                 CERROR("%s: start handle error: rc = %d\n",
1234                        dt_obd_name(dt), rc);
1235                 return rc;
1236         }
1237         ta->ta_dev = dt;
1238         if (exp->exp_need_sync)
1239                 ta->ta_handle->th_sync = 1;
1240
1241         return 0;
1242 }
1243
1244 static int out_trans_start(const struct lu_env *env,
1245                            struct thandle_exec_args *ta)
1246 {
1247         return dt_trans_start(env, ta->ta_dev, ta->ta_handle);
1248 }
1249
1250 static int out_trans_stop(const struct lu_env *env,
1251                           struct thandle_exec_args *ta, int err)
1252 {
1253         int i;
1254         int rc;
1255
1256         ta->ta_handle->th_result = err;
1257         rc = dt_trans_stop(env, ta->ta_dev, ta->ta_handle);
1258         for (i = 0; i < ta->ta_argno; i++) {
1259                 if (ta->ta_args[i].object != NULL) {
1260                         struct dt_object *obj = ta->ta_args[i].object;
1261
1262                         /* If the object is being created during this
1263                          * transaction, we need to remove them from the
1264                          * cache immediately, because a few layers are
1265                          * missing in OUT handler, i.e. the object might
1266                          * not be initialized in all layers */
1267                         if (ta->ta_args[i].exec_fn == out_tx_create_exec)
1268                                 set_bit(LU_OBJECT_HEARD_BANSHEE,
1269                                         &obj->do_lu.lo_header->loh_flags);
1270                         lu_object_put(env, &ta->ta_args[i].object->do_lu);
1271                         ta->ta_args[i].object = NULL;
1272                 }
1273         }
1274
1275         return rc;
1276 }
1277
1278 int out_tx_end(const struct lu_env *env, struct thandle_exec_args *ta)
1279 {
1280         struct tgt_session_info *tsi = tgt_ses_info(env);
1281         int i = 0, rc;
1282
1283         LASSERT(ta->ta_dev);
1284         LASSERT(ta->ta_handle);
1285
1286         if (ta->ta_err != 0 || ta->ta_argno == 0)
1287                 GOTO(stop, rc = ta->ta_err);
1288
1289         rc = out_trans_start(env, ta);
1290         if (unlikely(rc))
1291                 GOTO(stop, rc);
1292
1293         for (i = 0; i < ta->ta_argno; i++) {
1294                 rc = ta->ta_args[i].exec_fn(env, ta->ta_handle,
1295                                             &ta->ta_args[i]);
1296                 if (unlikely(rc != 0)) {
1297                         CDEBUG(D_INFO, "error during execution of #%u from"
1298                                " %s:%d: rc = %d\n", i, ta->ta_args[i].file,
1299                                ta->ta_args[i].line, rc);
1300                         while (--i >= 0) {
1301                                 if (ta->ta_args[i].undo_fn != NULL)
1302                                         ta->ta_args[i].undo_fn(env,
1303                                                                ta->ta_handle,
1304                                                               &ta->ta_args[i]);
1305                                 else
1306                                         CERROR("%s: undo for %s:%d: rc = %d\n",
1307                                                dt_obd_name(ta->ta_dev),
1308                                                ta->ta_args[i].file,
1309                                                ta->ta_args[i].line, -ENOTSUPP);
1310                         }
1311                         break;
1312                 }
1313         }
1314
1315         /* Only fail for real update */
1316         tsi->tsi_reply_fail_id = OBD_FAIL_OUT_UPDATE_NET_REP;
1317 stop:
1318         CDEBUG(D_INFO, "%s: executed %u/%u: rc = %d\n",
1319                dt_obd_name(ta->ta_dev), i, ta->ta_argno, rc);
1320         out_trans_stop(env, ta, rc);
1321         ta->ta_handle = NULL;
1322         ta->ta_argno = 0;
1323         ta->ta_err = 0;
1324
1325         RETURN(rc);
1326 }
1327
1328 /**
1329  * Object updates between Targets. Because all the updates has been
1330  * dis-assemblied into object updates at sender side, so OUT will
1331  * call OSD API directly to execute these updates.
1332  *
1333  * In DNE phase I all of the updates in the request need to be executed
1334  * in one transaction, and the transaction has to be synchronously.
1335  *
1336  * Please refer to lustre/include/lustre/lustre_idl.h for req/reply
1337  * format.
1338  */
1339 int out_handle(struct tgt_session_info *tsi)
1340 {
1341         const struct lu_env             *env = tsi->tsi_env;
1342         struct tgt_thread_info          *tti = tgt_th_info(env);
1343         struct thandle_exec_args        *ta = &tti->tti_tea;
1344         struct req_capsule              *pill = tsi->tsi_pill;
1345         struct dt_device                *dt = tsi->tsi_tgt->lut_bottom;
1346         struct object_update_request    *ureq;
1347         struct object_update            *update;
1348         struct object_update_reply      *reply;
1349         int                              bufsize;
1350         int                              count;
1351         int                              old_batchid = -1;
1352         int                              i;
1353         int                              rc = 0;
1354         int                              rc1 = 0;
1355
1356         ENTRY;
1357
1358         req_capsule_set(pill, &RQF_OUT_UPDATE);
1359         ureq = req_capsule_client_get(pill, &RMF_OUT_UPDATE);
1360         if (ureq == NULL) {
1361                 CERROR("%s: No buf!: rc = %d\n", tgt_name(tsi->tsi_tgt),
1362                        -EPROTO);
1363                 RETURN(err_serious(-EPROTO));
1364         }
1365
1366         bufsize = req_capsule_get_size(pill, &RMF_OUT_UPDATE, RCL_CLIENT);
1367         if (bufsize != object_update_request_size(ureq)) {
1368                 CERROR("%s: invalid bufsize %d: rc = %d\n",
1369                        tgt_name(tsi->tsi_tgt), bufsize, -EPROTO);
1370                 RETURN(err_serious(-EPROTO));
1371         }
1372
1373         if (ureq->ourq_magic != UPDATE_REQUEST_MAGIC) {
1374                 CERROR("%s: invalid update buffer magic %x expect %x: "
1375                        "rc = %d\n", tgt_name(tsi->tsi_tgt), ureq->ourq_magic,
1376                        UPDATE_REQUEST_MAGIC, -EPROTO);
1377                 RETURN(err_serious(-EPROTO));
1378         }
1379
1380         count = ureq->ourq_count;
1381         if (count <= 0) {
1382                 CERROR("%s: empty update: rc = %d\n", tgt_name(tsi->tsi_tgt),
1383                        -EPROTO);
1384                 RETURN(err_serious(-EPROTO));
1385         }
1386
1387         req_capsule_set_size(pill, &RMF_OUT_UPDATE_REPLY, RCL_SERVER,
1388                              OUT_UPDATE_REPLY_SIZE);
1389         rc = req_capsule_server_pack(pill);
1390         if (rc != 0) {
1391                 CERROR("%s: Can't pack response: rc = %d\n",
1392                        tgt_name(tsi->tsi_tgt), rc);
1393                 RETURN(rc);
1394         }
1395
1396         /* Prepare the update reply buffer */
1397         reply = req_capsule_server_get(pill, &RMF_OUT_UPDATE_REPLY);
1398         if (reply == NULL)
1399                 RETURN(err_serious(-EPROTO));
1400         object_update_reply_init(reply, count);
1401         tti->tti_u.update.tti_update_reply = reply;
1402
1403         rc = out_tx_start(env, dt, ta, tsi->tsi_exp);
1404         if (rc != 0)
1405                 RETURN(rc);
1406
1407         tti->tti_mult_trans = !req_is_replay(tgt_ses_req(tsi));
1408
1409         /* Walk through updates in the request to execute them synchronously */
1410         for (i = 0; i < count; i++) {
1411                 struct tgt_handler      *h;
1412                 struct dt_object        *dt_obj;
1413
1414                 update = object_update_request_get(ureq, i, NULL);
1415                 if (update == NULL)
1416                         GOTO(out, rc = -EPROTO);
1417
1418                 if (ptlrpc_req_need_swab(pill->rc_req))
1419                         lustre_swab_object_update(update);
1420
1421                 if (old_batchid == -1) {
1422                         old_batchid = update->ou_batchid;
1423                 } else if (old_batchid != update->ou_batchid) {
1424                         /* Stop the current update transaction,
1425                          * create a new one */
1426                         rc = out_tx_end(env, ta);
1427                         if (rc < 0)
1428                                 RETURN(rc);
1429
1430                         rc = out_tx_start(env, dt, ta, tsi->tsi_exp);
1431                         if (rc != 0)
1432                                 RETURN(rc);
1433                         old_batchid = update->ou_batchid;
1434                 }
1435
1436                 if (!fid_is_sane(&update->ou_fid)) {
1437                         CERROR("%s: invalid FID "DFID": rc = %d\n",
1438                                tgt_name(tsi->tsi_tgt), PFID(&update->ou_fid),
1439                                -EPROTO);
1440                         GOTO(out, rc = err_serious(-EPROTO));
1441                 }
1442
1443                 dt_obj = dt_locate(env, dt, &update->ou_fid);
1444                 if (IS_ERR(dt_obj))
1445                         GOTO(out, rc = PTR_ERR(dt_obj));
1446
1447                 if (dt->dd_record_fid_accessed) {
1448                         lfsck_pack_rfa(&tti->tti_lr,
1449                                        lu_object_fid(&dt_obj->do_lu));
1450                         tgt_lfsck_in_notify(env, dt, &tti->tti_lr);
1451                 }
1452
1453                 tti->tti_u.update.tti_dt_object = dt_obj;
1454                 tti->tti_u.update.tti_update = update;
1455                 tti->tti_u.update.tti_update_reply_index = i;
1456
1457                 h = out_handler_find(update->ou_type);
1458                 if (likely(h != NULL)) {
1459                         /* For real modification RPC, check if the update
1460                          * has been executed */
1461                         if (h->th_flags & MUTABOR) {
1462                                 struct ptlrpc_request *req = tgt_ses_req(tsi);
1463
1464                                 if (out_check_resent(env, dt, dt_obj, req,
1465                                                      out_reconstruct, reply, i))
1466                                         GOTO(next, rc);
1467                         }
1468
1469                         rc = h->th_act(tsi);
1470                 } else {
1471                         CERROR("%s: The unsupported opc: 0x%x\n",
1472                                tgt_name(tsi->tsi_tgt), update->ou_type);
1473                         lu_object_put(env, &dt_obj->do_lu);
1474                         GOTO(out, rc = -ENOTSUPP);
1475                 }
1476 next:
1477                 lu_object_put(env, &dt_obj->do_lu);
1478                 if (rc < 0)
1479                         GOTO(out, rc);
1480         }
1481 out:
1482         rc1 = out_tx_end(env, ta);
1483         if (rc == 0)
1484                 rc = rc1;
1485         RETURN(rc);
1486 }
1487
1488 struct tgt_handler tgt_out_handlers[] = {
1489 TGT_UPDATE_HDL(MUTABOR, OUT_UPDATE,     out_handle),
1490 };
1491 EXPORT_SYMBOL(tgt_out_handlers);
1492