Whamcloud - gitweb
LU-3539 osp: Fix a series of UPDATE_OBJ endianness bugs
[fs/lustre-release.git] / lustre / target / out_handler.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  *
25  * lustre/mdt/out_handler.c
26  *
27  * Object update handler between targets.
28  *
29  * Author: di.wang <di.wang@intel.com>
30  */
31
32 #define DEBUG_SUBSYSTEM S_CLASS
33
34 #include <obd_class.h>
35 #include <md_object.h>
36 #include "tgt_internal.h"
37 #include <lustre_update.h>
38
39 struct tx_arg *tx_add_exec(struct thandle_exec_args *ta, tx_exec_func_t func,
40                            tx_exec_func_t undo, char *file, int line)
41 {
42         int i;
43
44         LASSERT(ta);
45         LASSERT(func);
46
47         LASSERTF(ta->ta_argno + 1 <= TX_MAX_OPS,
48                  "Too many updates(%d) in one trans\n", ta->ta_argno);
49         i = ta->ta_argno;
50
51         ta->ta_argno++;
52
53         ta->ta_args[i].exec_fn = func;
54         ta->ta_args[i].undo_fn = undo;
55         ta->ta_args[i].file    = file;
56         ta->ta_args[i].line    = line;
57
58         return &ta->ta_args[i];
59 }
60
61 static void out_reconstruct(const struct lu_env *env, struct dt_device *dt,
62                             struct dt_object *obj,
63                             struct object_update_reply *reply,
64                             int index)
65 {
66         CDEBUG(D_INFO, "%s: fork reply reply %p index %d: rc = %d\n",
67                dt_obd_name(dt), reply, index, 0);
68
69         object_update_result_insert(reply, NULL, 0, index, 0);
70         return;
71 }
72
73 typedef void (*out_reconstruct_t)(const struct lu_env *env,
74                                   struct dt_device *dt,
75                                   struct dt_object *obj,
76                                   struct object_update_reply *reply,
77                                   int index);
78
79 static inline int out_check_resent(const struct lu_env *env,
80                                    struct dt_device *dt,
81                                    struct dt_object *obj,
82                                    struct ptlrpc_request *req,
83                                    out_reconstruct_t reconstruct,
84                                    struct object_update_reply *reply,
85                                    int index)
86 {
87         if (likely(!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)))
88                 return 0;
89
90         if (req_xid_is_last(req)) {
91                 reconstruct(env, dt, obj, reply, index);
92                 return 1;
93         }
94         DEBUG_REQ(D_HA, req, "no reply for RESENT req (have "LPD64")",
95                  req->rq_export->exp_target_data.ted_lcd->lcd_last_xid);
96         return 0;
97 }
98
99 static int out_obj_destroy(const struct lu_env *env, struct dt_object *dt_obj,
100                            struct thandle *th)
101 {
102         int rc;
103
104         CDEBUG(D_INFO, "%s: destroy "DFID"\n", dt_obd_name(th->th_dev),
105                PFID(lu_object_fid(&dt_obj->do_lu)));
106
107         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
108         rc = dt_destroy(env, dt_obj, th);
109         dt_write_unlock(env, dt_obj);
110
111         return rc;
112 }
113
114 /**
115  * All of the xxx_undo will be used once execution failed,
116  * But because all of the required resource has been reserved in
117  * declare phase, i.e. if declare succeed, it should make sure
118  * the following executing phase succeed in anyway, so these undo
119  * should be useless for most of the time in Phase I
120  */
121 int out_tx_create_undo(const struct lu_env *env, struct thandle *th,
122                        struct tx_arg *arg)
123 {
124         int rc;
125
126         rc = out_obj_destroy(env, arg->object, th);
127         if (rc != 0)
128                 CERROR("%s: undo failure, we are doomed!: rc = %d\n",
129                        dt_obd_name(th->th_dev), rc);
130         return rc;
131 }
132
133 int out_tx_create_exec(const struct lu_env *env, struct thandle *th,
134                        struct tx_arg *arg)
135 {
136         struct dt_object        *dt_obj = arg->object;
137         int                      rc;
138
139         CDEBUG(D_OTHER, "%s: create "DFID": dof %u, mode %o\n",
140                dt_obd_name(th->th_dev),
141                PFID(lu_object_fid(&arg->object->do_lu)),
142                arg->u.create.dof.dof_type,
143                arg->u.create.attr.la_mode & S_IFMT);
144
145         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
146         rc = dt_create(env, dt_obj, &arg->u.create.attr,
147                        &arg->u.create.hint, &arg->u.create.dof, th);
148
149         dt_write_unlock(env, dt_obj);
150
151         CDEBUG(D_INFO, "%s: insert create reply %p index %d: rc = %d\n",
152                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
153
154         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
155
156         return rc;
157 }
158
159 static int __out_tx_create(const struct lu_env *env, struct dt_object *obj,
160                            struct lu_attr *attr, struct lu_fid *parent_fid,
161                            struct dt_object_format *dof,
162                            struct thandle_exec_args *ta,
163                            struct object_update_reply *reply,
164                            int index, char *file, int line)
165 {
166         struct tx_arg *arg;
167
168         LASSERT(ta->ta_handle != NULL);
169         ta->ta_err = dt_declare_create(env, obj, attr, NULL, dof,
170                                        ta->ta_handle);
171         if (ta->ta_err != 0)
172                 return ta->ta_err;
173
174         arg = tx_add_exec(ta, out_tx_create_exec, out_tx_create_undo, file,
175                           line);
176         LASSERT(arg);
177
178         /* release the object in out_trans_stop */
179         lu_object_get(&obj->do_lu);
180         arg->object = obj;
181         arg->u.create.attr = *attr;
182         if (parent_fid != NULL)
183                 arg->u.create.fid = *parent_fid;
184         memset(&arg->u.create.hint, 0, sizeof(arg->u.create.hint));
185         arg->u.create.dof  = *dof;
186         arg->reply = reply;
187         arg->index = index;
188
189         return 0;
190 }
191
192 static int out_create(struct tgt_session_info *tsi)
193 {
194         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
195         struct object_update    *update = tti->tti_u.update.tti_update;
196         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
197         struct dt_object_format *dof = &tti->tti_u.update.tti_update_dof;
198         struct obdo             *lobdo = &tti->tti_u.update.tti_obdo;
199         struct lu_attr          *attr = &tti->tti_attr;
200         struct lu_fid           *fid = NULL;
201         struct obdo             *wobdo;
202         int                     size;
203         int                     rc;
204
205         ENTRY;
206
207         wobdo = object_update_param_get(update, 0, &size);
208         if (wobdo == NULL || size != sizeof(*wobdo)) {
209                 CERROR("%s: obdo is NULL, invalid RPC: rc = %d\n",
210                        tgt_name(tsi->tsi_tgt), -EPROTO);
211                 RETURN(err_serious(-EPROTO));
212         }
213
214         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
215                 lustre_swab_obdo(wobdo);
216         lustre_get_wire_obdo(NULL, lobdo, wobdo);
217         la_from_obdo(attr, lobdo, lobdo->o_valid);
218
219         dof->dof_type = dt_mode_to_dft(attr->la_mode);
220         if (update->ou_params_count > 1) {
221                 int size;
222
223                 fid = object_update_param_get(update, 1, &size);
224                 if (fid == NULL || size != sizeof(*fid)) {
225                         CERROR("%s: invalid fid: rc = %d\n",
226                                tgt_name(tsi->tsi_tgt), -EPROTO);
227                         RETURN(err_serious(-EPROTO));
228                 }
229                 if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
230                         lustre_swab_lu_fid(fid);
231                 if (!fid_is_sane(fid)) {
232                         CERROR("%s: invalid fid "DFID": rc = %d\n",
233                                tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
234                         RETURN(err_serious(-EPROTO));
235                 }
236         }
237
238         if (lu_object_exists(&obj->do_lu))
239                 RETURN(-EEXIST);
240
241         rc = out_tx_create(tsi->tsi_env, obj, attr, fid, dof,
242                            &tti->tti_tea,
243                            tti->tti_u.update.tti_update_reply,
244                            tti->tti_u.update.tti_update_reply_index);
245
246         RETURN(rc);
247 }
248
249 static int out_tx_attr_set_undo(const struct lu_env *env,
250                                 struct thandle *th, struct tx_arg *arg)
251 {
252         CERROR("%s: attr set undo "DFID" unimplemented yet!: rc = %d\n",
253                dt_obd_name(th->th_dev),
254                PFID(lu_object_fid(&arg->object->do_lu)), -ENOTSUPP);
255
256         return -ENOTSUPP;
257 }
258
259 static int out_tx_attr_set_exec(const struct lu_env *env, struct thandle *th,
260                                 struct tx_arg *arg)
261 {
262         struct dt_object        *dt_obj = arg->object;
263         int                     rc;
264
265         CDEBUG(D_OTHER, "%s: attr set "DFID"\n", dt_obd_name(th->th_dev),
266                PFID(lu_object_fid(&dt_obj->do_lu)));
267
268         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
269         rc = dt_attr_set(env, dt_obj, &arg->u.attr_set.attr, th, NULL);
270         dt_write_unlock(env, dt_obj);
271
272         CDEBUG(D_INFO, "%s: insert attr_set reply %p index %d: rc = %d\n",
273                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
274
275         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
276
277         return rc;
278 }
279
280 static int __out_tx_attr_set(const struct lu_env *env,
281                              struct dt_object *dt_obj,
282                              const struct lu_attr *attr,
283                              struct thandle_exec_args *th,
284                              struct object_update_reply *reply,
285                              int index, char *file, int line)
286 {
287         struct tx_arg           *arg;
288
289         LASSERT(th->ta_handle != NULL);
290         th->ta_err = dt_declare_attr_set(env, dt_obj, attr, th->ta_handle);
291         if (th->ta_err != 0)
292                 return th->ta_err;
293
294         arg = tx_add_exec(th, out_tx_attr_set_exec, out_tx_attr_set_undo,
295                           file, line);
296         LASSERT(arg);
297         lu_object_get(&dt_obj->do_lu);
298         arg->object = dt_obj;
299         arg->u.attr_set.attr = *attr;
300         arg->reply = reply;
301         arg->index = index;
302         return 0;
303 }
304
305 static int out_attr_set(struct tgt_session_info *tsi)
306 {
307         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
308         struct object_update    *update = tti->tti_u.update.tti_update;
309         struct lu_attr          *attr = &tti->tti_attr;
310         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
311         struct obdo             *lobdo = &tti->tti_u.update.tti_obdo;
312         struct obdo             *wobdo;
313         int                      size;
314         int                      rc;
315
316         ENTRY;
317
318         wobdo = object_update_param_get(update, 0, &size);
319         if (wobdo == NULL || size != sizeof(*wobdo)) {
320                 CERROR("%s: empty obdo in the update: rc = %d\n",
321                        tgt_name(tsi->tsi_tgt), -EPROTO);
322                 RETURN(err_serious(-EPROTO));
323         }
324
325         attr->la_valid = 0;
326         attr->la_valid = 0;
327
328         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
329                 lustre_swab_obdo(wobdo);
330         lustre_get_wire_obdo(NULL, lobdo, wobdo);
331         la_from_obdo(attr, lobdo, lobdo->o_valid);
332
333         rc = out_tx_attr_set(tsi->tsi_env, obj, attr, &tti->tti_tea,
334                              tti->tti_u.update.tti_update_reply,
335                              tti->tti_u.update.tti_update_reply_index);
336
337         RETURN(rc);
338 }
339
340 static int out_attr_get(struct tgt_session_info *tsi)
341 {
342         const struct lu_env     *env = tsi->tsi_env;
343         struct tgt_thread_info  *tti = tgt_th_info(env);
344         struct obdo             *obdo = &tti->tti_u.update.tti_obdo;
345         struct lu_attr          *la = &tti->tti_attr;
346         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
347         int                     idx = tti->tti_u.update.tti_update_reply_index;
348         int                     rc;
349
350         ENTRY;
351
352         if (!lu_object_exists(&obj->do_lu)) {
353                 /* Usually, this will be called when the master MDT try
354                  * to init a remote object(see osp_object_init), so if
355                  * the object does not exist on slave, we need set BANSHEE flag,
356                  * so the object can be removed from the cache immediately */
357                 set_bit(LU_OBJECT_HEARD_BANSHEE,
358                         &obj->do_lu.lo_header->loh_flags);
359                 RETURN(-ENOENT);
360         }
361
362         dt_read_lock(env, obj, MOR_TGT_CHILD);
363         rc = dt_attr_get(env, obj, la, NULL);
364         if (rc)
365                 GOTO(out_unlock, rc);
366         /*
367          * If it is a directory, we will also check whether the
368          * directory is empty.
369          * la_flags = 0 : Empty.
370          *          = 1 : Not empty.
371          */
372         la->la_flags = 0;
373         if (S_ISDIR(la->la_mode)) {
374                 struct dt_it            *it;
375                 const struct dt_it_ops  *iops;
376
377                 if (!dt_try_as_dir(env, obj))
378                         GOTO(out_unlock, rc = -ENOTDIR);
379
380                 iops = &obj->do_index_ops->dio_it;
381                 it = iops->init(env, obj, LUDA_64BITHASH, BYPASS_CAPA);
382                 if (!IS_ERR(it)) {
383                         int  result;
384                         result = iops->get(env, it, (const void *)"");
385                         if (result > 0) {
386                                 int i;
387                                 for (result = 0, i = 0; result == 0 && i < 3;
388                                      ++i)
389                                         result = iops->next(env, it);
390                                 if (result == 0)
391                                         la->la_flags = 1;
392                         } else if (result == 0)
393                                 /*
394                                  * Huh? Index contains no zero key?
395                                  */
396                                 rc = -EIO;
397
398                         iops->put(env, it);
399                         iops->fini(env, it);
400                 }
401         }
402
403         obdo->o_valid = 0;
404         obdo_from_la(obdo, la, la->la_valid);
405         lustre_set_wire_obdo(NULL, obdo, obdo);
406
407 out_unlock:
408         dt_read_unlock(env, obj);
409
410         CDEBUG(D_INFO, "%s: insert attr get reply %p index %d: rc = %d\n",
411                tgt_name(tsi->tsi_tgt), tti->tti_u.update.tti_update_reply,
412                0, rc);
413
414         object_update_result_insert(tti->tti_u.update.tti_update_reply, obdo,
415                                     sizeof(*obdo), idx, rc);
416
417         RETURN(rc);
418 }
419
420 static int out_xattr_get(struct tgt_session_info *tsi)
421 {
422         const struct lu_env        *env = tsi->tsi_env;
423         struct tgt_thread_info     *tti = tgt_th_info(env);
424         struct object_update       *update = tti->tti_u.update.tti_update;
425         struct lu_buf              *lbuf = &tti->tti_buf;
426         struct object_update_reply *reply = tti->tti_u.update.tti_update_reply;
427         struct dt_object           *obj = tti->tti_u.update.tti_dt_object;
428         char                       *name;
429         struct object_update_result *update_result;
430         int                     idx = tti->tti_u.update.tti_update_reply_index;
431         int                        rc;
432
433         ENTRY;
434
435         if (!lu_object_exists(&obj->do_lu)) {
436                 set_bit(LU_OBJECT_HEARD_BANSHEE,
437                         &obj->do_lu.lo_header->loh_flags);
438                 RETURN(-ENOENT);
439         }
440
441         name = object_update_param_get(update, 0, NULL);
442         if (name == NULL) {
443                 CERROR("%s: empty name for xattr get: rc = %d\n",
444                        tgt_name(tsi->tsi_tgt), -EPROTO);
445                 RETURN(err_serious(-EPROTO));
446         }
447
448         update_result = object_update_result_get(reply, 0, NULL);
449         if (update_result == NULL) {
450                 CERROR("%s: empty name for xattr get: rc = %d\n",
451                        tgt_name(tsi->tsi_tgt), -EPROTO);
452                 RETURN(err_serious(-EPROTO));
453         }
454
455         lbuf->lb_buf = update_result->our_data;
456         lbuf->lb_len = OUT_UPDATE_REPLY_SIZE -
457                        cfs_size_round((unsigned long)update_result->our_data -
458                                       (unsigned long)update_result);
459         dt_read_lock(env, obj, MOR_TGT_CHILD);
460         rc = dt_xattr_get(env, obj, lbuf, name, NULL);
461         dt_read_unlock(env, obj);
462         if (rc < 0) {
463                 lbuf->lb_len = 0;
464                 GOTO(out, rc);
465         }
466         if (rc == 0) {
467                 lbuf->lb_len = 0;
468                 GOTO(out, rc = -ENOENT);
469         }
470         lbuf->lb_len = rc;
471         rc = 0;
472         CDEBUG(D_INFO, "%s: "DFID" get xattr %s len %d\n",
473                tgt_name(tsi->tsi_tgt), PFID(lu_object_fid(&obj->do_lu)),
474                name, (int)lbuf->lb_len);
475
476         GOTO(out, rc);
477
478 out:
479         object_update_result_insert(reply, lbuf->lb_buf, lbuf->lb_len, idx, rc);
480         RETURN(rc);
481 }
482
483 static int out_index_lookup(struct tgt_session_info *tsi)
484 {
485         const struct lu_env     *env = tsi->tsi_env;
486         struct tgt_thread_info  *tti = tgt_th_info(env);
487         struct object_update    *update = tti->tti_u.update.tti_update;
488         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
489         char                    *name;
490         int                      rc;
491
492         ENTRY;
493
494         if (!lu_object_exists(&obj->do_lu))
495                 RETURN(-ENOENT);
496
497         name = object_update_param_get(update, 0, NULL);
498         if (name == NULL) {
499                 CERROR("%s: empty name for lookup: rc = %d\n",
500                        tgt_name(tsi->tsi_tgt), -EPROTO);
501                 RETURN(err_serious(-EPROTO));
502         }
503
504         dt_read_lock(env, obj, MOR_TGT_CHILD);
505         if (!dt_try_as_dir(env, obj))
506                 GOTO(out_unlock, rc = -ENOTDIR);
507
508         rc = dt_lookup(env, obj, (struct dt_rec *)&tti->tti_fid1,
509                 (struct dt_key *)name, NULL);
510
511         if (rc < 0)
512                 GOTO(out_unlock, rc);
513
514         if (rc == 0)
515                 rc += 1;
516
517         CDEBUG(D_INFO, "lookup "DFID" %s get "DFID" rc %d\n",
518                PFID(lu_object_fid(&obj->do_lu)), name,
519                PFID(&tti->tti_fid1), rc);
520
521 out_unlock:
522         dt_read_unlock(env, obj);
523
524         CDEBUG(D_INFO, "%s: insert lookup reply %p index %d: rc = %d\n",
525                tgt_name(tsi->tsi_tgt), tti->tti_u.update.tti_update_reply,
526                0, rc);
527
528         object_update_result_insert(tti->tti_u.update.tti_update_reply,
529                             &tti->tti_fid1, sizeof(tti->tti_fid1),
530                             tti->tti_u.update.tti_update_reply_index, rc);
531         RETURN(rc);
532 }
533
534 static int out_tx_xattr_set_exec(const struct lu_env *env,
535                                  struct thandle *th,
536                                  struct tx_arg *arg)
537 {
538         struct dt_object *dt_obj = arg->object;
539         int rc;
540
541         CDEBUG(D_INFO, "%s: set xattr buf %p name %s flag %d\n",
542                dt_obd_name(th->th_dev), arg->u.xattr_set.buf.lb_buf,
543                arg->u.xattr_set.name, arg->u.xattr_set.flags);
544
545         if (!lu_object_exists(&dt_obj->do_lu))
546                 GOTO(out, rc = -ENOENT);
547
548         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
549         rc = dt_xattr_set(env, dt_obj, &arg->u.xattr_set.buf,
550                           arg->u.xattr_set.name, arg->u.xattr_set.flags,
551                           th, NULL);
552         dt_write_unlock(env, dt_obj);
553         /**
554          * Ignore errors if this is LINK EA
555          **/
556         if (unlikely(rc && !strcmp(arg->u.xattr_set.name, XATTR_NAME_LINK)))
557                 rc = 0;
558 out:
559         CDEBUG(D_INFO, "%s: insert xattr set reply %p index %d: rc = %d\n",
560                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
561
562         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
563
564         return rc;
565 }
566
567 static int __out_tx_xattr_set(const struct lu_env *env,
568                               struct dt_object *dt_obj,
569                               const struct lu_buf *buf,
570                               const char *name, int flags,
571                               struct thandle_exec_args *ta,
572                               struct object_update_reply *reply,
573                               int index, char *file, int line)
574 {
575         struct tx_arg           *arg;
576
577         LASSERT(ta->ta_handle != NULL);
578         ta->ta_err = dt_declare_xattr_set(env, dt_obj, buf, name,
579                                           flags, ta->ta_handle);
580         if (ta->ta_err != 0)
581                 return ta->ta_err;
582
583         arg = tx_add_exec(ta, out_tx_xattr_set_exec, NULL, file, line);
584         LASSERT(arg);
585         lu_object_get(&dt_obj->do_lu);
586         arg->object = dt_obj;
587         arg->u.xattr_set.name = name;
588         arg->u.xattr_set.flags = flags;
589         arg->u.xattr_set.buf = *buf;
590         arg->reply = reply;
591         arg->index = index;
592         arg->u.xattr_set.csum = 0;
593         return 0;
594 }
595
596 static int out_xattr_set(struct tgt_session_info *tsi)
597 {
598         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
599         struct object_update    *update = tti->tti_u.update.tti_update;
600         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
601         struct lu_buf           *lbuf = &tti->tti_buf;
602         char                    *name;
603         char                    *buf;
604         char                    *tmp;
605         int                      buf_len = 0;
606         int                      flag;
607         int                      rc;
608         ENTRY;
609
610         name = object_update_param_get(update, 0, NULL);
611         if (name == NULL) {
612                 CERROR("%s: empty name for xattr set: rc = %d\n",
613                        tgt_name(tsi->tsi_tgt), -EPROTO);
614                 RETURN(err_serious(-EPROTO));
615         }
616
617         buf = object_update_param_get(update, 1, &buf_len);
618         if (buf == NULL || buf_len == 0) {
619                 CERROR("%s: empty buf for xattr set: rc = %d\n",
620                        tgt_name(tsi->tsi_tgt), -EPROTO);
621                 RETURN(err_serious(-EPROTO));
622         }
623
624         lbuf->lb_buf = buf;
625         lbuf->lb_len = buf_len;
626
627         tmp = (char *)object_update_param_get(update, 2, NULL);
628         if (tmp == NULL) {
629                 CERROR("%s: empty flag for xattr set: rc = %d\n",
630                        tgt_name(tsi->tsi_tgt), -EPROTO);
631                 RETURN(err_serious(-EPROTO));
632         }
633
634         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
635                 __swab32s((__u32 *)tmp);
636         flag = *(int *)tmp;
637
638         rc = out_tx_xattr_set(tsi->tsi_env, obj, lbuf, name, flag,
639                               &tti->tti_tea,
640                               tti->tti_u.update.tti_update_reply,
641                               tti->tti_u.update.tti_update_reply_index);
642         RETURN(rc);
643 }
644
645 static int out_obj_ref_add(const struct lu_env *env,
646                            struct dt_object *dt_obj,
647                            struct thandle *th)
648 {
649         int rc;
650
651         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
652         rc = dt_ref_add(env, dt_obj, th);
653         dt_write_unlock(env, dt_obj);
654
655         return rc;
656 }
657
658 static int out_obj_ref_del(const struct lu_env *env,
659                            struct dt_object *dt_obj,
660                            struct thandle *th)
661 {
662         int rc;
663
664         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
665         rc = dt_ref_del(env, dt_obj, th);
666         dt_write_unlock(env, dt_obj);
667
668         return rc;
669 }
670
671 static int out_tx_ref_add_exec(const struct lu_env *env, struct thandle *th,
672                                struct tx_arg *arg)
673 {
674         struct dt_object *dt_obj = arg->object;
675         int rc;
676
677         rc = out_obj_ref_add(env, dt_obj, th);
678
679         CDEBUG(D_INFO, "%s: insert ref_add reply %p index %d: rc = %d\n",
680                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
681
682         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
683         return rc;
684 }
685
686 static int out_tx_ref_add_undo(const struct lu_env *env, struct thandle *th,
687                                struct tx_arg *arg)
688 {
689         return out_obj_ref_del(env, arg->object, th);
690 }
691
692 static int __out_tx_ref_add(const struct lu_env *env,
693                             struct dt_object *dt_obj,
694                             struct thandle_exec_args *ta,
695                             struct object_update_reply *reply,
696                             int index, char *file, int line)
697 {
698         struct tx_arg   *arg;
699
700         LASSERT(ta->ta_handle != NULL);
701         ta->ta_err = dt_declare_ref_add(env, dt_obj, ta->ta_handle);
702         if (ta->ta_err != 0)
703                 return ta->ta_err;
704
705         arg = tx_add_exec(ta, out_tx_ref_add_exec, out_tx_ref_add_undo, file,
706                           line);
707         LASSERT(arg);
708         lu_object_get(&dt_obj->do_lu);
709         arg->object = dt_obj;
710         arg->reply = reply;
711         arg->index = index;
712         return 0;
713 }
714
715 /**
716  * increase ref of the object
717  **/
718 static int out_ref_add(struct tgt_session_info *tsi)
719 {
720         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
721         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
722         int                      rc;
723
724         ENTRY;
725
726         rc = out_tx_ref_add(tsi->tsi_env, obj, &tti->tti_tea,
727                             tti->tti_u.update.tti_update_reply,
728                             tti->tti_u.update.tti_update_reply_index);
729         RETURN(rc);
730 }
731
732 static int out_tx_ref_del_exec(const struct lu_env *env, struct thandle *th,
733                                struct tx_arg *arg)
734 {
735         struct dt_object        *dt_obj = arg->object;
736         int                      rc;
737
738         rc = out_obj_ref_del(env, dt_obj, th);
739
740         CDEBUG(D_INFO, "%s: insert ref_del reply %p index %d: rc = %d\n",
741                dt_obd_name(th->th_dev), arg->reply, arg->index, 0);
742
743         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
744
745         return rc;
746 }
747
748 static int out_tx_ref_del_undo(const struct lu_env *env, struct thandle *th,
749                                struct tx_arg *arg)
750 {
751         return out_obj_ref_add(env, arg->object, th);
752 }
753
754 static int __out_tx_ref_del(const struct lu_env *env,
755                             struct dt_object *dt_obj,
756                             struct thandle_exec_args *ta,
757                             struct object_update_reply *reply,
758                             int index, char *file, int line)
759 {
760         struct tx_arg   *arg;
761
762         LASSERT(ta->ta_handle != NULL);
763         ta->ta_err = dt_declare_ref_del(env, dt_obj, ta->ta_handle);
764         if (ta->ta_err != 0)
765                 return ta->ta_err;
766
767         arg = tx_add_exec(ta, out_tx_ref_del_exec, out_tx_ref_del_undo, file,
768                           line);
769         LASSERT(arg);
770         lu_object_get(&dt_obj->do_lu);
771         arg->object = dt_obj;
772         arg->reply = reply;
773         arg->index = index;
774         return 0;
775 }
776
777 static int out_ref_del(struct tgt_session_info *tsi)
778 {
779         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
780         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
781         int                      rc;
782
783         ENTRY;
784
785         if (!lu_object_exists(&obj->do_lu))
786                 RETURN(-ENOENT);
787
788         rc = out_tx_ref_del(tsi->tsi_env, obj, &tti->tti_tea,
789                             tti->tti_u.update.tti_update_reply,
790                             tti->tti_u.update.tti_update_reply_index);
791         RETURN(rc);
792 }
793
794 static int out_obj_index_insert(const struct lu_env *env,
795                                 struct dt_object *dt_obj,
796                                 const struct dt_rec *rec,
797                                 const struct dt_key *key,
798                                 struct thandle *th)
799 {
800         int rc;
801
802         CDEBUG(D_INFO, "%s: index insert "DFID" name: %s fid "DFID"\n",
803                dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
804                (char *)key, PFID((struct lu_fid *)rec));
805
806         if (dt_try_as_dir(env, dt_obj) == 0)
807                 return -ENOTDIR;
808
809         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
810         rc = dt_insert(env, dt_obj, rec, key, th, NULL, 0);
811         dt_write_unlock(env, dt_obj);
812
813         return rc;
814 }
815
816 static int out_obj_index_delete(const struct lu_env *env,
817                                 struct dt_object *dt_obj,
818                                 const struct dt_key *key,
819                                 struct thandle *th)
820 {
821         int rc;
822
823         CDEBUG(D_INFO, "%s: index delete "DFID" name: %s\n",
824                dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
825                (char *)key);
826
827         if (dt_try_as_dir(env, dt_obj) == 0)
828                 return -ENOTDIR;
829
830         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
831         rc = dt_delete(env, dt_obj, key, th, NULL);
832         dt_write_unlock(env, dt_obj);
833
834         return rc;
835 }
836
837 static int out_tx_index_insert_exec(const struct lu_env *env,
838                                     struct thandle *th, struct tx_arg *arg)
839 {
840         struct dt_object *dt_obj = arg->object;
841         int rc;
842
843         rc = out_obj_index_insert(env, dt_obj, arg->u.insert.rec,
844                                   arg->u.insert.key, th);
845
846         CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
847                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
848
849         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
850
851         return rc;
852 }
853
854 static int out_tx_index_insert_undo(const struct lu_env *env,
855                                     struct thandle *th, struct tx_arg *arg)
856 {
857         return out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
858 }
859
860 static int __out_tx_index_insert(const struct lu_env *env,
861                                  struct dt_object *dt_obj,
862                                  char *name, struct lu_fid *fid,
863                                  struct thandle_exec_args *ta,
864                                  struct object_update_reply *reply,
865                                  int index, char *file, int line)
866 {
867         struct tx_arg *arg;
868
869         LASSERT(ta->ta_handle != NULL);
870
871         if (dt_try_as_dir(env, dt_obj) == 0) {
872                 ta->ta_err = -ENOTDIR;
873                 return ta->ta_err;
874         }
875
876         ta->ta_err = dt_declare_insert(env, dt_obj,
877                                        (struct dt_rec *)fid,
878                                        (struct dt_key *)name,
879                                        ta->ta_handle);
880
881         if (ta->ta_err != 0)
882                 return ta->ta_err;
883
884         arg = tx_add_exec(ta, out_tx_index_insert_exec,
885                           out_tx_index_insert_undo, file,
886                           line);
887         LASSERT(arg);
888         lu_object_get(&dt_obj->do_lu);
889         arg->object = dt_obj;
890         arg->reply = reply;
891         arg->index = index;
892         arg->u.insert.rec = (struct dt_rec *)fid;
893         arg->u.insert.key = (struct dt_key *)name;
894
895         return 0;
896 }
897
898 static int out_index_insert(struct tgt_session_info *tsi)
899 {
900         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
901         struct object_update    *update = tti->tti_u.update.tti_update;
902         struct dt_object  *obj = tti->tti_u.update.tti_dt_object;
903         struct lu_fid     *fid;
904         char              *name;
905         int                rc = 0;
906         int                size;
907
908         ENTRY;
909
910         name = object_update_param_get(update, 0, NULL);
911         if (name == NULL) {
912                 CERROR("%s: empty name for index insert: rc = %d\n",
913                        tgt_name(tsi->tsi_tgt), -EPROTO);
914                 RETURN(err_serious(-EPROTO));
915         }
916
917         fid = object_update_param_get(update, 1, &size);
918         if (fid == NULL || size != sizeof(*fid)) {
919                 CERROR("%s: invalid fid: rc = %d\n",
920                        tgt_name(tsi->tsi_tgt), -EPROTO);
921                        RETURN(err_serious(-EPROTO));
922         }
923
924         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
925                 lustre_swab_lu_fid(fid);
926
927         if (!fid_is_sane(fid)) {
928                 CERROR("%s: invalid FID "DFID": rc = %d\n",
929                        tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
930                 RETURN(err_serious(-EPROTO));
931         }
932
933         rc = out_tx_index_insert(tsi->tsi_env, obj, name, fid,
934                                  &tti->tti_tea,
935                                  tti->tti_u.update.tti_update_reply,
936                                  tti->tti_u.update.tti_update_reply_index);
937         RETURN(rc);
938 }
939
940 static int out_tx_index_delete_exec(const struct lu_env *env,
941                                     struct thandle *th,
942                                     struct tx_arg *arg)
943 {
944         int rc;
945
946         rc = out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
947
948         CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
949                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
950
951         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
952
953         return rc;
954 }
955
956 static int out_tx_index_delete_undo(const struct lu_env *env,
957                                     struct thandle *th,
958                                     struct tx_arg *arg)
959 {
960         CERROR("%s: Oops, can not rollback index_delete yet: rc = %d\n",
961                dt_obd_name(th->th_dev), -ENOTSUPP);
962         return -ENOTSUPP;
963 }
964
965 static int __out_tx_index_delete(const struct lu_env *env,
966                                  struct dt_object *dt_obj, char *name,
967                                  struct thandle_exec_args *ta,
968                                  struct object_update_reply *reply,
969                                  int index, char *file, int line)
970 {
971         struct tx_arg *arg;
972
973         if (dt_try_as_dir(env, dt_obj) == 0) {
974                 ta->ta_err = -ENOTDIR;
975                 return ta->ta_err;
976         }
977
978         LASSERT(ta->ta_handle != NULL);
979         ta->ta_err = dt_declare_delete(env, dt_obj,
980                                        (struct dt_key *)name,
981                                        ta->ta_handle);
982         if (ta->ta_err != 0)
983                 return ta->ta_err;
984
985         arg = tx_add_exec(ta, out_tx_index_delete_exec,
986                           out_tx_index_delete_undo, file,
987                           line);
988         LASSERT(arg);
989         lu_object_get(&dt_obj->do_lu);
990         arg->object = dt_obj;
991         arg->reply = reply;
992         arg->index = index;
993         arg->u.insert.key = (struct dt_key *)name;
994         return 0;
995 }
996
997 static int out_index_delete(struct tgt_session_info *tsi)
998 {
999         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
1000         struct object_update    *update = tti->tti_u.update.tti_update;
1001         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
1002         char                    *name;
1003         int                      rc = 0;
1004
1005         if (!lu_object_exists(&obj->do_lu))
1006                 RETURN(-ENOENT);
1007
1008         name = object_update_param_get(update, 0, NULL);
1009         if (name == NULL) {
1010                 CERROR("%s: empty name for index delete: rc = %d\n",
1011                        tgt_name(tsi->tsi_tgt), -EPROTO);
1012                 RETURN(err_serious(-EPROTO));
1013         }
1014
1015         rc = out_tx_index_delete(tsi->tsi_env, obj, name, &tti->tti_tea,
1016                                  tti->tti_u.update.tti_update_reply,
1017                                  tti->tti_u.update.tti_update_reply_index);
1018         RETURN(rc);
1019 }
1020
1021 static int out_tx_destroy_exec(const struct lu_env *env, struct thandle *th,
1022                                struct tx_arg *arg)
1023 {
1024         struct dt_object *dt_obj = arg->object;
1025         int rc;
1026
1027         rc = out_obj_destroy(env, dt_obj, th);
1028
1029         CDEBUG(D_INFO, "%s: insert destroy reply %p index %d: rc = %d\n",
1030                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
1031
1032         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
1033
1034         RETURN(rc);
1035 }
1036
1037 static int out_tx_destroy_undo(const struct lu_env *env, struct thandle *th,
1038                                struct tx_arg *arg)
1039 {
1040         CERROR("%s: not support destroy undo yet!: rc = %d\n",
1041                dt_obd_name(th->th_dev), -ENOTSUPP);
1042         return -ENOTSUPP;
1043 }
1044
1045 static int __out_tx_destroy(const struct lu_env *env, struct dt_object *dt_obj,
1046                              struct thandle_exec_args *ta,
1047                              struct object_update_reply *reply,
1048                              int index, char *file, int line)
1049 {
1050         struct tx_arg *arg;
1051
1052         LASSERT(ta->ta_handle != NULL);
1053         ta->ta_err = dt_declare_destroy(env, dt_obj, ta->ta_handle);
1054         if (ta->ta_err)
1055                 return ta->ta_err;
1056
1057         arg = tx_add_exec(ta, out_tx_destroy_exec, out_tx_destroy_undo,
1058                           file, line);
1059         LASSERT(arg);
1060         lu_object_get(&dt_obj->do_lu);
1061         arg->object = dt_obj;
1062         arg->reply = reply;
1063         arg->index = index;
1064         return 0;
1065 }
1066
1067 static int out_destroy(struct tgt_session_info *tsi)
1068 {
1069         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
1070         struct object_update    *update = tti->tti_u.update.tti_update;
1071         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
1072         struct lu_fid           *fid;
1073         int                      rc;
1074         ENTRY;
1075
1076         fid = &update->ou_fid;
1077         if (!fid_is_sane(fid)) {
1078                 CERROR("%s: invalid FID "DFID": rc = %d\n",
1079                        tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
1080                 RETURN(err_serious(-EPROTO));
1081         }
1082
1083         if (!lu_object_exists(&obj->do_lu))
1084                 RETURN(-ENOENT);
1085
1086         rc = out_tx_destroy(tsi->tsi_env, obj, &tti->tti_tea,
1087                             tti->tti_u.update.tti_update_reply,
1088                             tti->tti_u.update.tti_update_reply_index);
1089
1090         RETURN(rc);
1091 }
1092
1093 static int out_tx_write_exec(const struct lu_env *env, struct thandle *th,
1094                              struct tx_arg *arg)
1095 {
1096         struct dt_object *dt_obj = arg->object;
1097         int rc;
1098
1099         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
1100         rc = dt_record_write(env, dt_obj, &arg->u.write.buf,
1101                              &arg->u.write.pos, th);
1102         dt_write_unlock(env, dt_obj);
1103
1104         if (rc == 0)
1105                 rc = arg->u.write.buf.lb_len;
1106
1107         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
1108
1109         return rc > 0 ? 0 : rc;
1110 }
1111
1112 static int __out_tx_write(const struct lu_env *env,
1113                           struct dt_object *dt_obj,
1114                           const struct lu_buf *buf,
1115                           loff_t pos, struct thandle_exec_args *ta,
1116                           struct object_update_reply *reply,
1117                           int index, char *file, int line)
1118 {
1119         struct tx_arg   *arg;
1120
1121         LASSERT(ta->ta_handle != NULL);
1122         ta->ta_err = dt_declare_record_write(env, dt_obj, buf, pos,
1123                                              ta->ta_handle);
1124         if (ta->ta_err != 0)
1125                 return ta->ta_err;
1126
1127         arg = tx_add_exec(ta, out_tx_write_exec, NULL, file, line);
1128         LASSERT(arg);
1129         lu_object_get(&dt_obj->do_lu);
1130         arg->object = dt_obj;
1131         arg->u.write.buf = *buf;
1132         arg->u.write.pos = pos;
1133         arg->reply = reply;
1134         arg->index = index;
1135         return 0;
1136 }
1137
1138 static int out_write(struct tgt_session_info *tsi)
1139 {
1140         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
1141         struct object_update    *update = tti->tti_u.update.tti_update;
1142         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
1143         struct lu_buf           *lbuf = &tti->tti_buf;
1144         char                    *buf;
1145         char                    *tmp;
1146         int                     buf_len = 0;
1147         loff_t                  pos;
1148         int                      rc;
1149         ENTRY;
1150
1151         buf = object_update_param_get(update, 0, &buf_len);
1152         if (buf == NULL || buf_len == 0) {
1153                 CERROR("%s: empty buf for xattr set: rc = %d\n",
1154                        tgt_name(tsi->tsi_tgt), -EPROTO);
1155                 RETURN(err_serious(-EPROTO));
1156         }
1157         lbuf->lb_buf = buf;
1158         lbuf->lb_len = buf_len;
1159
1160         tmp = (char *)object_update_param_get(update, 1, NULL);
1161         if (tmp == NULL) {
1162                 CERROR("%s: empty flag for xattr set: rc = %d\n",
1163                        tgt_name(tsi->tsi_tgt), -EPROTO);
1164                 RETURN(err_serious(-EPROTO));
1165         }
1166
1167         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
1168                 __swab64s((__u64 *)tmp);
1169         pos = *(loff_t *)tmp;
1170
1171         rc = out_tx_write(tsi->tsi_env, obj, lbuf, pos,
1172                           &tti->tti_tea,
1173                           tti->tti_u.update.tti_update_reply,
1174                           tti->tti_u.update.tti_update_reply_index);
1175         RETURN(rc);
1176 }
1177
1178 #define DEF_OUT_HNDL(opc, name, flags, fn)     \
1179 [opc - OUT_CREATE] = {                                  \
1180         .th_name    = name,                             \
1181         .th_fail_id = 0,                                \
1182         .th_opc     = opc,                              \
1183         .th_flags   = flags,                            \
1184         .th_act     = fn,                               \
1185         .th_fmt     = NULL,                             \
1186         .th_version = 0,                                \
1187 }
1188
1189 #define out_handler mdt_handler
1190 static struct tgt_handler out_update_ops[] = {
1191         DEF_OUT_HNDL(OUT_CREATE, "out_create", MUTABOR | HABEO_REFERO,
1192                      out_create),
1193         DEF_OUT_HNDL(OUT_DESTROY, "out_create", MUTABOR | HABEO_REFERO,
1194                      out_destroy),
1195         DEF_OUT_HNDL(OUT_REF_ADD, "out_ref_add", MUTABOR | HABEO_REFERO,
1196                      out_ref_add),
1197         DEF_OUT_HNDL(OUT_REF_DEL, "out_ref_del", MUTABOR | HABEO_REFERO,
1198                      out_ref_del),
1199         DEF_OUT_HNDL(OUT_ATTR_SET, "out_attr_set",  MUTABOR | HABEO_REFERO,
1200                      out_attr_set),
1201         DEF_OUT_HNDL(OUT_ATTR_GET, "out_attr_get",  HABEO_REFERO,
1202                      out_attr_get),
1203         DEF_OUT_HNDL(OUT_XATTR_SET, "out_xattr_set", MUTABOR | HABEO_REFERO,
1204                      out_xattr_set),
1205         DEF_OUT_HNDL(OUT_XATTR_GET, "out_xattr_get", HABEO_REFERO,
1206                      out_xattr_get),
1207         DEF_OUT_HNDL(OUT_INDEX_LOOKUP, "out_index_lookup", HABEO_REFERO,
1208                      out_index_lookup),
1209         DEF_OUT_HNDL(OUT_INDEX_INSERT, "out_index_insert",
1210                      MUTABOR | HABEO_REFERO, out_index_insert),
1211         DEF_OUT_HNDL(OUT_INDEX_DELETE, "out_index_delete",
1212                      MUTABOR | HABEO_REFERO, out_index_delete),
1213         DEF_OUT_HNDL(OUT_WRITE, "out_write", MUTABOR | HABEO_REFERO, out_write),
1214 };
1215
1216 struct tgt_handler *out_handler_find(__u32 opc)
1217 {
1218         struct tgt_handler *h;
1219
1220         h = NULL;
1221         if (OUT_CREATE <= opc && opc < OUT_LAST) {
1222                 h = &out_update_ops[opc - OUT_CREATE];
1223                 LASSERTF(h->th_opc == opc, "opcode mismatch %d != %d\n",
1224                          h->th_opc, opc);
1225         } else {
1226                 h = NULL; /* unsupported opc */
1227         }
1228         return h;
1229 }
1230
1231 static int out_tx_start(const struct lu_env *env, struct dt_device *dt,
1232                         struct thandle_exec_args *ta, struct obd_export *exp)
1233 {
1234         memset(ta, 0, sizeof(*ta));
1235         ta->ta_handle = dt_trans_create(env, dt);
1236         if (IS_ERR(ta->ta_handle)) {
1237                 int rc;
1238
1239                 rc = PTR_ERR(ta->ta_handle);
1240                 ta->ta_handle = NULL;
1241                 CERROR("%s: start handle error: rc = %d\n",
1242                        dt_obd_name(dt), rc);
1243                 return rc;
1244         }
1245         ta->ta_dev = dt;
1246         if (exp->exp_need_sync)
1247                 ta->ta_handle->th_sync = 1;
1248
1249         return 0;
1250 }
1251
1252 static int out_trans_start(const struct lu_env *env,
1253                            struct thandle_exec_args *ta)
1254 {
1255         return dt_trans_start(env, ta->ta_dev, ta->ta_handle);
1256 }
1257
1258 static int out_trans_stop(const struct lu_env *env,
1259                           struct thandle_exec_args *ta, int err)
1260 {
1261         int i;
1262         int rc;
1263
1264         ta->ta_handle->th_result = err;
1265         rc = dt_trans_stop(env, ta->ta_dev, ta->ta_handle);
1266         for (i = 0; i < ta->ta_argno; i++) {
1267                 if (ta->ta_args[i].object != NULL) {
1268                         struct dt_object *obj = ta->ta_args[i].object;
1269
1270                         /* If the object is being created during this
1271                          * transaction, we need to remove them from the
1272                          * cache immediately, because a few layers are
1273                          * missing in OUT handler, i.e. the object might
1274                          * not be initialized in all layers */
1275                         if (ta->ta_args[i].exec_fn == out_tx_create_exec)
1276                                 set_bit(LU_OBJECT_HEARD_BANSHEE,
1277                                         &obj->do_lu.lo_header->loh_flags);
1278                         lu_object_put(env, &ta->ta_args[i].object->do_lu);
1279                         ta->ta_args[i].object = NULL;
1280                 }
1281         }
1282
1283         return rc;
1284 }
1285
1286 int out_tx_end(const struct lu_env *env, struct thandle_exec_args *ta)
1287 {
1288         struct tgt_session_info *tsi = tgt_ses_info(env);
1289         int i = 0, rc;
1290
1291         LASSERT(ta->ta_dev);
1292         LASSERT(ta->ta_handle);
1293
1294         if (ta->ta_err != 0 || ta->ta_argno == 0)
1295                 GOTO(stop, rc = ta->ta_err);
1296
1297         rc = out_trans_start(env, ta);
1298         if (unlikely(rc))
1299                 GOTO(stop, rc);
1300
1301         for (i = 0; i < ta->ta_argno; i++) {
1302                 rc = ta->ta_args[i].exec_fn(env, ta->ta_handle,
1303                                             &ta->ta_args[i]);
1304                 if (unlikely(rc != 0)) {
1305                         CDEBUG(D_INFO, "error during execution of #%u from"
1306                                " %s:%d: rc = %d\n", i, ta->ta_args[i].file,
1307                                ta->ta_args[i].line, rc);
1308                         while (--i >= 0) {
1309                                 if (ta->ta_args[i].undo_fn != NULL)
1310                                         ta->ta_args[i].undo_fn(env,
1311                                                                ta->ta_handle,
1312                                                               &ta->ta_args[i]);
1313                                 else
1314                                         CERROR("%s: undo for %s:%d: rc = %d\n",
1315                                                dt_obd_name(ta->ta_dev),
1316                                                ta->ta_args[i].file,
1317                                                ta->ta_args[i].line, -ENOTSUPP);
1318                         }
1319                         break;
1320                 }
1321         }
1322
1323         /* Only fail for real update */
1324         tsi->tsi_reply_fail_id = OBD_FAIL_OUT_UPDATE_NET_REP;
1325 stop:
1326         CDEBUG(D_INFO, "%s: executed %u/%u: rc = %d\n",
1327                dt_obd_name(ta->ta_dev), i, ta->ta_argno, rc);
1328         out_trans_stop(env, ta, rc);
1329         ta->ta_handle = NULL;
1330         ta->ta_argno = 0;
1331         ta->ta_err = 0;
1332
1333         RETURN(rc);
1334 }
1335
1336 /**
1337  * Object updates between Targets. Because all the updates has been
1338  * dis-assemblied into object updates at sender side, so OUT will
1339  * call OSD API directly to execute these updates.
1340  *
1341  * In DNE phase I all of the updates in the request need to be executed
1342  * in one transaction, and the transaction has to be synchronously.
1343  *
1344  * Please refer to lustre/include/lustre/lustre_idl.h for req/reply
1345  * format.
1346  */
1347 int out_handle(struct tgt_session_info *tsi)
1348 {
1349         const struct lu_env             *env = tsi->tsi_env;
1350         struct tgt_thread_info          *tti = tgt_th_info(env);
1351         struct thandle_exec_args        *ta = &tti->tti_tea;
1352         struct req_capsule              *pill = tsi->tsi_pill;
1353         struct dt_device                *dt = tsi->tsi_tgt->lut_bottom;
1354         struct object_update_request    *ureq;
1355         struct object_update            *update;
1356         struct object_update_reply      *reply;
1357         int                              bufsize;
1358         int                              count;
1359         int                              old_batchid = -1;
1360         int                              i;
1361         int                              rc = 0;
1362         int                              rc1 = 0;
1363
1364         ENTRY;
1365
1366         req_capsule_set(pill, &RQF_OUT_UPDATE);
1367         ureq = req_capsule_client_get(pill, &RMF_OUT_UPDATE);
1368         if (ureq == NULL) {
1369                 CERROR("%s: No buf!: rc = %d\n", tgt_name(tsi->tsi_tgt),
1370                        -EPROTO);
1371                 RETURN(err_serious(-EPROTO));
1372         }
1373
1374         bufsize = req_capsule_get_size(pill, &RMF_OUT_UPDATE, RCL_CLIENT);
1375         if (bufsize != object_update_request_size(ureq)) {
1376                 CERROR("%s: invalid bufsize %d: rc = %d\n",
1377                        tgt_name(tsi->tsi_tgt), bufsize, -EPROTO);
1378                 RETURN(err_serious(-EPROTO));
1379         }
1380
1381         if (ureq->ourq_magic != UPDATE_REQUEST_MAGIC) {
1382                 CERROR("%s: invalid update buffer magic %x expect %x: "
1383                        "rc = %d\n", tgt_name(tsi->tsi_tgt), ureq->ourq_magic,
1384                        UPDATE_REQUEST_MAGIC, -EPROTO);
1385                 RETURN(err_serious(-EPROTO));
1386         }
1387
1388         count = ureq->ourq_count;
1389         if (count <= 0) {
1390                 CERROR("%s: empty update: rc = %d\n", tgt_name(tsi->tsi_tgt),
1391                        -EPROTO);
1392                 RETURN(err_serious(-EPROTO));
1393         }
1394
1395         req_capsule_set_size(pill, &RMF_OUT_UPDATE_REPLY, RCL_SERVER,
1396                              OUT_UPDATE_REPLY_SIZE);
1397         rc = req_capsule_server_pack(pill);
1398         if (rc != 0) {
1399                 CERROR("%s: Can't pack response: rc = %d\n",
1400                        tgt_name(tsi->tsi_tgt), rc);
1401                 RETURN(rc);
1402         }
1403
1404         /* Prepare the update reply buffer */
1405         reply = req_capsule_server_get(pill, &RMF_OUT_UPDATE_REPLY);
1406         if (reply == NULL)
1407                 RETURN(err_serious(-EPROTO));
1408         object_update_reply_init(reply, count);
1409         tti->tti_u.update.tti_update_reply = reply;
1410
1411         rc = out_tx_start(env, dt, ta, tsi->tsi_exp);
1412         if (rc != 0)
1413                 RETURN(rc);
1414
1415         tti->tti_mult_trans = !req_is_replay(tgt_ses_req(tsi));
1416
1417         /* Walk through updates in the request to execute them synchronously */
1418         for (i = 0; i < count; i++) {
1419                 struct tgt_handler      *h;
1420                 struct dt_object        *dt_obj;
1421
1422                 update = object_update_request_get(ureq, i, NULL);
1423                 if (update == NULL)
1424                         GOTO(out, rc = -EPROTO);
1425
1426                 if (ptlrpc_req_need_swab(pill->rc_req))
1427                         lustre_swab_object_update(update);
1428
1429                 if (old_batchid == -1) {
1430                         old_batchid = update->ou_batchid;
1431                 } else if (old_batchid != update->ou_batchid) {
1432                         /* Stop the current update transaction,
1433                          * create a new one */
1434                         rc = out_tx_end(env, ta);
1435                         if (rc < 0)
1436                                 RETURN(rc);
1437
1438                         rc = out_tx_start(env, dt, ta, tsi->tsi_exp);
1439                         if (rc != 0)
1440                                 RETURN(rc);
1441                         old_batchid = update->ou_batchid;
1442                 }
1443
1444                 if (!fid_is_sane(&update->ou_fid)) {
1445                         CERROR("%s: invalid FID "DFID": rc = %d\n",
1446                                tgt_name(tsi->tsi_tgt), PFID(&update->ou_fid),
1447                                -EPROTO);
1448                         GOTO(out, rc = err_serious(-EPROTO));
1449                 }
1450
1451                 dt_obj = dt_locate(env, dt, &update->ou_fid);
1452                 if (IS_ERR(dt_obj))
1453                         GOTO(out, rc = PTR_ERR(dt_obj));
1454
1455                 if (dt->dd_record_fid_accessed) {
1456                         lfsck_pack_rfa(&tti->tti_lr,
1457                                        lu_object_fid(&dt_obj->do_lu));
1458                         tgt_lfsck_in_notify(env, dt, &tti->tti_lr);
1459                 }
1460
1461                 tti->tti_u.update.tti_dt_object = dt_obj;
1462                 tti->tti_u.update.tti_update = update;
1463                 tti->tti_u.update.tti_update_reply_index = i;
1464
1465                 h = out_handler_find(update->ou_type);
1466                 if (likely(h != NULL)) {
1467                         /* For real modification RPC, check if the update
1468                          * has been executed */
1469                         if (h->th_flags & MUTABOR) {
1470                                 struct ptlrpc_request *req = tgt_ses_req(tsi);
1471
1472                                 if (out_check_resent(env, dt, dt_obj, req,
1473                                                      out_reconstruct, reply, i))
1474                                         GOTO(next, rc);
1475                         }
1476
1477                         rc = h->th_act(tsi);
1478                 } else {
1479                         CERROR("%s: The unsupported opc: 0x%x\n",
1480                                tgt_name(tsi->tsi_tgt), update->ou_type);
1481                         lu_object_put(env, &dt_obj->do_lu);
1482                         GOTO(out, rc = -ENOTSUPP);
1483                 }
1484 next:
1485                 lu_object_put(env, &dt_obj->do_lu);
1486                 if (rc < 0)
1487                         GOTO(out, rc);
1488         }
1489 out:
1490         rc1 = out_tx_end(env, ta);
1491         if (rc == 0)
1492                 rc = rc1;
1493         RETURN(rc);
1494 }
1495
1496 struct tgt_handler tgt_out_handlers[] = {
1497 TGT_UPDATE_HDL(MUTABOR, OUT_UPDATE,     out_handle),
1498 };
1499 EXPORT_SYMBOL(tgt_out_handlers);
1500