Whamcloud - gitweb
LU-3534 out: only start transaction for modified update
[fs/lustre-release.git] / lustre / target / out_handler.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  *
25  * lustre/mdt/out_handler.c
26  *
27  * Object update handler between targets.
28  *
29  * Author: di.wang <di.wang@intel.com>
30  */
31
32 #define DEBUG_SUBSYSTEM S_CLASS
33
34 #include <obd_class.h>
35 #include <md_object.h>
36 #include "tgt_internal.h"
37 #include <lustre_update.h>
38
39 struct tx_arg *tx_add_exec(struct thandle_exec_args *ta, tx_exec_func_t func,
40                            tx_exec_func_t undo, char *file, int line)
41 {
42         int i;
43
44         LASSERT(ta);
45         LASSERT(func);
46
47         LASSERTF(ta->ta_argno + 1 <= TX_MAX_OPS,
48                  "Too many updates(%d) in one trans\n", ta->ta_argno);
49         i = ta->ta_argno;
50
51         ta->ta_argno++;
52
53         ta->ta_args[i].exec_fn = func;
54         ta->ta_args[i].undo_fn = undo;
55         ta->ta_args[i].file    = file;
56         ta->ta_args[i].line    = line;
57
58         return &ta->ta_args[i];
59 }
60
61 static void out_reconstruct(const struct lu_env *env, struct dt_device *dt,
62                             struct dt_object *obj,
63                             struct object_update_reply *reply,
64                             int index)
65 {
66         CDEBUG(D_INFO, "%s: fork reply reply %p index %d: rc = %d\n",
67                dt_obd_name(dt), reply, index, 0);
68
69         object_update_result_insert(reply, NULL, 0, index, 0);
70         return;
71 }
72
73 typedef void (*out_reconstruct_t)(const struct lu_env *env,
74                                   struct dt_device *dt,
75                                   struct dt_object *obj,
76                                   struct object_update_reply *reply,
77                                   int index);
78
79 static inline int out_check_resent(const struct lu_env *env,
80                                    struct dt_device *dt,
81                                    struct dt_object *obj,
82                                    struct ptlrpc_request *req,
83                                    out_reconstruct_t reconstruct,
84                                    struct object_update_reply *reply,
85                                    int index)
86 {
87         if (likely(!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)))
88                 return 0;
89
90         if (req_xid_is_last(req)) {
91                 reconstruct(env, dt, obj, reply, index);
92                 return 1;
93         }
94         DEBUG_REQ(D_HA, req, "no reply for RESENT req (have "LPD64")",
95                  req->rq_export->exp_target_data.ted_lcd->lcd_last_xid);
96         return 0;
97 }
98
99 static int out_obj_destroy(const struct lu_env *env, struct dt_object *dt_obj,
100                            struct thandle *th)
101 {
102         int rc;
103
104         CDEBUG(D_INFO, "%s: destroy "DFID"\n", dt_obd_name(th->th_dev),
105                PFID(lu_object_fid(&dt_obj->do_lu)));
106
107         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
108         rc = dt_destroy(env, dt_obj, th);
109         dt_write_unlock(env, dt_obj);
110
111         return rc;
112 }
113
114 /**
115  * All of the xxx_undo will be used once execution failed,
116  * But because all of the required resource has been reserved in
117  * declare phase, i.e. if declare succeed, it should make sure
118  * the following executing phase succeed in anyway, so these undo
119  * should be useless for most of the time in Phase I
120  */
121 int out_tx_create_undo(const struct lu_env *env, struct thandle *th,
122                        struct tx_arg *arg)
123 {
124         int rc;
125
126         rc = out_obj_destroy(env, arg->object, th);
127         if (rc != 0)
128                 CERROR("%s: undo failure, we are doomed!: rc = %d\n",
129                        dt_obd_name(th->th_dev), rc);
130         return rc;
131 }
132
133 int out_tx_create_exec(const struct lu_env *env, struct thandle *th,
134                        struct tx_arg *arg)
135 {
136         struct dt_object        *dt_obj = arg->object;
137         int                      rc;
138
139         CDEBUG(D_OTHER, "%s: create "DFID": dof %u, mode %o\n",
140                dt_obd_name(th->th_dev),
141                PFID(lu_object_fid(&arg->object->do_lu)),
142                arg->u.create.dof.dof_type,
143                arg->u.create.attr.la_mode & S_IFMT);
144
145         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
146         rc = dt_create(env, dt_obj, &arg->u.create.attr,
147                        &arg->u.create.hint, &arg->u.create.dof, th);
148
149         dt_write_unlock(env, dt_obj);
150
151         CDEBUG(D_INFO, "%s: insert create reply %p index %d: rc = %d\n",
152                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
153
154         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
155
156         return rc;
157 }
158
159 static int __out_tx_create(const struct lu_env *env, struct dt_object *obj,
160                            struct lu_attr *attr, struct lu_fid *parent_fid,
161                            struct dt_object_format *dof,
162                            struct thandle_exec_args *ta,
163                            struct object_update_reply *reply,
164                            int index, char *file, int line)
165 {
166         struct tx_arg *arg;
167
168         LASSERT(ta->ta_handle != NULL);
169         ta->ta_err = dt_declare_create(env, obj, attr, NULL, dof,
170                                        ta->ta_handle);
171         if (ta->ta_err != 0)
172                 return ta->ta_err;
173
174         arg = tx_add_exec(ta, out_tx_create_exec, out_tx_create_undo, file,
175                           line);
176         LASSERT(arg);
177
178         /* release the object in out_trans_stop */
179         lu_object_get(&obj->do_lu);
180         arg->object = obj;
181         arg->u.create.attr = *attr;
182         if (parent_fid != NULL)
183                 arg->u.create.fid = *parent_fid;
184         memset(&arg->u.create.hint, 0, sizeof(arg->u.create.hint));
185         arg->u.create.dof  = *dof;
186         arg->reply = reply;
187         arg->index = index;
188
189         return 0;
190 }
191
192 static int out_create(struct tgt_session_info *tsi)
193 {
194         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
195         struct object_update    *update = tti->tti_u.update.tti_update;
196         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
197         struct dt_object_format *dof = &tti->tti_u.update.tti_update_dof;
198         struct obdo             *lobdo = &tti->tti_u.update.tti_obdo;
199         struct lu_attr          *attr = &tti->tti_attr;
200         struct lu_fid           *fid = NULL;
201         struct obdo             *wobdo;
202         int                     size;
203         int                     rc;
204
205         ENTRY;
206
207         wobdo = object_update_param_get(update, 0, &size);
208         if (wobdo == NULL || size != sizeof(*wobdo)) {
209                 CERROR("%s: obdo is NULL, invalid RPC: rc = %d\n",
210                        tgt_name(tsi->tsi_tgt), -EPROTO);
211                 RETURN(err_serious(-EPROTO));
212         }
213
214         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
215                 lustre_swab_obdo(wobdo);
216         lustre_get_wire_obdo(NULL, lobdo, wobdo);
217         la_from_obdo(attr, lobdo, lobdo->o_valid);
218
219         dof->dof_type = dt_mode_to_dft(attr->la_mode);
220         if (update->ou_params_count > 1) {
221                 int size;
222
223                 fid = object_update_param_get(update, 1, &size);
224                 if (fid == NULL || size != sizeof(*fid)) {
225                         CERROR("%s: invalid fid: rc = %d\n",
226                                tgt_name(tsi->tsi_tgt), -EPROTO);
227                         RETURN(err_serious(-EPROTO));
228                 }
229                 if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
230                         lustre_swab_lu_fid(fid);
231                 if (!fid_is_sane(fid)) {
232                         CERROR("%s: invalid fid "DFID": rc = %d\n",
233                                tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
234                         RETURN(err_serious(-EPROTO));
235                 }
236         }
237
238         if (lu_object_exists(&obj->do_lu))
239                 RETURN(-EEXIST);
240
241         rc = out_tx_create(tsi->tsi_env, obj, attr, fid, dof,
242                            &tti->tti_tea,
243                            tti->tti_u.update.tti_update_reply,
244                            tti->tti_u.update.tti_update_reply_index);
245
246         RETURN(rc);
247 }
248
249 static int out_tx_attr_set_undo(const struct lu_env *env,
250                                 struct thandle *th, struct tx_arg *arg)
251 {
252         CERROR("%s: attr set undo "DFID" unimplemented yet!: rc = %d\n",
253                dt_obd_name(th->th_dev),
254                PFID(lu_object_fid(&arg->object->do_lu)), -ENOTSUPP);
255
256         return -ENOTSUPP;
257 }
258
259 static int out_tx_attr_set_exec(const struct lu_env *env, struct thandle *th,
260                                 struct tx_arg *arg)
261 {
262         struct dt_object        *dt_obj = arg->object;
263         int                     rc;
264
265         CDEBUG(D_OTHER, "%s: attr set "DFID"\n", dt_obd_name(th->th_dev),
266                PFID(lu_object_fid(&dt_obj->do_lu)));
267
268         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
269         rc = dt_attr_set(env, dt_obj, &arg->u.attr_set.attr, th, NULL);
270         dt_write_unlock(env, dt_obj);
271
272         CDEBUG(D_INFO, "%s: insert attr_set reply %p index %d: rc = %d\n",
273                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
274
275         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
276
277         return rc;
278 }
279
280 static int __out_tx_attr_set(const struct lu_env *env,
281                              struct dt_object *dt_obj,
282                              const struct lu_attr *attr,
283                              struct thandle_exec_args *th,
284                              struct object_update_reply *reply,
285                              int index, char *file, int line)
286 {
287         struct tx_arg           *arg;
288
289         LASSERT(th->ta_handle != NULL);
290         th->ta_err = dt_declare_attr_set(env, dt_obj, attr, th->ta_handle);
291         if (th->ta_err != 0)
292                 return th->ta_err;
293
294         arg = tx_add_exec(th, out_tx_attr_set_exec, out_tx_attr_set_undo,
295                           file, line);
296         LASSERT(arg);
297         lu_object_get(&dt_obj->do_lu);
298         arg->object = dt_obj;
299         arg->u.attr_set.attr = *attr;
300         arg->reply = reply;
301         arg->index = index;
302         return 0;
303 }
304
305 static int out_attr_set(struct tgt_session_info *tsi)
306 {
307         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
308         struct object_update    *update = tti->tti_u.update.tti_update;
309         struct lu_attr          *attr = &tti->tti_attr;
310         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
311         struct obdo             *lobdo = &tti->tti_u.update.tti_obdo;
312         struct obdo             *wobdo;
313         int                      size;
314         int                      rc;
315
316         ENTRY;
317
318         wobdo = object_update_param_get(update, 0, &size);
319         if (wobdo == NULL || size != sizeof(*wobdo)) {
320                 CERROR("%s: empty obdo in the update: rc = %d\n",
321                        tgt_name(tsi->tsi_tgt), -EPROTO);
322                 RETURN(err_serious(-EPROTO));
323         }
324
325         attr->la_valid = 0;
326         attr->la_valid = 0;
327
328         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
329                 lustre_swab_obdo(wobdo);
330         lustre_get_wire_obdo(NULL, lobdo, wobdo);
331         la_from_obdo(attr, lobdo, lobdo->o_valid);
332
333         rc = out_tx_attr_set(tsi->tsi_env, obj, attr, &tti->tti_tea,
334                              tti->tti_u.update.tti_update_reply,
335                              tti->tti_u.update.tti_update_reply_index);
336
337         RETURN(rc);
338 }
339
340 static int out_attr_get(struct tgt_session_info *tsi)
341 {
342         const struct lu_env     *env = tsi->tsi_env;
343         struct tgt_thread_info  *tti = tgt_th_info(env);
344         struct obdo             *obdo = &tti->tti_u.update.tti_obdo;
345         struct lu_attr          *la = &tti->tti_attr;
346         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
347         int                     idx = tti->tti_u.update.tti_update_reply_index;
348         int                     rc;
349
350         ENTRY;
351
352         if (!lu_object_exists(&obj->do_lu)) {
353                 /* Usually, this will be called when the master MDT try
354                  * to init a remote object(see osp_object_init), so if
355                  * the object does not exist on slave, we need set BANSHEE flag,
356                  * so the object can be removed from the cache immediately */
357                 set_bit(LU_OBJECT_HEARD_BANSHEE,
358                         &obj->do_lu.lo_header->loh_flags);
359                 RETURN(-ENOENT);
360         }
361
362         dt_read_lock(env, obj, MOR_TGT_CHILD);
363         rc = dt_attr_get(env, obj, la, NULL);
364         if (rc)
365                 GOTO(out_unlock, rc);
366         /*
367          * If it is a directory, we will also check whether the
368          * directory is empty.
369          * la_flags = 0 : Empty.
370          *          = 1 : Not empty.
371          */
372         la->la_flags = 0;
373         if (S_ISDIR(la->la_mode)) {
374                 struct dt_it            *it;
375                 const struct dt_it_ops  *iops;
376
377                 if (!dt_try_as_dir(env, obj))
378                         GOTO(out_unlock, rc = -ENOTDIR);
379
380                 iops = &obj->do_index_ops->dio_it;
381                 it = iops->init(env, obj, LUDA_64BITHASH, BYPASS_CAPA);
382                 if (!IS_ERR(it)) {
383                         int  result;
384                         result = iops->get(env, it, (const void *)"");
385                         if (result > 0) {
386                                 int i;
387                                 for (result = 0, i = 0; result == 0 && i < 3;
388                                      ++i)
389                                         result = iops->next(env, it);
390                                 if (result == 0)
391                                         la->la_flags = 1;
392                         } else if (result == 0)
393                                 /*
394                                  * Huh? Index contains no zero key?
395                                  */
396                                 rc = -EIO;
397
398                         iops->put(env, it);
399                         iops->fini(env, it);
400                 }
401         }
402
403         obdo->o_valid = 0;
404         obdo_from_la(obdo, la, la->la_valid);
405         lustre_set_wire_obdo(NULL, obdo, obdo);
406
407 out_unlock:
408         dt_read_unlock(env, obj);
409
410         CDEBUG(D_INFO, "%s: insert attr get reply %p index %d: rc = %d\n",
411                tgt_name(tsi->tsi_tgt), tti->tti_u.update.tti_update_reply,
412                0, rc);
413
414         object_update_result_insert(tti->tti_u.update.tti_update_reply, obdo,
415                                     sizeof(*obdo), idx, rc);
416
417         RETURN(rc);
418 }
419
420 static int out_xattr_get(struct tgt_session_info *tsi)
421 {
422         const struct lu_env        *env = tsi->tsi_env;
423         struct tgt_thread_info     *tti = tgt_th_info(env);
424         struct object_update       *update = tti->tti_u.update.tti_update;
425         struct lu_buf              *lbuf = &tti->tti_buf;
426         struct object_update_reply *reply = tti->tti_u.update.tti_update_reply;
427         struct dt_object           *obj = tti->tti_u.update.tti_dt_object;
428         char                       *name;
429         struct object_update_result *update_result;
430         int                     idx = tti->tti_u.update.tti_update_reply_index;
431         int                        rc;
432
433         ENTRY;
434
435         if (!lu_object_exists(&obj->do_lu)) {
436                 set_bit(LU_OBJECT_HEARD_BANSHEE,
437                         &obj->do_lu.lo_header->loh_flags);
438                 RETURN(-ENOENT);
439         }
440
441         name = object_update_param_get(update, 0, NULL);
442         if (name == NULL) {
443                 CERROR("%s: empty name for xattr get: rc = %d\n",
444                        tgt_name(tsi->tsi_tgt), -EPROTO);
445                 RETURN(err_serious(-EPROTO));
446         }
447
448         update_result = object_update_result_get(reply, 0, NULL);
449         if (update_result == NULL) {
450                 CERROR("%s: empty name for xattr get: rc = %d\n",
451                        tgt_name(tsi->tsi_tgt), -EPROTO);
452                 RETURN(err_serious(-EPROTO));
453         }
454
455         lbuf->lb_buf = update_result->our_data;
456         lbuf->lb_len = OUT_UPDATE_REPLY_SIZE -
457                        cfs_size_round((unsigned long)update_result->our_data -
458                                       (unsigned long)update_result);
459         dt_read_lock(env, obj, MOR_TGT_CHILD);
460         rc = dt_xattr_get(env, obj, lbuf, name, NULL);
461         dt_read_unlock(env, obj);
462         if (rc < 0) {
463                 lbuf->lb_len = 0;
464                 GOTO(out, rc);
465         }
466         if (rc == 0) {
467                 lbuf->lb_len = 0;
468                 GOTO(out, rc = -ENOENT);
469         }
470         lbuf->lb_len = rc;
471         rc = 0;
472         CDEBUG(D_INFO, "%s: "DFID" get xattr %s len %d\n",
473                tgt_name(tsi->tsi_tgt), PFID(lu_object_fid(&obj->do_lu)),
474                name, (int)lbuf->lb_len);
475
476         GOTO(out, rc);
477
478 out:
479         object_update_result_insert(reply, lbuf->lb_buf, lbuf->lb_len, idx, rc);
480         RETURN(rc);
481 }
482
483 static int out_index_lookup(struct tgt_session_info *tsi)
484 {
485         const struct lu_env     *env = tsi->tsi_env;
486         struct tgt_thread_info  *tti = tgt_th_info(env);
487         struct object_update    *update = tti->tti_u.update.tti_update;
488         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
489         char                    *name;
490         int                      rc;
491
492         ENTRY;
493
494         if (!lu_object_exists(&obj->do_lu))
495                 RETURN(-ENOENT);
496
497         name = object_update_param_get(update, 0, NULL);
498         if (name == NULL) {
499                 CERROR("%s: empty name for lookup: rc = %d\n",
500                        tgt_name(tsi->tsi_tgt), -EPROTO);
501                 RETURN(err_serious(-EPROTO));
502         }
503
504         dt_read_lock(env, obj, MOR_TGT_CHILD);
505         if (!dt_try_as_dir(env, obj))
506                 GOTO(out_unlock, rc = -ENOTDIR);
507
508         rc = dt_lookup(env, obj, (struct dt_rec *)&tti->tti_fid1,
509                 (struct dt_key *)name, NULL);
510
511         if (rc < 0)
512                 GOTO(out_unlock, rc);
513
514         if (rc == 0)
515                 rc += 1;
516
517         CDEBUG(D_INFO, "lookup "DFID" %s get "DFID" rc %d\n",
518                PFID(lu_object_fid(&obj->do_lu)), name,
519                PFID(&tti->tti_fid1), rc);
520
521 out_unlock:
522         dt_read_unlock(env, obj);
523
524         CDEBUG(D_INFO, "%s: insert lookup reply %p index %d: rc = %d\n",
525                tgt_name(tsi->tsi_tgt), tti->tti_u.update.tti_update_reply,
526                0, rc);
527
528         object_update_result_insert(tti->tti_u.update.tti_update_reply,
529                             &tti->tti_fid1, sizeof(tti->tti_fid1),
530                             tti->tti_u.update.tti_update_reply_index, rc);
531         RETURN(rc);
532 }
533
534 static int out_tx_xattr_set_exec(const struct lu_env *env,
535                                  struct thandle *th,
536                                  struct tx_arg *arg)
537 {
538         struct dt_object *dt_obj = arg->object;
539         int rc;
540
541         CDEBUG(D_INFO, "%s: set xattr buf %p name %s flag %d\n",
542                dt_obd_name(th->th_dev), arg->u.xattr_set.buf.lb_buf,
543                arg->u.xattr_set.name, arg->u.xattr_set.flags);
544
545         if (!lu_object_exists(&dt_obj->do_lu))
546                 GOTO(out, rc = -ENOENT);
547
548         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
549         rc = dt_xattr_set(env, dt_obj, &arg->u.xattr_set.buf,
550                           arg->u.xattr_set.name, arg->u.xattr_set.flags,
551                           th, NULL);
552         dt_write_unlock(env, dt_obj);
553         /**
554          * Ignore errors if this is LINK EA
555          **/
556         if (unlikely(rc && !strcmp(arg->u.xattr_set.name, XATTR_NAME_LINK)))
557                 rc = 0;
558 out:
559         CDEBUG(D_INFO, "%s: insert xattr set reply %p index %d: rc = %d\n",
560                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
561
562         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
563
564         return rc;
565 }
566
567 static int __out_tx_xattr_set(const struct lu_env *env,
568                               struct dt_object *dt_obj,
569                               const struct lu_buf *buf,
570                               const char *name, int flags,
571                               struct thandle_exec_args *ta,
572                               struct object_update_reply *reply,
573                               int index, char *file, int line)
574 {
575         struct tx_arg           *arg;
576
577         LASSERT(ta->ta_handle != NULL);
578         ta->ta_err = dt_declare_xattr_set(env, dt_obj, buf, name,
579                                           flags, ta->ta_handle);
580         if (ta->ta_err != 0)
581                 return ta->ta_err;
582
583         arg = tx_add_exec(ta, out_tx_xattr_set_exec, NULL, file, line);
584         LASSERT(arg);
585         lu_object_get(&dt_obj->do_lu);
586         arg->object = dt_obj;
587         arg->u.xattr_set.name = name;
588         arg->u.xattr_set.flags = flags;
589         arg->u.xattr_set.buf = *buf;
590         arg->reply = reply;
591         arg->index = index;
592         arg->u.xattr_set.csum = 0;
593         return 0;
594 }
595
596 static int out_xattr_set(struct tgt_session_info *tsi)
597 {
598         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
599         struct object_update    *update = tti->tti_u.update.tti_update;
600         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
601         struct lu_buf           *lbuf = &tti->tti_buf;
602         char                    *name;
603         char                    *buf;
604         __u32                   *tmp;
605         int                      buf_len = 0;
606         int                      flag;
607         int                      size = 0;
608         int                      rc;
609         ENTRY;
610
611         name = object_update_param_get(update, 0, NULL);
612         if (name == NULL) {
613                 CERROR("%s: empty name for xattr set: rc = %d\n",
614                        tgt_name(tsi->tsi_tgt), -EPROTO);
615                 RETURN(err_serious(-EPROTO));
616         }
617
618         buf = object_update_param_get(update, 1, &buf_len);
619         if (buf == NULL || buf_len == 0) {
620                 CERROR("%s: empty buf for xattr set: rc = %d\n",
621                        tgt_name(tsi->tsi_tgt), -EPROTO);
622                 RETURN(err_serious(-EPROTO));
623         }
624
625         lbuf->lb_buf = buf;
626         lbuf->lb_len = buf_len;
627
628         tmp = object_update_param_get(update, 2, &size);
629         if (tmp == NULL || size != sizeof(*tmp)) {
630                 CERROR("%s: emptry or wrong size %d flag: rc = %d\n",
631                        tgt_name(tsi->tsi_tgt), size, -EPROTO);
632                 RETURN(err_serious(-EPROTO));
633         }
634
635         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
636                 __swab32s(tmp);
637         flag = *tmp;
638
639         rc = out_tx_xattr_set(tsi->tsi_env, obj, lbuf, name, flag,
640                               &tti->tti_tea,
641                               tti->tti_u.update.tti_update_reply,
642                               tti->tti_u.update.tti_update_reply_index);
643         RETURN(rc);
644 }
645
646 static int out_obj_ref_add(const struct lu_env *env,
647                            struct dt_object *dt_obj,
648                            struct thandle *th)
649 {
650         int rc;
651
652         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
653         rc = dt_ref_add(env, dt_obj, th);
654         dt_write_unlock(env, dt_obj);
655
656         return rc;
657 }
658
659 static int out_obj_ref_del(const struct lu_env *env,
660                            struct dt_object *dt_obj,
661                            struct thandle *th)
662 {
663         int rc;
664
665         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
666         rc = dt_ref_del(env, dt_obj, th);
667         dt_write_unlock(env, dt_obj);
668
669         return rc;
670 }
671
672 static int out_tx_ref_add_exec(const struct lu_env *env, struct thandle *th,
673                                struct tx_arg *arg)
674 {
675         struct dt_object *dt_obj = arg->object;
676         int rc;
677
678         rc = out_obj_ref_add(env, dt_obj, th);
679
680         CDEBUG(D_INFO, "%s: insert ref_add reply %p index %d: rc = %d\n",
681                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
682
683         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
684         return rc;
685 }
686
687 static int out_tx_ref_add_undo(const struct lu_env *env, struct thandle *th,
688                                struct tx_arg *arg)
689 {
690         return out_obj_ref_del(env, arg->object, th);
691 }
692
693 static int __out_tx_ref_add(const struct lu_env *env,
694                             struct dt_object *dt_obj,
695                             struct thandle_exec_args *ta,
696                             struct object_update_reply *reply,
697                             int index, char *file, int line)
698 {
699         struct tx_arg   *arg;
700
701         LASSERT(ta->ta_handle != NULL);
702         ta->ta_err = dt_declare_ref_add(env, dt_obj, ta->ta_handle);
703         if (ta->ta_err != 0)
704                 return ta->ta_err;
705
706         arg = tx_add_exec(ta, out_tx_ref_add_exec, out_tx_ref_add_undo, file,
707                           line);
708         LASSERT(arg);
709         lu_object_get(&dt_obj->do_lu);
710         arg->object = dt_obj;
711         arg->reply = reply;
712         arg->index = index;
713         return 0;
714 }
715
716 /**
717  * increase ref of the object
718  **/
719 static int out_ref_add(struct tgt_session_info *tsi)
720 {
721         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
722         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
723         int                      rc;
724
725         ENTRY;
726
727         rc = out_tx_ref_add(tsi->tsi_env, obj, &tti->tti_tea,
728                             tti->tti_u.update.tti_update_reply,
729                             tti->tti_u.update.tti_update_reply_index);
730         RETURN(rc);
731 }
732
733 static int out_tx_ref_del_exec(const struct lu_env *env, struct thandle *th,
734                                struct tx_arg *arg)
735 {
736         struct dt_object        *dt_obj = arg->object;
737         int                      rc;
738
739         rc = out_obj_ref_del(env, dt_obj, th);
740
741         CDEBUG(D_INFO, "%s: insert ref_del reply %p index %d: rc = %d\n",
742                dt_obd_name(th->th_dev), arg->reply, arg->index, 0);
743
744         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
745
746         return rc;
747 }
748
749 static int out_tx_ref_del_undo(const struct lu_env *env, struct thandle *th,
750                                struct tx_arg *arg)
751 {
752         return out_obj_ref_add(env, arg->object, th);
753 }
754
755 static int __out_tx_ref_del(const struct lu_env *env,
756                             struct dt_object *dt_obj,
757                             struct thandle_exec_args *ta,
758                             struct object_update_reply *reply,
759                             int index, char *file, int line)
760 {
761         struct tx_arg   *arg;
762
763         LASSERT(ta->ta_handle != NULL);
764         ta->ta_err = dt_declare_ref_del(env, dt_obj, ta->ta_handle);
765         if (ta->ta_err != 0)
766                 return ta->ta_err;
767
768         arg = tx_add_exec(ta, out_tx_ref_del_exec, out_tx_ref_del_undo, file,
769                           line);
770         LASSERT(arg);
771         lu_object_get(&dt_obj->do_lu);
772         arg->object = dt_obj;
773         arg->reply = reply;
774         arg->index = index;
775         return 0;
776 }
777
778 static int out_ref_del(struct tgt_session_info *tsi)
779 {
780         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
781         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
782         int                      rc;
783
784         ENTRY;
785
786         if (!lu_object_exists(&obj->do_lu))
787                 RETURN(-ENOENT);
788
789         rc = out_tx_ref_del(tsi->tsi_env, obj, &tti->tti_tea,
790                             tti->tti_u.update.tti_update_reply,
791                             tti->tti_u.update.tti_update_reply_index);
792         RETURN(rc);
793 }
794
795 static int out_obj_index_insert(const struct lu_env *env,
796                                 struct dt_object *dt_obj,
797                                 const struct dt_rec *rec,
798                                 const struct dt_key *key,
799                                 struct thandle *th)
800 {
801         int rc;
802
803         CDEBUG(D_INFO, "%s: index insert "DFID" name: %s fid "DFID"\n",
804                dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
805                (char *)key, PFID((struct lu_fid *)rec));
806
807         if (dt_try_as_dir(env, dt_obj) == 0)
808                 return -ENOTDIR;
809
810         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
811         rc = dt_insert(env, dt_obj, rec, key, th, NULL, 0);
812         dt_write_unlock(env, dt_obj);
813
814         return rc;
815 }
816
817 static int out_obj_index_delete(const struct lu_env *env,
818                                 struct dt_object *dt_obj,
819                                 const struct dt_key *key,
820                                 struct thandle *th)
821 {
822         int rc;
823
824         CDEBUG(D_INFO, "%s: index delete "DFID" name: %s\n",
825                dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
826                (char *)key);
827
828         if (dt_try_as_dir(env, dt_obj) == 0)
829                 return -ENOTDIR;
830
831         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
832         rc = dt_delete(env, dt_obj, key, th, NULL);
833         dt_write_unlock(env, dt_obj);
834
835         return rc;
836 }
837
838 static int out_tx_index_insert_exec(const struct lu_env *env,
839                                     struct thandle *th, struct tx_arg *arg)
840 {
841         struct dt_object *dt_obj = arg->object;
842         int rc;
843
844         rc = out_obj_index_insert(env, dt_obj, arg->u.insert.rec,
845                                   arg->u.insert.key, th);
846
847         CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
848                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
849
850         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
851
852         return rc;
853 }
854
855 static int out_tx_index_insert_undo(const struct lu_env *env,
856                                     struct thandle *th, struct tx_arg *arg)
857 {
858         return out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
859 }
860
861 static int __out_tx_index_insert(const struct lu_env *env,
862                                  struct dt_object *dt_obj,
863                                  char *name, struct lu_fid *fid,
864                                  struct thandle_exec_args *ta,
865                                  struct object_update_reply *reply,
866                                  int index, char *file, int line)
867 {
868         struct tx_arg *arg;
869
870         LASSERT(ta->ta_handle != NULL);
871
872         if (dt_try_as_dir(env, dt_obj) == 0) {
873                 ta->ta_err = -ENOTDIR;
874                 return ta->ta_err;
875         }
876
877         ta->ta_err = dt_declare_insert(env, dt_obj,
878                                        (struct dt_rec *)fid,
879                                        (struct dt_key *)name,
880                                        ta->ta_handle);
881
882         if (ta->ta_err != 0)
883                 return ta->ta_err;
884
885         arg = tx_add_exec(ta, out_tx_index_insert_exec,
886                           out_tx_index_insert_undo, file,
887                           line);
888         LASSERT(arg);
889         lu_object_get(&dt_obj->do_lu);
890         arg->object = dt_obj;
891         arg->reply = reply;
892         arg->index = index;
893         arg->u.insert.rec = (struct dt_rec *)fid;
894         arg->u.insert.key = (struct dt_key *)name;
895
896         return 0;
897 }
898
899 static int out_index_insert(struct tgt_session_info *tsi)
900 {
901         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
902         struct object_update    *update = tti->tti_u.update.tti_update;
903         struct dt_object  *obj = tti->tti_u.update.tti_dt_object;
904         struct lu_fid     *fid;
905         char              *name;
906         int                rc = 0;
907         int                size;
908
909         ENTRY;
910
911         name = object_update_param_get(update, 0, NULL);
912         if (name == NULL) {
913                 CERROR("%s: empty name for index insert: rc = %d\n",
914                        tgt_name(tsi->tsi_tgt), -EPROTO);
915                 RETURN(err_serious(-EPROTO));
916         }
917
918         fid = object_update_param_get(update, 1, &size);
919         if (fid == NULL || size != sizeof(*fid)) {
920                 CERROR("%s: invalid fid: rc = %d\n",
921                        tgt_name(tsi->tsi_tgt), -EPROTO);
922                        RETURN(err_serious(-EPROTO));
923         }
924
925         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
926                 lustre_swab_lu_fid(fid);
927
928         if (!fid_is_sane(fid)) {
929                 CERROR("%s: invalid FID "DFID": rc = %d\n",
930                        tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
931                 RETURN(err_serious(-EPROTO));
932         }
933
934         rc = out_tx_index_insert(tsi->tsi_env, obj, name, fid,
935                                  &tti->tti_tea,
936                                  tti->tti_u.update.tti_update_reply,
937                                  tti->tti_u.update.tti_update_reply_index);
938         RETURN(rc);
939 }
940
941 static int out_tx_index_delete_exec(const struct lu_env *env,
942                                     struct thandle *th,
943                                     struct tx_arg *arg)
944 {
945         int rc;
946
947         rc = out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
948
949         CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
950                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
951
952         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
953
954         return rc;
955 }
956
957 static int out_tx_index_delete_undo(const struct lu_env *env,
958                                     struct thandle *th,
959                                     struct tx_arg *arg)
960 {
961         CERROR("%s: Oops, can not rollback index_delete yet: rc = %d\n",
962                dt_obd_name(th->th_dev), -ENOTSUPP);
963         return -ENOTSUPP;
964 }
965
966 static int __out_tx_index_delete(const struct lu_env *env,
967                                  struct dt_object *dt_obj, char *name,
968                                  struct thandle_exec_args *ta,
969                                  struct object_update_reply *reply,
970                                  int index, char *file, int line)
971 {
972         struct tx_arg *arg;
973
974         if (dt_try_as_dir(env, dt_obj) == 0) {
975                 ta->ta_err = -ENOTDIR;
976                 return ta->ta_err;
977         }
978
979         LASSERT(ta->ta_handle != NULL);
980         ta->ta_err = dt_declare_delete(env, dt_obj,
981                                        (struct dt_key *)name,
982                                        ta->ta_handle);
983         if (ta->ta_err != 0)
984                 return ta->ta_err;
985
986         arg = tx_add_exec(ta, out_tx_index_delete_exec,
987                           out_tx_index_delete_undo, file,
988                           line);
989         LASSERT(arg);
990         lu_object_get(&dt_obj->do_lu);
991         arg->object = dt_obj;
992         arg->reply = reply;
993         arg->index = index;
994         arg->u.insert.key = (struct dt_key *)name;
995         return 0;
996 }
997
998 static int out_index_delete(struct tgt_session_info *tsi)
999 {
1000         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
1001         struct object_update    *update = tti->tti_u.update.tti_update;
1002         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
1003         char                    *name;
1004         int                      rc = 0;
1005
1006         if (!lu_object_exists(&obj->do_lu))
1007                 RETURN(-ENOENT);
1008
1009         name = object_update_param_get(update, 0, NULL);
1010         if (name == NULL) {
1011                 CERROR("%s: empty name for index delete: rc = %d\n",
1012                        tgt_name(tsi->tsi_tgt), -EPROTO);
1013                 RETURN(err_serious(-EPROTO));
1014         }
1015
1016         rc = out_tx_index_delete(tsi->tsi_env, obj, name, &tti->tti_tea,
1017                                  tti->tti_u.update.tti_update_reply,
1018                                  tti->tti_u.update.tti_update_reply_index);
1019         RETURN(rc);
1020 }
1021
1022 static int out_tx_destroy_exec(const struct lu_env *env, struct thandle *th,
1023                                struct tx_arg *arg)
1024 {
1025         struct dt_object *dt_obj = arg->object;
1026         int rc;
1027
1028         rc = out_obj_destroy(env, dt_obj, th);
1029
1030         CDEBUG(D_INFO, "%s: insert destroy reply %p index %d: rc = %d\n",
1031                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
1032
1033         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
1034
1035         RETURN(rc);
1036 }
1037
1038 static int out_tx_destroy_undo(const struct lu_env *env, struct thandle *th,
1039                                struct tx_arg *arg)
1040 {
1041         CERROR("%s: not support destroy undo yet!: rc = %d\n",
1042                dt_obd_name(th->th_dev), -ENOTSUPP);
1043         return -ENOTSUPP;
1044 }
1045
1046 static int __out_tx_destroy(const struct lu_env *env, struct dt_object *dt_obj,
1047                              struct thandle_exec_args *ta,
1048                              struct object_update_reply *reply,
1049                              int index, char *file, int line)
1050 {
1051         struct tx_arg *arg;
1052
1053         LASSERT(ta->ta_handle != NULL);
1054         ta->ta_err = dt_declare_destroy(env, dt_obj, ta->ta_handle);
1055         if (ta->ta_err)
1056                 return ta->ta_err;
1057
1058         arg = tx_add_exec(ta, out_tx_destroy_exec, out_tx_destroy_undo,
1059                           file, line);
1060         LASSERT(arg);
1061         lu_object_get(&dt_obj->do_lu);
1062         arg->object = dt_obj;
1063         arg->reply = reply;
1064         arg->index = index;
1065         return 0;
1066 }
1067
1068 static int out_destroy(struct tgt_session_info *tsi)
1069 {
1070         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
1071         struct object_update    *update = tti->tti_u.update.tti_update;
1072         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
1073         struct lu_fid           *fid;
1074         int                      rc;
1075         ENTRY;
1076
1077         fid = &update->ou_fid;
1078         if (!fid_is_sane(fid)) {
1079                 CERROR("%s: invalid FID "DFID": rc = %d\n",
1080                        tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
1081                 RETURN(err_serious(-EPROTO));
1082         }
1083
1084         if (!lu_object_exists(&obj->do_lu))
1085                 RETURN(-ENOENT);
1086
1087         rc = out_tx_destroy(tsi->tsi_env, obj, &tti->tti_tea,
1088                             tti->tti_u.update.tti_update_reply,
1089                             tti->tti_u.update.tti_update_reply_index);
1090
1091         RETURN(rc);
1092 }
1093
1094 static int out_tx_write_exec(const struct lu_env *env, struct thandle *th,
1095                              struct tx_arg *arg)
1096 {
1097         struct dt_object *dt_obj = arg->object;
1098         int rc;
1099
1100         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
1101         rc = dt_record_write(env, dt_obj, &arg->u.write.buf,
1102                              &arg->u.write.pos, th);
1103         dt_write_unlock(env, dt_obj);
1104
1105         if (rc == 0)
1106                 rc = arg->u.write.buf.lb_len;
1107
1108         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
1109
1110         return rc > 0 ? 0 : rc;
1111 }
1112
1113 static int __out_tx_write(const struct lu_env *env,
1114                           struct dt_object *dt_obj,
1115                           const struct lu_buf *buf,
1116                           loff_t pos, struct thandle_exec_args *ta,
1117                           struct object_update_reply *reply,
1118                           int index, char *file, int line)
1119 {
1120         struct tx_arg   *arg;
1121
1122         LASSERT(ta->ta_handle != NULL);
1123         ta->ta_err = dt_declare_record_write(env, dt_obj, buf, pos,
1124                                              ta->ta_handle);
1125         if (ta->ta_err != 0)
1126                 return ta->ta_err;
1127
1128         arg = tx_add_exec(ta, out_tx_write_exec, NULL, file, line);
1129         LASSERT(arg);
1130         lu_object_get(&dt_obj->do_lu);
1131         arg->object = dt_obj;
1132         arg->u.write.buf = *buf;
1133         arg->u.write.pos = pos;
1134         arg->reply = reply;
1135         arg->index = index;
1136         return 0;
1137 }
1138
1139 static int out_write(struct tgt_session_info *tsi)
1140 {
1141         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
1142         struct object_update    *update = tti->tti_u.update.tti_update;
1143         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
1144         struct lu_buf           *lbuf = &tti->tti_buf;
1145         char                    *buf;
1146         __u64                   *tmp;
1147         int                     size = 0;
1148         int                     buf_len = 0;
1149         loff_t                  pos;
1150         int                      rc;
1151         ENTRY;
1152
1153         buf = object_update_param_get(update, 0, &buf_len);
1154         if (buf == NULL || buf_len == 0) {
1155                 CERROR("%s: empty buf for xattr set: rc = %d\n",
1156                        tgt_name(tsi->tsi_tgt), -EPROTO);
1157                 RETURN(err_serious(-EPROTO));
1158         }
1159         lbuf->lb_buf = buf;
1160         lbuf->lb_len = buf_len;
1161
1162         tmp = object_update_param_get(update, 1, &size);
1163         if (tmp == NULL || size != sizeof(*tmp)) {
1164                 CERROR("%s: empty or wrong size %d pos: rc = %d\n",
1165                        tgt_name(tsi->tsi_tgt), size, -EPROTO);
1166                 RETURN(err_serious(-EPROTO));
1167         }
1168
1169         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
1170                 __swab64s(tmp);
1171         pos = *tmp;
1172
1173         rc = out_tx_write(tsi->tsi_env, obj, lbuf, pos,
1174                           &tti->tti_tea,
1175                           tti->tti_u.update.tti_update_reply,
1176                           tti->tti_u.update.tti_update_reply_index);
1177         RETURN(rc);
1178 }
1179
1180 #define DEF_OUT_HNDL(opc, name, flags, fn)     \
1181 [opc - OUT_CREATE] = {                                  \
1182         .th_name    = name,                             \
1183         .th_fail_id = 0,                                \
1184         .th_opc     = opc,                              \
1185         .th_flags   = flags,                            \
1186         .th_act     = fn,                               \
1187         .th_fmt     = NULL,                             \
1188         .th_version = 0,                                \
1189 }
1190
1191 #define out_handler mdt_handler
1192 static struct tgt_handler out_update_ops[] = {
1193         DEF_OUT_HNDL(OUT_CREATE, "out_create", MUTABOR | HABEO_REFERO,
1194                      out_create),
1195         DEF_OUT_HNDL(OUT_DESTROY, "out_create", MUTABOR | HABEO_REFERO,
1196                      out_destroy),
1197         DEF_OUT_HNDL(OUT_REF_ADD, "out_ref_add", MUTABOR | HABEO_REFERO,
1198                      out_ref_add),
1199         DEF_OUT_HNDL(OUT_REF_DEL, "out_ref_del", MUTABOR | HABEO_REFERO,
1200                      out_ref_del),
1201         DEF_OUT_HNDL(OUT_ATTR_SET, "out_attr_set",  MUTABOR | HABEO_REFERO,
1202                      out_attr_set),
1203         DEF_OUT_HNDL(OUT_ATTR_GET, "out_attr_get",  HABEO_REFERO,
1204                      out_attr_get),
1205         DEF_OUT_HNDL(OUT_XATTR_SET, "out_xattr_set", MUTABOR | HABEO_REFERO,
1206                      out_xattr_set),
1207         DEF_OUT_HNDL(OUT_XATTR_GET, "out_xattr_get", HABEO_REFERO,
1208                      out_xattr_get),
1209         DEF_OUT_HNDL(OUT_INDEX_LOOKUP, "out_index_lookup", HABEO_REFERO,
1210                      out_index_lookup),
1211         DEF_OUT_HNDL(OUT_INDEX_INSERT, "out_index_insert",
1212                      MUTABOR | HABEO_REFERO, out_index_insert),
1213         DEF_OUT_HNDL(OUT_INDEX_DELETE, "out_index_delete",
1214                      MUTABOR | HABEO_REFERO, out_index_delete),
1215         DEF_OUT_HNDL(OUT_WRITE, "out_write", MUTABOR | HABEO_REFERO, out_write),
1216 };
1217
1218 struct tgt_handler *out_handler_find(__u32 opc)
1219 {
1220         struct tgt_handler *h;
1221
1222         h = NULL;
1223         if (OUT_CREATE <= opc && opc < OUT_LAST) {
1224                 h = &out_update_ops[opc - OUT_CREATE];
1225                 LASSERTF(h->th_opc == opc, "opcode mismatch %d != %d\n",
1226                          h->th_opc, opc);
1227         } else {
1228                 h = NULL; /* unsupported opc */
1229         }
1230         return h;
1231 }
1232
1233 static int out_tx_start(const struct lu_env *env, struct dt_device *dt,
1234                         struct thandle_exec_args *ta, struct obd_export *exp)
1235 {
1236         memset(ta, 0, sizeof(*ta));
1237         ta->ta_handle = dt_trans_create(env, dt);
1238         if (IS_ERR(ta->ta_handle)) {
1239                 int rc;
1240
1241                 rc = PTR_ERR(ta->ta_handle);
1242                 ta->ta_handle = NULL;
1243                 CERROR("%s: start handle error: rc = %d\n",
1244                        dt_obd_name(dt), rc);
1245                 return rc;
1246         }
1247         ta->ta_dev = dt;
1248         if (exp->exp_need_sync)
1249                 ta->ta_handle->th_sync = 1;
1250
1251         return 0;
1252 }
1253
1254 static int out_trans_start(const struct lu_env *env,
1255                            struct thandle_exec_args *ta)
1256 {
1257         return dt_trans_start(env, ta->ta_dev, ta->ta_handle);
1258 }
1259
1260 static int out_trans_stop(const struct lu_env *env,
1261                           struct thandle_exec_args *ta, int err)
1262 {
1263         int i;
1264         int rc;
1265
1266         ta->ta_handle->th_result = err;
1267         rc = dt_trans_stop(env, ta->ta_dev, ta->ta_handle);
1268         for (i = 0; i < ta->ta_argno; i++) {
1269                 if (ta->ta_args[i].object != NULL) {
1270                         struct dt_object *obj = ta->ta_args[i].object;
1271
1272                         /* If the object is being created during this
1273                          * transaction, we need to remove them from the
1274                          * cache immediately, because a few layers are
1275                          * missing in OUT handler, i.e. the object might
1276                          * not be initialized in all layers */
1277                         if (ta->ta_args[i].exec_fn == out_tx_create_exec)
1278                                 set_bit(LU_OBJECT_HEARD_BANSHEE,
1279                                         &obj->do_lu.lo_header->loh_flags);
1280                         lu_object_put(env, &ta->ta_args[i].object->do_lu);
1281                         ta->ta_args[i].object = NULL;
1282                 }
1283         }
1284
1285         return rc;
1286 }
1287
1288 int out_tx_end(const struct lu_env *env, struct thandle_exec_args *ta)
1289 {
1290         struct tgt_session_info *tsi = tgt_ses_info(env);
1291         int                     i;
1292         int                     rc;
1293         int                     rc1;
1294         ENTRY;
1295
1296         if (ta->ta_handle == NULL)
1297                 RETURN(0);
1298
1299         if (ta->ta_err != 0 || ta->ta_argno == 0)
1300                 GOTO(stop, rc = ta->ta_err);
1301
1302         LASSERT(ta->ta_dev != NULL);
1303         rc = out_trans_start(env, ta);
1304         if (unlikely(rc != 0))
1305                 GOTO(stop, rc);
1306
1307         for (i = 0; i < ta->ta_argno; i++) {
1308                 rc = ta->ta_args[i].exec_fn(env, ta->ta_handle,
1309                                             &ta->ta_args[i]);
1310                 if (unlikely(rc != 0)) {
1311                         CDEBUG(D_INFO, "error during execution of #%u from"
1312                                " %s:%d: rc = %d\n", i, ta->ta_args[i].file,
1313                                ta->ta_args[i].line, rc);
1314                         while (--i >= 0) {
1315                                 if (ta->ta_args[i].undo_fn != NULL)
1316                                         ta->ta_args[i].undo_fn(env,
1317                                                                ta->ta_handle,
1318                                                               &ta->ta_args[i]);
1319                                 else
1320                                         CERROR("%s: undo for %s:%d: rc = %d\n",
1321                                                dt_obd_name(ta->ta_dev),
1322                                                ta->ta_args[i].file,
1323                                                ta->ta_args[i].line, -ENOTSUPP);
1324                         }
1325                         break;
1326                 }
1327                 CDEBUG(D_INFO, "%s: executed %u/%u: rc = %d\n",
1328                        dt_obd_name(ta->ta_dev), i, ta->ta_argno, rc);
1329         }
1330
1331         /* Only fail for real update */
1332         tsi->tsi_reply_fail_id = OBD_FAIL_OUT_UPDATE_NET_REP;
1333 stop:
1334         rc1 = out_trans_stop(env, ta, rc);
1335         if (rc == 0)
1336                 rc = rc1;
1337
1338         ta->ta_handle = NULL;
1339         ta->ta_argno = 0;
1340         ta->ta_err = 0;
1341
1342         RETURN(rc);
1343 }
1344
1345 /**
1346  * Object updates between Targets. Because all the updates has been
1347  * dis-assemblied into object updates at sender side, so OUT will
1348  * call OSD API directly to execute these updates.
1349  *
1350  * In DNE phase I all of the updates in the request need to be executed
1351  * in one transaction, and the transaction has to be synchronously.
1352  *
1353  * Please refer to lustre/include/lustre/lustre_idl.h for req/reply
1354  * format.
1355  */
1356 int out_handle(struct tgt_session_info *tsi)
1357 {
1358         const struct lu_env             *env = tsi->tsi_env;
1359         struct tgt_thread_info          *tti = tgt_th_info(env);
1360         struct thandle_exec_args        *ta = &tti->tti_tea;
1361         struct req_capsule              *pill = tsi->tsi_pill;
1362         struct dt_device                *dt = tsi->tsi_tgt->lut_bottom;
1363         struct object_update_request    *ureq;
1364         struct object_update            *update;
1365         struct object_update_reply      *reply;
1366         int                              bufsize;
1367         int                              count;
1368         int                              current_batchid = -1;
1369         int                              i;
1370         int                              rc = 0;
1371         int                              rc1 = 0;
1372
1373         ENTRY;
1374
1375         req_capsule_set(pill, &RQF_OUT_UPDATE);
1376         ureq = req_capsule_client_get(pill, &RMF_OUT_UPDATE);
1377         if (ureq == NULL) {
1378                 CERROR("%s: No buf!: rc = %d\n", tgt_name(tsi->tsi_tgt),
1379                        -EPROTO);
1380                 RETURN(err_serious(-EPROTO));
1381         }
1382
1383         bufsize = req_capsule_get_size(pill, &RMF_OUT_UPDATE, RCL_CLIENT);
1384         if (bufsize != object_update_request_size(ureq)) {
1385                 CERROR("%s: invalid bufsize %d: rc = %d\n",
1386                        tgt_name(tsi->tsi_tgt), bufsize, -EPROTO);
1387                 RETURN(err_serious(-EPROTO));
1388         }
1389
1390         if (ureq->ourq_magic != UPDATE_REQUEST_MAGIC) {
1391                 CERROR("%s: invalid update buffer magic %x expect %x: "
1392                        "rc = %d\n", tgt_name(tsi->tsi_tgt), ureq->ourq_magic,
1393                        UPDATE_REQUEST_MAGIC, -EPROTO);
1394                 RETURN(err_serious(-EPROTO));
1395         }
1396
1397         count = ureq->ourq_count;
1398         if (count <= 0) {
1399                 CERROR("%s: empty update: rc = %d\n", tgt_name(tsi->tsi_tgt),
1400                        -EPROTO);
1401                 RETURN(err_serious(-EPROTO));
1402         }
1403
1404         req_capsule_set_size(pill, &RMF_OUT_UPDATE_REPLY, RCL_SERVER,
1405                              OUT_UPDATE_REPLY_SIZE);
1406         rc = req_capsule_server_pack(pill);
1407         if (rc != 0) {
1408                 CERROR("%s: Can't pack response: rc = %d\n",
1409                        tgt_name(tsi->tsi_tgt), rc);
1410                 RETURN(rc);
1411         }
1412
1413         /* Prepare the update reply buffer */
1414         reply = req_capsule_server_get(pill, &RMF_OUT_UPDATE_REPLY);
1415         if (reply == NULL)
1416                 RETURN(err_serious(-EPROTO));
1417         object_update_reply_init(reply, count);
1418         tti->tti_u.update.tti_update_reply = reply;
1419         tti->tti_mult_trans = !req_is_replay(tgt_ses_req(tsi));
1420
1421         memset(ta, 0, sizeof(*ta));
1422         /* Walk through updates in the request to execute them synchronously */
1423         for (i = 0; i < count; i++) {
1424                 struct tgt_handler      *h;
1425                 struct dt_object        *dt_obj;
1426
1427                 update = object_update_request_get(ureq, i, NULL);
1428                 if (update == NULL)
1429                         GOTO(out, rc = -EPROTO);
1430
1431                 if (ptlrpc_req_need_swab(pill->rc_req))
1432                         lustre_swab_object_update(update);
1433
1434                 if (!fid_is_sane(&update->ou_fid)) {
1435                         CERROR("%s: invalid FID "DFID": rc = %d\n",
1436                                tgt_name(tsi->tsi_tgt), PFID(&update->ou_fid),
1437                                -EPROTO);
1438                         GOTO(out, rc = err_serious(-EPROTO));
1439                 }
1440
1441                 dt_obj = dt_locate(env, dt, &update->ou_fid);
1442                 if (IS_ERR(dt_obj))
1443                         GOTO(out, rc = PTR_ERR(dt_obj));
1444
1445                 if (dt->dd_record_fid_accessed) {
1446                         lfsck_pack_rfa(&tti->tti_lr,
1447                                        lu_object_fid(&dt_obj->do_lu));
1448                         tgt_lfsck_in_notify(env, dt, &tti->tti_lr);
1449                 }
1450
1451                 tti->tti_u.update.tti_dt_object = dt_obj;
1452                 tti->tti_u.update.tti_update = update;
1453                 tti->tti_u.update.tti_update_reply_index = i;
1454
1455                 h = out_handler_find(update->ou_type);
1456                 if (unlikely(h == NULL)) {
1457                         CERROR("%s: unsupported opc: 0x%x\n",
1458                                tgt_name(tsi->tsi_tgt), update->ou_type);
1459                         GOTO(next, rc = -ENOTSUPP);
1460                 }
1461
1462                 /* start transaction for modification RPC only */
1463                 if (h->th_flags & MUTABOR && current_batchid == -1) {
1464                         current_batchid = update->ou_batchid;
1465                         rc = out_tx_start(env, dt, ta, tsi->tsi_exp);
1466                         if (rc < 0)
1467                                 GOTO(next, rc);
1468                 }
1469
1470                 /* Stop the current update transaction,
1471                  * if it meets a different batchid, or
1472                  * it is read-only RPC */
1473                 if (((current_batchid != update->ou_batchid) ||
1474                      !(h->th_flags & MUTABOR)) && ta->ta_handle != NULL) {
1475                         rc = out_tx_end(env, ta);
1476                         current_batchid = -1;
1477                         if (rc != 0)
1478                                 GOTO(next, rc);
1479
1480                         /* start a new transaction if needed */
1481                         if (h->th_flags & MUTABOR) {
1482                                 rc = out_tx_start(env, dt, ta, tsi->tsi_exp);
1483                                 if (rc != 0)
1484                                         GOTO(next, rc);
1485
1486                                 current_batchid = update->ou_batchid;
1487                         }
1488                 }
1489
1490                 /* Check resend case only for modifying RPC */
1491                 if (h->th_flags & MUTABOR) {
1492                         struct ptlrpc_request *req = tgt_ses_req(tsi);
1493
1494                         if (out_check_resent(env, dt, dt_obj, req,
1495                                              out_reconstruct, reply,
1496                                              i))
1497                                 GOTO(next, rc);
1498                 }
1499
1500                 rc = h->th_act(tsi);
1501 next:
1502                 lu_object_put(env, &dt_obj->do_lu);
1503                 if (rc < 0)
1504                         GOTO(out, rc);
1505         }
1506 out:
1507         if (current_batchid != -1) {
1508                 rc1 = out_tx_end(env, ta);
1509                 if (rc == 0)
1510                         rc = rc1;
1511         }
1512
1513         RETURN(rc);
1514 }
1515
1516 struct tgt_handler tgt_out_handlers[] = {
1517 TGT_UPDATE_HDL(MUTABOR, OUT_UPDATE,     out_handle),
1518 };
1519 EXPORT_SYMBOL(tgt_out_handlers);
1520