Whamcloud - gitweb
LU-1187 out: add resend check for update.
[fs/lustre-release.git] / lustre / mdt / out_handler.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  *
25  * lustre/mdt/out_handler.c
26  *
27  * Object update handler between targets.
28  *
29  * Author: di.wang <di.wang@intel.com>
30  */
31
32 #ifndef EXPORT_SYMTAB
33 # define EXPORT_SYMTAB
34 #endif
35 #define DEBUG_SUBSYSTEM S_MDS
36
37 #include "mdt_internal.h"
38 #include <lustre_update.h>
39
40 static const char dot[] = ".";
41 static const char dotdot[] = "..";
42
43 /* Current out and mdt shared the same thread info, but in the future,
44  * this should be decoupled with MDT XXX*/
45 #define out_thread_info         mdt_thread_info
46 #define out_thread_key          mdt_thread_key
47
48 struct out_thread_info *out_env_info(const struct lu_env *env)
49 {
50         struct out_thread_info *info;
51
52         info = lu_context_key_get(&env->le_ctx, &out_thread_key);
53         LASSERT(info != NULL);
54         return info;
55 }
56
57 static inline char *dt_obd_name(struct dt_device *dt)
58 {
59         return dt->dd_lu_dev.ld_obd->obd_name;
60 }
61
62 struct tx_arg *tx_add_exec(struct thandle_exec_args *ta, tx_exec_func_t func,
63                            tx_exec_func_t undo, char *file, int line)
64 {
65         int i;
66
67         LASSERT(ta);
68         LASSERT(func);
69
70         i = ta->ta_argno;
71         LASSERT(i < UPDATE_MAX_OPS);
72
73         ta->ta_argno++;
74
75         ta->ta_args[i].exec_fn = func;
76         ta->ta_args[i].undo_fn = undo;
77         ta->ta_args[i].file    = file;
78         ta->ta_args[i].line    = line;
79
80         return &ta->ta_args[i];
81 }
82
83 static int out_tx_start(const struct lu_env *env, struct dt_device *dt,
84                         struct thandle_exec_args *th)
85 {
86         memset(th, 0, sizeof(*th));
87         th->ta_handle = dt_trans_create(env, dt);
88         if (IS_ERR(th->ta_handle)) {
89                 CERROR("%s: start handle error: rc = %ld\n",
90                        dt_obd_name(dt), PTR_ERR(th->ta_handle));
91                 return PTR_ERR(th->ta_handle);
92         }
93         th->ta_dev = dt;
94         /*For phase I, sync for cross-ref operation*/
95         th->ta_handle->th_sync = 1;
96         return 0;
97 }
98
99 static int out_trans_start(const struct lu_env *env,
100                            struct thandle_exec_args *th)
101 {
102         /* Always do sync commit for Phase I */
103         LASSERT(th->ta_handle->th_sync != 0);
104         return dt_trans_start(env, th->ta_dev, th->ta_handle);
105 }
106
107 static int out_trans_stop(const struct lu_env *env,
108                           struct thandle_exec_args *th, int err)
109 {
110         int i;
111         int rc;
112
113         th->ta_handle->th_result = err;
114         LASSERT(th->ta_handle->th_sync != 0);
115         rc = dt_trans_stop(env, th->ta_dev, th->ta_handle);
116         for (i = 0; i < th->ta_argno; i++) {
117                 if (th->ta_args[i].object != NULL) {
118                         lu_object_put(env, &th->ta_args[i].object->do_lu);
119                         th->ta_args[i].object = NULL;
120                 }
121         }
122
123         return rc;
124 }
125
126 int out_tx_end(const struct lu_env *env, struct thandle_exec_args *th)
127 {
128         struct out_thread_info *info = out_env_info(env);
129         int i = 0, rc;
130
131         LASSERT(th->ta_dev);
132         LASSERT(th->ta_handle);
133
134         if (th->ta_err != 0 || th->ta_argno == 0)
135                 GOTO(stop, rc = th->ta_err);
136
137         rc = out_trans_start(env, th);
138         if (unlikely(rc))
139                 GOTO(stop, rc);
140
141         for (i = 0; i < th->ta_argno; i++) {
142                 rc = th->ta_args[i].exec_fn(env, th->ta_handle,
143                                             &th->ta_args[i]);
144                 if (unlikely(rc)) {
145                         CDEBUG(D_INFO, "error during execution of #%u from"
146                                " %s:%d: rc = %d\n", i, th->ta_args[i].file,
147                                th->ta_args[i].line, rc);
148                         while (--i >= 0) {
149                                 LASSERTF(th->ta_args[i].undo_fn != NULL,
150                                     "can't undo changes, hope for failover!\n");
151                                 th->ta_args[i].undo_fn(env, th->ta_handle,
152                                                        &th->ta_args[i]);
153                         }
154                         break;
155                 }
156         }
157
158         /* Only fail for real update */
159         info->mti_fail_id = OBD_FAIL_UPDATE_OBJ_NET_REP;
160 stop:
161         CDEBUG(D_INFO, "%s: executed %u/%u: rc = %d\n",
162                dt_obd_name(th->ta_dev), i, th->ta_argno, rc);
163         out_trans_stop(env, th, rc);
164         th->ta_handle = NULL;
165         th->ta_argno = 0;
166         th->ta_err = 0;
167
168         RETURN(rc);
169 }
170
171 static void out_reconstruct(const struct lu_env *env, struct dt_device *dt,
172                             struct dt_object *obj, struct update_reply *reply,
173                             int index)
174 {
175         CDEBUG(D_INFO, "%s: fork reply reply %p index %d: rc = %d\n",
176                dt_obd_name(dt), reply, index, 0);
177
178         update_insert_reply(reply, NULL, 0, index, 0);
179         return;
180 }
181
182 typedef void (*out_reconstruct_t)(const struct lu_env *env,
183                                   struct dt_device *dt,
184                                   struct dt_object *obj,
185                                   struct update_reply *reply,
186                                   int index);
187
188 static inline int out_check_resent(const struct lu_env *env,
189                                    struct dt_device *dt,
190                                    struct dt_object *obj,
191                                    struct ptlrpc_request *req,
192                                    out_reconstruct_t reconstruct,
193                                    struct update_reply *reply,
194                                    int index)
195 {
196         if (likely(!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)))
197                 return 0;
198
199         if (req_xid_is_last(req)) {
200                 reconstruct(env, dt, obj, reply, index);
201                 return 1;
202         }
203         DEBUG_REQ(D_HA, req, "no reply for RESENT req (have "LPD64")",
204                  req->rq_export->exp_target_data.ted_lcd->lcd_last_xid);
205         return 0;
206 }
207
208 static int out_obj_destroy(const struct lu_env *env, struct dt_object *dt_obj,
209                            struct thandle *th)
210 {
211         int rc;
212
213         CDEBUG(D_INFO, "%s: destroy "DFID"\n", dt_obd_name(th->th_dev),
214                PFID(lu_object_fid(&dt_obj->do_lu)));
215
216         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
217         rc = dt_destroy(env, dt_obj, th);
218         dt_write_unlock(env, dt_obj);
219
220         return rc;
221 }
222
223 /**
224  * All of the xxx_undo will be used once execution failed,
225  * But because all of the required resource has been reserved in
226  * declare phase, i.e. if declare succeed, it should make sure
227  * the following executing phase succeed in anyway, so these undo
228  * should be useless for most of the time in Phase I
229  */
230 int out_tx_create_undo(const struct lu_env *env, struct thandle *th,
231                        struct tx_arg *arg)
232 {
233         int rc;
234
235         rc = out_obj_destroy(env, arg->object, th);
236         if (rc != 0)
237                 CERROR("%s: undo failure, we are doomed!: rc = %d\n",
238                        dt_obd_name(th->th_dev), rc);
239         return rc;
240 }
241
242 int out_tx_create_exec(const struct lu_env *env, struct thandle *th,
243                        struct tx_arg *arg)
244 {
245         struct dt_object *dt_obj = arg->object;
246         int rc;
247
248         CDEBUG(D_OTHER, "%s: create "DFID": dof %u, mode %o\n",
249                dt_obd_name(th->th_dev),
250                PFID(lu_object_fid(&arg->object->do_lu)),
251                arg->u.create.dof.dof_type,
252                arg->u.create.attr.la_mode & S_IFMT);
253
254         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
255         rc = dt_create(env, dt_obj, &arg->u.create.attr,
256                        &arg->u.create.hint, &arg->u.create.dof, th);
257
258         dt_write_unlock(env, dt_obj);
259
260         CDEBUG(D_INFO, "%s: insert create reply %p index %d: rc = %d\n",
261                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
262
263         update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
264
265         return rc;
266 }
267
268 static int __out_tx_create(const struct lu_env *env, struct dt_object *obj,
269                            struct lu_attr *attr, struct lu_fid *parent_fid,
270                            struct dt_object_format *dof,
271                            struct thandle_exec_args *th,
272                            struct update_reply *reply,
273                            int index, char *file, int line)
274 {
275         struct tx_arg *arg;
276
277         LASSERT(th->ta_handle != NULL);
278         th->ta_err = dt_declare_create(env, obj, attr, NULL, dof,
279                                        th->ta_handle);
280         if (th->ta_err != 0)
281                 return th->ta_err;
282
283         arg = tx_add_exec(th, out_tx_create_exec, out_tx_create_undo, file,
284                           line);
285         LASSERT(arg);
286
287         /* release the object in out_trans_stop */
288         lu_object_get(&obj->do_lu);
289         arg->object = obj;
290         arg->u.create.attr = *attr;
291         if (parent_fid)
292                 arg->u.create.fid = *parent_fid;
293         memset(&arg->u.create.hint, 0, sizeof(arg->u.create.hint));
294         arg->u.create.dof  = *dof;
295         arg->reply = reply;
296         arg->index = index;
297
298         return 0;
299 }
300
301 static int out_create(struct out_thread_info *info)
302 {
303         struct update           *update = info->mti_u.update.mti_update;
304         struct dt_object        *obj = info->mti_u.update.mti_dt_object;
305         struct dt_object_format *dof = &info->mti_u.update.mti_update_dof;
306         struct obdo             *lobdo = &info->mti_u.update.mti_obdo;
307         struct lu_attr          *attr = &info->mti_attr.ma_attr;
308         struct lu_fid           *fid = NULL;
309         struct obdo             *wobdo;
310         int                     size;
311         int                     rc;
312
313         ENTRY;
314
315         wobdo = update_param_buf(update, 0, &size);
316         if (wobdo == NULL || size != sizeof(*wobdo)) {
317                 CERROR("%s: obdo is NULL, invalid RPC: rc = %d\n",
318                        mdt_obd_name(info->mti_mdt), -EPROTO);
319                 RETURN(err_serious(-EPROTO));
320         }
321
322         obdo_le_to_cpu(wobdo, wobdo);
323         lustre_get_wire_obdo(lobdo, wobdo);
324         la_from_obdo(attr, lobdo, lobdo->o_valid);
325
326         dof->dof_type = dt_mode_to_dft(attr->la_mode);
327         if (S_ISDIR(attr->la_mode)) {
328                 int size;
329
330                 fid = update_param_buf(update, 1, &size);
331                 if (fid == NULL || size != sizeof(*fid)) {
332                         CERROR("%s: invalid fid: rc = %d\n",
333                                mdt_obd_name(info->mti_mdt), -EPROTO);
334                         RETURN(err_serious(-EPROTO));
335                 }
336                 fid_le_to_cpu(fid, fid);
337                 if (!fid_is_sane(fid)) {
338                         CERROR("%s: invalid fid "DFID": rc = %d\n",
339                                mdt_obd_name(info->mti_mdt),
340                                PFID(fid), -EPROTO);
341                         RETURN(err_serious(-EPROTO));
342                 }
343         }
344
345         if (lu_object_exists(&obj->do_lu))
346                 RETURN(-EEXIST);
347
348         rc = out_tx_create(info->mti_env, obj, attr, fid, dof,
349                            &info->mti_handle,
350                            info->mti_u.update.mti_update_reply,
351                            info->mti_u.update.mti_update_reply_index);
352
353         RETURN(rc);
354 }
355
356 static int out_tx_attr_set_undo(const struct lu_env *env,
357                                 struct thandle *th, struct tx_arg *arg)
358 {
359         CERROR("%s: attr set undo "DFID" unimplemented yet!: rc = %d\n",
360                dt_obd_name(th->th_dev),
361                PFID(lu_object_fid(&arg->object->do_lu)), -ENOTSUPP);
362
363         return -ENOTSUPP;
364 }
365
366 static int out_tx_attr_set_exec(const struct lu_env *env, struct thandle *th,
367                                 struct tx_arg *arg)
368 {
369         struct dt_object        *dt_obj = arg->object;
370         int                     rc;
371
372         CDEBUG(D_OTHER, "%s: attr set "DFID"\n", dt_obd_name(th->th_dev),
373                PFID(lu_object_fid(&dt_obj->do_lu)));
374
375         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
376         rc = dt_attr_set(env, dt_obj, &arg->u.attr_set.attr,
377                          th, NULL);
378         dt_write_unlock(env, dt_obj);
379
380         CDEBUG(D_INFO, "%s: insert attr_set reply %p index %d: rc = %d\n",
381                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
382
383         update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
384
385         return rc;
386 }
387
388 static int __out_tx_attr_set(const struct lu_env *env,
389                              struct dt_object *dt_obj,
390                              const struct lu_attr *attr,
391                              struct thandle_exec_args *th,
392                              struct update_reply *reply, int index,
393                              char *file, int line)
394 {
395         struct tx_arg           *arg;
396
397         LASSERT(th->ta_handle != NULL);
398         th->ta_err = dt_declare_attr_set(env, dt_obj, attr, th->ta_handle);
399         if (th->ta_err != 0)
400                 return th->ta_err;
401
402         arg = tx_add_exec(th, out_tx_attr_set_exec, out_tx_attr_set_undo,
403                           file, line);
404         LASSERT(arg);
405         lu_object_get(&dt_obj->do_lu);
406         arg->object = dt_obj;
407         arg->u.attr_set.attr = *attr;
408         arg->reply = reply;
409         arg->index = index;
410         return 0;
411 }
412
413 static int out_attr_set(struct out_thread_info *info)
414 {
415         struct update           *update = info->mti_u.update.mti_update;
416         struct lu_attr          *attr = &info->mti_attr.ma_attr;
417         struct dt_object        *obj = info->mti_u.update.mti_dt_object;
418         struct obdo             *lobdo = &info->mti_u.update.mti_obdo;
419         struct obdo             *wobdo;
420         int                     size;
421         int                     rc;
422
423         ENTRY;
424
425         wobdo = update_param_buf(update, 0, &size);
426         if (wobdo == NULL || size != sizeof(*wobdo)) {
427                 CERROR("%s: empty obdo in the update: rc = %d\n",
428                        mdt_obd_name(info->mti_mdt), -EPROTO);
429                 RETURN(err_serious(-EPROTO));
430         }
431
432         attr->la_valid = 0;
433         attr->la_valid = 0;
434         obdo_le_to_cpu(wobdo, wobdo);
435         lustre_get_wire_obdo(lobdo, wobdo);
436         la_from_obdo(attr, lobdo, lobdo->o_valid);
437
438         rc = out_tx_attr_set(info->mti_env, obj, attr, &info->mti_handle,
439                              info->mti_u.update.mti_update_reply,
440                              info->mti_u.update.mti_update_reply_index);
441
442         RETURN(rc);
443 }
444
445 static int out_attr_get(struct out_thread_info *info)
446 {
447         struct obdo             *obdo = &info->mti_u.update.mti_obdo;
448         const struct lu_env     *env = info->mti_env;
449         struct lu_attr          *la = &info->mti_attr.ma_attr;
450         struct dt_object        *obj = info->mti_u.update.mti_dt_object;
451         int                     rc;
452
453         ENTRY;
454
455         if (!lu_object_exists(&obj->do_lu))
456                 RETURN(-ENOENT);
457
458         dt_read_lock(env, obj, MOR_TGT_CHILD);
459         rc = dt_attr_get(env, obj, la, NULL);
460         if (rc)
461                 GOTO(out_unlock, rc);
462         /*
463          * If it is a directory, we will also check whether the
464          * directory is empty.
465          * la_flags = 0 : Empty.
466          *          = 1 : Not empty.
467          */
468         la->la_flags = 0;
469         if (S_ISDIR(la->la_mode)) {
470                 struct dt_it     *it;
471                 const struct dt_it_ops *iops;
472
473                 if (!dt_try_as_dir(env, obj))
474                         GOTO(out_unlock, rc = -ENOTDIR);
475
476                 iops = &obj->do_index_ops->dio_it;
477                 it = iops->init(env, obj, LUDA_64BITHASH, BYPASS_CAPA);
478                 if (!IS_ERR(it)) {
479                         int  result;
480                         result = iops->get(env, it, (const void *)"");
481                         if (result > 0) {
482                                 int i;
483                                 for (result = 0, i = 0; result == 0 && i < 3;
484                                      ++i)
485                                         result = iops->next(env, it);
486                                 if (result == 0)
487                                         la->la_flags = 1;
488                         } else if (result == 0)
489                                 /*
490                                  * Huh? Index contains no zero key?
491                                  */
492                                 rc = -EIO;
493
494                         iops->put(env, it);
495                         iops->fini(env, it);
496                 }
497         }
498
499         obdo->o_valid = 0;
500         obdo_from_la(obdo, la, la->la_valid);
501         obdo_cpu_to_le(obdo, obdo);
502         lustre_set_wire_obdo(obdo, obdo);
503
504 out_unlock:
505         dt_read_unlock(env, obj);
506
507         CDEBUG(D_INFO, "%s: insert attr get reply %p index %d: rc = %d\n",
508                mdt_obd_name(info->mti_mdt),
509                info->mti_u.update.mti_update_reply, 0, rc);
510
511         update_insert_reply(info->mti_u.update.mti_update_reply, obdo,
512                             sizeof(*obdo), 0, rc);
513         RETURN(rc);
514 }
515
516 static int out_xattr_get(struct out_thread_info *info)
517 {
518         struct update           *update = info->mti_u.update.mti_update;
519         const struct lu_env     *env = info->mti_env;
520         struct lu_buf           *lbuf = &info->mti_buf;
521         struct update_reply     *reply = info->mti_u.update.mti_update_reply;
522         struct dt_object        *obj = info->mti_u.update.mti_dt_object;
523         char                    *name;
524         void                    *ptr;
525         int                     rc;
526
527         ENTRY;
528
529         name = (char *)update_param_buf(update, 0, NULL);
530         if (name == NULL) {
531                 CERROR("%s: empty name for xattr get: rc = %d\n",
532                        mdt_obd_name(info->mti_mdt), -EPROTO);
533                 RETURN(err_serious(-EPROTO));
534         }
535
536         ptr = update_get_buf_internal(reply, 0, NULL);
537         LASSERT(ptr != NULL);
538
539         /* The first 4 bytes(int) are used to store the result */
540         lbuf->lb_buf = (char *)ptr + sizeof(int);
541         lbuf->lb_len = UPDATE_BUFFER_SIZE - sizeof(struct update_reply);
542         dt_read_lock(env, obj, MOR_TGT_CHILD);
543         rc = dt_xattr_get(env, obj, lbuf, name, NULL);
544         dt_read_unlock(env, obj);
545         if (rc < 0) {
546                 lbuf->lb_len = 0;
547                 GOTO(out, rc);
548         }
549         if (rc == 0) {
550                 lbuf->lb_len = 0;
551                 GOTO(out, rc = -ENOENT);
552         }
553         lbuf->lb_len = rc;
554         rc = 0;
555         CDEBUG(D_INFO, "%s: "DFID" get xattr %s len %d\n",
556                mdt_obd_name(info->mti_mdt), PFID(lu_object_fid(&obj->do_lu)),
557                name, (int)lbuf->lb_len);
558 out:
559         *(int *)ptr = rc;
560         reply->ur_lens[0] = lbuf->lb_len + sizeof(int);
561         RETURN(rc);
562 }
563
564 static int out_index_lookup(struct out_thread_info *info)
565 {
566         struct update           *update = info->mti_u.update.mti_update;
567         const struct lu_env     *env = info->mti_env;
568         struct dt_object        *obj = info->mti_u.update.mti_dt_object;
569         char                    *name;
570         int                     rc;
571
572         ENTRY;
573
574         if (!lu_object_exists(&obj->do_lu))
575                 RETURN(-ENOENT);
576
577         name = (char *)update_param_buf(update, 0, NULL);
578         if (name == NULL) {
579                 CERROR("%s: empty name for lookup: rc = %d\n",
580                        mdt_obd_name(info->mti_mdt), -EPROTO);
581                 RETURN(err_serious(-EPROTO));
582         }
583
584         dt_read_lock(env, obj, MOR_TGT_CHILD);
585         if (!dt_try_as_dir(env, obj))
586                 GOTO(out_unlock, rc = -ENOTDIR);
587
588         rc = dt_lookup(env, obj, (struct dt_rec *)&info->mti_tmp_fid1,
589                 (struct dt_key *)name, NULL);
590
591         if (rc < 0)
592                 GOTO(out_unlock, rc);
593
594         if (rc == 0)
595                 rc += 1;
596
597         CDEBUG(D_INFO, "lookup "DFID" %s get "DFID" rc %d\n",
598                PFID(lu_object_fid(&obj->do_lu)), name,
599                PFID(&info->mti_tmp_fid1), rc);
600         fid_cpu_to_le(&info->mti_tmp_fid1, &info->mti_tmp_fid1);
601
602 out_unlock:
603         dt_read_unlock(env, obj);
604
605         CDEBUG(D_INFO, "%s: insert lookup reply %p index %d: rc = %d\n",
606                mdt_obd_name(info->mti_mdt),
607                info->mti_u.update.mti_update_reply, 0, rc);
608
609         update_insert_reply(info->mti_u.update.mti_update_reply,
610                             &info->mti_tmp_fid1, sizeof(info->mti_tmp_fid1),
611                             0, rc);
612         RETURN(rc);
613 }
614
615 static int out_tx_xattr_set_exec(const struct lu_env *env,
616                                  struct thandle *th,
617                                  struct tx_arg *arg)
618 {
619         struct dt_object *dt_obj = arg->object;
620         int rc;
621
622         CDEBUG(D_INFO, "%s: set xattr buf %p name %s flag %d\n",
623                dt_obd_name(th->th_dev), arg->u.xattr_set.buf.lb_buf,
624                arg->u.xattr_set.name, arg->u.xattr_set.flags);
625
626         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
627         rc = dt_xattr_set(env, dt_obj, &arg->u.xattr_set.buf,
628                           arg->u.xattr_set.name, arg->u.xattr_set.flags,
629                           th, NULL);
630         dt_write_unlock(env, dt_obj);
631         /**
632          * Ignore errors if this is LINK EA
633          **/
634         if (unlikely(rc && !strncmp(arg->u.xattr_set.name, XATTR_NAME_LINK,
635                                     strlen(XATTR_NAME_LINK))))
636                 rc = 0;
637
638         CDEBUG(D_INFO, "%s: insert xattr set reply %p index %d: rc = %d\n",
639                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
640
641         update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
642
643         return rc;
644 }
645
646 static int __out_tx_xattr_set(const struct lu_env *env,
647                               struct dt_object *dt_obj,
648                               const struct lu_buf *buf,
649                               const char *name, int flags,
650                               struct thandle_exec_args *th,
651                               struct update_reply *reply, int index,
652                               char *file, int line)
653 {
654         struct tx_arg           *arg;
655
656         LASSERT(th->ta_handle != NULL);
657         th->ta_err = dt_declare_xattr_set(env, dt_obj, buf, name,
658                                           flags, th->ta_handle);
659         if (th->ta_err != 0)
660                 return th->ta_err;
661
662         arg = tx_add_exec(th, out_tx_xattr_set_exec, NULL, file, line);
663         LASSERT(arg);
664         lu_object_get(&dt_obj->do_lu);
665         arg->object = dt_obj;
666         arg->u.xattr_set.name = name;
667         arg->u.xattr_set.flags = flags;
668         arg->u.xattr_set.buf = *buf;
669         arg->reply = reply;
670         arg->index = index;
671         arg->u.xattr_set.csum = 0;
672         return 0;
673 }
674
675 static int out_xattr_set(struct out_thread_info *info)
676 {
677         struct update           *update = info->mti_u.update.mti_update;
678         struct dt_object        *obj = info->mti_u.update.mti_dt_object;
679         struct lu_buf           *lbuf = &info->mti_buf;
680         char                    *name;
681         char                    *buf;
682         char                    *tmp;
683         int                     buf_len = 0;
684         int                     flag;
685         int                     rc;
686         ENTRY;
687
688         name = update_param_buf(update, 0, NULL);
689         if (name == NULL) {
690                 CERROR("%s: empty name for xattr set: rc = %d\n",
691                        mdt_obd_name(info->mti_mdt), -EPROTO);
692                 RETURN(err_serious(-EPROTO));
693         }
694
695         buf = (char *)update_param_buf(update, 1, &buf_len);
696         if (buf == NULL || buf_len == 0) {
697                 CERROR("%s: empty buf for xattr set: rc = %d\n",
698                        mdt_obd_name(info->mti_mdt), -EPROTO);
699                 RETURN(err_serious(-EPROTO));
700         }
701
702         lbuf->lb_buf = buf;
703         lbuf->lb_len = buf_len;
704
705         tmp = (char *)update_param_buf(update, 2, NULL);
706         if (tmp == NULL) {
707                 CERROR("%s: empty flag for xattr set: rc = %d\n",
708                        mdt_obd_name(info->mti_mdt), -EPROTO);
709                 RETURN(err_serious(-EPROTO));
710         }
711
712         flag = le32_to_cpu(*(int *)tmp);
713
714         rc = out_tx_xattr_set(info->mti_env, obj, lbuf, name, flag,
715                               &info->mti_handle,
716                               info->mti_u.update.mti_update_reply,
717                               info->mti_u.update.mti_update_reply_index);
718         RETURN(rc);
719 }
720
721 static int out_obj_ref_add(const struct lu_env *env,
722                            struct dt_object *dt_obj,
723                            struct thandle *th)
724 {
725         int rc;
726
727         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
728         rc = dt_ref_add(env, dt_obj, th);
729         dt_write_unlock(env, dt_obj);
730
731         return rc;
732 }
733
734 static int out_obj_ref_del(const struct lu_env *env,
735                            struct dt_object *dt_obj,
736                            struct thandle *th)
737 {
738         int rc;
739
740         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
741         rc = dt_ref_del(env, dt_obj, th);
742         dt_write_unlock(env, dt_obj);
743
744         return rc;
745 }
746
747 static int out_tx_ref_add_exec(const struct lu_env *env, struct thandle *th,
748                                struct tx_arg *arg)
749 {
750         struct dt_object *dt_obj = arg->object;
751         int rc;
752
753         rc = out_obj_ref_add(env, dt_obj, th);
754
755         CDEBUG(D_INFO, "%s: insert ref_add reply %p index %d: rc = %d\n",
756                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
757
758         update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
759         return rc;
760 }
761
762 static int out_tx_ref_add_undo(const struct lu_env *env, struct thandle *th,
763                                struct tx_arg *arg)
764 {
765         return out_obj_ref_del(env, arg->object, th);
766 }
767
768 static int __out_tx_ref_add(const struct lu_env *env,
769                             struct dt_object *dt_obj,
770                             struct thandle_exec_args *th,
771                             struct update_reply *reply,
772                             int index, char *file, int line)
773 {
774         struct tx_arg           *arg;
775
776         LASSERT(th->ta_handle != NULL);
777         th->ta_err = dt_declare_ref_add(env, dt_obj, th->ta_handle);
778         if (th->ta_err != 0)
779                 return th->ta_err;
780
781         arg = tx_add_exec(th, out_tx_ref_add_exec, out_tx_ref_add_undo, file,
782                           line);
783         LASSERT(arg);
784         lu_object_get(&dt_obj->do_lu);
785         arg->object = dt_obj;
786         arg->reply = reply;
787         arg->index = index;
788         return 0;
789 }
790
791 /**
792  * increase ref of the object
793  **/
794 static int out_ref_add(struct out_thread_info *info)
795 {
796         struct dt_object  *obj = info->mti_u.update.mti_dt_object;
797         int               rc;
798
799         ENTRY;
800
801         rc = out_tx_ref_add(info->mti_env, obj, &info->mti_handle,
802                             info->mti_u.update.mti_update_reply,
803                             info->mti_u.update.mti_update_reply_index);
804         RETURN(rc);
805 }
806
807 static int out_tx_ref_del_exec(const struct lu_env *env, struct thandle *th,
808                                struct tx_arg *arg)
809 {
810         struct dt_object *dt_obj = arg->object;
811         int rc;
812
813         rc = out_obj_ref_del(env, dt_obj, th);
814
815         CDEBUG(D_INFO, "%s: insert ref_del reply %p index %d: rc = %d\n",
816                dt_obd_name(th->th_dev), arg->reply, arg->index, 0);
817
818         update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
819
820         return rc;
821 }
822
823 static int out_tx_ref_del_undo(const struct lu_env *env, struct thandle *th,
824                                struct tx_arg *arg)
825 {
826         return out_obj_ref_add(env, arg->object, th);
827 }
828
829 static int __out_tx_ref_del(const struct lu_env *env,
830                             struct dt_object *dt_obj,
831                             struct thandle_exec_args *th,
832                             struct update_reply *reply,
833                             int index, char *file, int line)
834 {
835         struct tx_arg           *arg;
836
837         LASSERT(th->ta_handle != NULL);
838         th->ta_err = dt_declare_ref_del(env, dt_obj, th->ta_handle);
839         if (th->ta_err != 0)
840                 return th->ta_err;
841
842         arg = tx_add_exec(th, out_tx_ref_del_exec, out_tx_ref_del_undo, file,
843                           line);
844         LASSERT(arg);
845         lu_object_get(&dt_obj->do_lu);
846         arg->object = dt_obj;
847         arg->reply = reply;
848         arg->index = index;
849         return 0;
850 }
851
852 static int out_ref_del(struct out_thread_info *info)
853 {
854         struct dt_object  *obj = info->mti_u.update.mti_dt_object;
855         int               rc;
856
857         ENTRY;
858
859         if (!lu_object_exists(&obj->do_lu))
860                 RETURN(-ENOENT);
861
862         rc = out_tx_ref_del(info->mti_env, obj, &info->mti_handle,
863                             info->mti_u.update.mti_update_reply,
864                             info->mti_u.update.mti_update_reply_index);
865         RETURN(rc);
866 }
867
868 static int out_obj_index_insert(const struct lu_env *env,
869                                 struct dt_object *dt_obj,
870                                 const struct dt_rec *rec,
871                                 const struct dt_key *key,
872                                 struct thandle *th)
873 {
874         int rc;
875
876         CDEBUG(D_INFO, "%s: index insert "DFID" name: %s fid "DFID"\n",
877                dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
878                (char *)key, PFID((struct lu_fid *)rec));
879
880         if (dt_try_as_dir(env, dt_obj) == 0)
881                 return -ENOTDIR;
882
883         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
884         rc = dt_insert(env, dt_obj, rec, key, th, NULL, 0);
885         dt_write_unlock(env, dt_obj);
886
887         return rc;
888 }
889
890 static int out_obj_index_delete(const struct lu_env *env,
891                                 struct dt_object *dt_obj,
892                                 const struct dt_key *key,
893                                 struct thandle *th)
894 {
895         int rc;
896
897         CDEBUG(D_INFO, "%s: index delete "DFID" name: %s\n",
898                dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
899                (char *)key);
900
901         if (dt_try_as_dir(env, dt_obj) == 0)
902                 return -ENOTDIR;
903
904         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
905         rc = dt_delete(env, dt_obj, key, th, NULL);
906         dt_write_unlock(env, dt_obj);
907
908         return rc;
909 }
910
911 static int out_tx_index_insert_exec(const struct lu_env *env,
912                                     struct thandle *th, struct tx_arg *arg)
913 {
914         struct dt_object *dt_obj = arg->object;
915         int rc;
916
917         rc = out_obj_index_insert(env, dt_obj, arg->u.insert.rec,
918                                   arg->u.insert.key, th);
919
920         CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
921                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
922
923         update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
924
925         return rc;
926 }
927
928 static int out_tx_index_insert_undo(const struct lu_env *env,
929                                     struct thandle *th, struct tx_arg *arg)
930 {
931         return out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
932 }
933
934 static int __out_tx_index_insert(const struct lu_env *env,
935                                  struct dt_object *dt_obj,
936                                  char *name, struct lu_fid *fid,
937                                  struct thandle_exec_args *th,
938                                  struct update_reply *reply,
939                                  int index, char *file, int line)
940 {
941         struct tx_arg *arg;
942
943         LASSERT(th->ta_handle != NULL);
944
945         if (lu_object_exists(&dt_obj->do_lu)) {
946                 if (dt_try_as_dir(env, dt_obj) == 0) {
947                         th->ta_err = -ENOTDIR;
948                         return th->ta_err;
949                 }
950                 th->ta_err = dt_declare_insert(env, dt_obj,
951                                                (struct dt_rec *)fid,
952                                                (struct dt_key *)name,
953                                                th->ta_handle);
954         }
955
956         if (th->ta_err != 0)
957                 return th->ta_err;
958
959         arg = tx_add_exec(th, out_tx_index_insert_exec,
960                           out_tx_index_insert_undo, file,
961                           line);
962         LASSERT(arg);
963         lu_object_get(&dt_obj->do_lu);
964         arg->object = dt_obj;
965         arg->reply = reply;
966         arg->index = index;
967         arg->u.insert.rec = (struct dt_rec *)fid;
968         arg->u.insert.key = (struct dt_key *)name;
969
970         return 0;
971 }
972
973 static int out_index_insert(struct out_thread_info *info)
974 {
975         struct update     *update = info->mti_u.update.mti_update;
976         struct dt_object  *obj = info->mti_u.update.mti_dt_object;
977         struct lu_fid     *fid;
978         char              *name;
979         int               rc = 0;
980         int               size;
981         ENTRY;
982
983         name = (char *)update_param_buf(update, 0, NULL);
984         if (name == NULL) {
985                 CERROR("%s: empty name for index insert: rc = %d\n",
986                        mdt_obd_name(info->mti_mdt), -EPROTO);
987                 RETURN(err_serious(-EPROTO));
988         }
989
990         fid = (struct lu_fid *)update_param_buf(update, 1, &size);
991         if (fid == NULL || size != sizeof(*fid)) {
992                 CERROR("%s: invalid fid: rc = %d\n",
993                        mdt_obd_name(info->mti_mdt), -EPROTO);
994                        RETURN(err_serious(-EPROTO));
995         }
996
997         fid_le_to_cpu(fid, fid);
998         if (!fid_is_sane(fid)) {
999                 CERROR("%s: invalid FID "DFID": rc = %d\n",
1000                        mdt_obd_name(info->mti_mdt), PFID(fid),
1001                        -EPROTO);
1002                 RETURN(err_serious(-EPROTO));
1003         }
1004
1005         rc = out_tx_index_insert(info->mti_env, obj, name, fid,
1006                                  &info->mti_handle,
1007                                  info->mti_u.update.mti_update_reply,
1008                                  info->mti_u.update.mti_update_reply_index);
1009         RETURN(rc);
1010 }
1011
1012 static int out_tx_index_delete_exec(const struct lu_env *env,
1013                                     struct thandle *th,
1014                                     struct tx_arg *arg)
1015 {
1016         int rc;
1017
1018         rc = out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
1019
1020         CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
1021                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
1022
1023         update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
1024
1025         return rc;
1026 }
1027
1028 static int out_tx_index_delete_undo(const struct lu_env *env,
1029                                     struct thandle *th,
1030                                     struct tx_arg *arg)
1031 {
1032         CERROR("%s: Oops, can not rollback index_delete yet: rc = %d\n",
1033                dt_obd_name(th->th_dev), -ENOTSUPP);
1034         return -ENOTSUPP;
1035 }
1036
1037 static int __out_tx_index_delete(const struct lu_env *env,
1038                                  struct dt_object *dt_obj, char *name,
1039                                  struct thandle_exec_args *th,
1040                                  struct update_reply *reply,
1041                                  int index, char *file, int line)
1042 {
1043         struct tx_arg *arg;
1044
1045         if (dt_try_as_dir(env, dt_obj) == 0) {
1046                 th->ta_err = -ENOTDIR;
1047                 return th->ta_err;
1048         }
1049
1050         LASSERT(th->ta_handle != NULL);
1051         th->ta_err = dt_declare_delete(env, dt_obj,
1052                                        (struct dt_key *)name,
1053                                        th->ta_handle);
1054         if (th->ta_err != 0)
1055                 return th->ta_err;
1056
1057         arg = tx_add_exec(th, out_tx_index_delete_exec,
1058                           out_tx_index_delete_undo, file,
1059                           line);
1060         LASSERT(arg);
1061         lu_object_get(&dt_obj->do_lu);
1062         arg->object = dt_obj;
1063         arg->reply = reply;
1064         arg->index = index;
1065         arg->u.insert.key = (struct dt_key *)name;
1066         return 0;
1067 }
1068
1069 static int out_index_delete(struct out_thread_info *info)
1070 {
1071         struct update           *update = info->mti_u.update.mti_update;
1072         struct dt_object        *obj = info->mti_u.update.mti_dt_object;
1073         char                    *name;
1074         int                     rc = 0;
1075
1076         if (!lu_object_exists(&obj->do_lu))
1077                 RETURN(-ENOENT);
1078         name = (char *)update_param_buf(update, 0, NULL);
1079         if (name == NULL) {
1080                 CERROR("%s: empty name for index delete: rc = %d\n",
1081                        mdt_obd_name(info->mti_mdt), -EPROTO);
1082                 RETURN(err_serious(-EPROTO));
1083         }
1084
1085         rc = out_tx_index_delete(info->mti_env, obj, name, &info->mti_handle,
1086                                  info->mti_u.update.mti_update_reply,
1087                                  info->mti_u.update.mti_update_reply_index);
1088         RETURN(rc);
1089 }
1090
1091 static int out_tx_destroy_exec(const struct lu_env *env, struct thandle *th,
1092                                struct tx_arg *arg)
1093 {
1094         struct dt_object *dt_obj = arg->object;
1095         int rc;
1096
1097         rc = out_obj_destroy(env, dt_obj, th);
1098
1099         CDEBUG(D_INFO, "%s: insert destroy reply %p index %d: rc = %d\n",
1100                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
1101
1102         update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
1103
1104         RETURN(rc);
1105 }
1106
1107 static int out_tx_destroy_undo(const struct lu_env *env, struct thandle *th,
1108                                struct tx_arg *arg)
1109 {
1110         CERROR("%s: not support destroy undo yet!: rc = %d\n",
1111                dt_obd_name(th->th_dev), -ENOTSUPP);
1112         return -ENOTSUPP;
1113 }
1114
1115 static int __out_tx_destroy(const struct lu_env *env, struct dt_object *dt_obj,
1116                              struct thandle_exec_args *th,
1117                              struct update_reply *reply,
1118                              int index, char *file, int line)
1119 {
1120         struct tx_arg *arg;
1121
1122         LASSERT(th->ta_handle != NULL);
1123         th->ta_err = dt_declare_destroy(env, dt_obj, th->ta_handle);
1124         if (th->ta_err)
1125                 return th->ta_err;
1126
1127         arg = tx_add_exec(th, out_tx_destroy_exec, out_tx_destroy_undo,
1128                           file, line);
1129         LASSERT(arg);
1130         lu_object_get(&dt_obj->do_lu);
1131         arg->object = dt_obj;
1132         arg->reply = reply;
1133         arg->index = index;
1134         return 0;
1135 }
1136
1137 static int out_destroy(struct out_thread_info *info)
1138 {
1139         struct update           *update = info->mti_u.update.mti_update;
1140         struct dt_object        *obj = info->mti_u.update.mti_dt_object;
1141         struct lu_fid           *fid;
1142         int                     rc;
1143         ENTRY;
1144
1145         fid = &update->u_fid;
1146         fid_le_to_cpu(fid, fid);
1147         if (!fid_is_sane(fid)) {
1148                 CERROR("%s: invalid FID "DFID": rc = %d\n",
1149                        mdt_obd_name(info->mti_mdt), PFID(fid), -EPROTO);
1150                 RETURN(err_serious(-EPROTO));
1151         }
1152
1153         if (!lu_object_exists(&obj->do_lu))
1154                 RETURN(-ENOENT);
1155
1156         rc = out_tx_destroy(info->mti_env, obj, &info->mti_handle,
1157                             info->mti_u.update.mti_update_reply,
1158                             info->mti_u.update.mti_update_reply_index);
1159
1160         RETURN(rc);
1161 }
1162
1163 #define DEF_OUT_HNDL(opc, name, fail_id, flags, fn)     \
1164 [opc - OBJ_CREATE] = {                                  \
1165         .mh_name    = name,                             \
1166         .mh_fail_id = fail_id,                          \
1167         .mh_opc     = opc,                              \
1168         .mh_flags   = flags,                            \
1169         .mh_act     = fn,                               \
1170         .mh_fmt     = NULL                              \
1171 }
1172
1173 #define out_handler mdt_handler
1174 static struct out_handler out_update_ops[] = {
1175         DEF_OUT_HNDL(OBJ_CREATE, "obj_create", 0, MUTABOR | HABEO_REFERO,
1176                      out_create),
1177         DEF_OUT_HNDL(OBJ_DESTROY, "obj_create", 0, MUTABOR | HABEO_REFERO,
1178                      out_destroy),
1179         DEF_OUT_HNDL(OBJ_REF_ADD, "obj_ref_add", 0, MUTABOR | HABEO_REFERO,
1180                      out_ref_add),
1181         DEF_OUT_HNDL(OBJ_REF_DEL, "obj_ref_del", 0, MUTABOR | HABEO_REFERO,
1182                      out_ref_del),
1183         DEF_OUT_HNDL(OBJ_ATTR_SET, "obj_attr_set", 0,  MUTABOR | HABEO_REFERO,
1184                      out_attr_set),
1185         DEF_OUT_HNDL(OBJ_ATTR_GET, "obj_attr_get", 0,  HABEO_REFERO,
1186                      out_attr_get),
1187         DEF_OUT_HNDL(OBJ_XATTR_SET, "obj_xattr_set", 0, MUTABOR | HABEO_REFERO,
1188                      out_xattr_set),
1189         DEF_OUT_HNDL(OBJ_XATTR_GET, "obj_xattr_get", 0, HABEO_REFERO,
1190                      out_xattr_get),
1191         DEF_OUT_HNDL(OBJ_INDEX_LOOKUP, "obj_index_lookup", 0, HABEO_REFERO,
1192                      out_index_lookup),
1193         DEF_OUT_HNDL(OBJ_INDEX_INSERT, "obj_index_insert", 0,
1194                      MUTABOR | HABEO_REFERO, out_index_insert),
1195         DEF_OUT_HNDL(OBJ_INDEX_DELETE, "obj_index_delete", 0,
1196                      MUTABOR | HABEO_REFERO, out_index_delete),
1197 };
1198
1199 #define out_opc_slice mdt_opc_slice
1200 static struct out_opc_slice out_handlers[] = {
1201         {
1202                 .mos_opc_start = OBJ_CREATE,
1203                 .mos_opc_end   = OBJ_LAST,
1204                 .mos_hs = out_update_ops
1205         },
1206 };
1207
1208 /**
1209  * Object updates between Targets. Because all the updates has been
1210  * dis-assemblied into object updates in master MDD layer, so out
1211  * will skip MDD layer, and call OSD API directly to execute these
1212  * updates.
1213  *
1214  * In phase I, all of the updates in the request need to be executed
1215  * in one transaction, and the transaction has to be synchronously.
1216  *
1217  * Please refer to lustre/include/lustre/lustre_idl.h for req/reply
1218  * format.
1219  */
1220 int out_handle(struct out_thread_info *info)
1221 {
1222         struct thandle_exec_args        *th = &info->mti_handle;
1223         struct req_capsule              *pill = info->mti_pill;
1224         struct mdt_device               *mdt = info->mti_mdt;
1225         struct dt_device                *dt = mdt->mdt_bottom;
1226         const struct lu_env             *env = info->mti_env;
1227         struct update_buf               *ubuf;
1228         struct update                   *update;
1229         struct update_reply             *update_reply;
1230         int                             bufsize;
1231         int                             count;
1232         int                             old_batchid = -1;
1233         unsigned                        off;
1234         int                             i;
1235         int                             rc = 0;
1236         int                             rc1 = 0;
1237         ENTRY;
1238
1239         req_capsule_set(pill, &RQF_UPDATE_OBJ);
1240         bufsize = req_capsule_get_size(pill, &RMF_UPDATE, RCL_CLIENT);
1241         if (bufsize != UPDATE_BUFFER_SIZE) {
1242                 CERROR("%s: invalid bufsize %d: rc = %d\n",
1243                        mdt_obd_name(mdt), bufsize, -EPROTO);
1244                 RETURN(err_serious(-EPROTO));
1245         }
1246
1247         ubuf = req_capsule_client_get(pill, &RMF_UPDATE);
1248         if (ubuf == NULL) {
1249                 CERROR("%s: No buf!: rc = %d\n", mdt_obd_name(mdt),
1250                        -EPROTO);
1251                 RETURN(err_serious(-EPROTO));
1252         }
1253
1254         if (le32_to_cpu(ubuf->ub_magic) != UPDATE_BUFFER_MAGIC) {
1255                 CERROR("%s: invalid magic %x expect %x: rc = %d\n",
1256                        mdt_obd_name(mdt), le32_to_cpu(ubuf->ub_magic),
1257                        UPDATE_BUFFER_MAGIC, -EPROTO);
1258                 RETURN(err_serious(-EPROTO));
1259         }
1260
1261         count = le32_to_cpu(ubuf->ub_count);
1262         if (count <= 0) {
1263                 CERROR("%s: No update!: rc = %d\n",
1264                        mdt_obd_name(mdt), -EPROTO);
1265                 RETURN(err_serious(-EPROTO));
1266         }
1267
1268         req_capsule_set_size(pill, &RMF_UPDATE_REPLY, RCL_SERVER,
1269                              UPDATE_BUFFER_SIZE);
1270         rc = req_capsule_server_pack(pill);
1271         if (rc != 0) {
1272                 CERROR("%s: Can't pack response: rc = %d\n",
1273                        mdt_obd_name(mdt), rc);
1274                 RETURN(rc);
1275         }
1276
1277         /* Prepare the update reply buffer */
1278         update_reply = req_capsule_server_get(pill, &RMF_UPDATE_REPLY);
1279         update_init_reply_buf(update_reply, count);
1280         info->mti_u.update.mti_update_reply = update_reply;
1281
1282         rc = out_tx_start(env, dt, th);
1283         if (rc != 0)
1284                 RETURN(rc);
1285
1286         /* Walk through updates in the request to execute them synchronously */
1287         off = cfs_size_round(offsetof(struct update_buf, ub_bufs[0]));
1288         for (i = 0; i < count; i++) {
1289                 struct out_handler *h;
1290                 struct dt_object   *dt_obj;
1291
1292                 update = (struct update *)((char *)ubuf + off);
1293                 if (old_batchid == -1) {
1294                         old_batchid = update->u_batchid;
1295                 } else if (old_batchid != update->u_batchid) {
1296                         /* Stop the current update transaction,
1297                          * create a new one */
1298                         rc = out_tx_end(env, th);
1299                         if (rc != 0)
1300                                 RETURN(rc);
1301
1302                         rc = out_tx_start(env, dt, th);
1303                         if (rc != 0)
1304                                 RETURN(rc);
1305                         old_batchid = update->u_batchid;
1306                 }
1307
1308                 fid_le_to_cpu(&update->u_fid, &update->u_fid);
1309                 if (!fid_is_sane(&update->u_fid)) {
1310                         CERROR("%s: invalid FID "DFID": rc = %d\n",
1311                                mdt_obd_name(mdt), PFID(&update->u_fid),
1312                                -EPROTO);
1313                         GOTO(out, rc = err_serious(-EPROTO));
1314                 }
1315
1316                 dt_obj = dt_locate(env, dt, &update->u_fid);
1317                 if (IS_ERR(dt_obj))
1318                         GOTO(out, rc = PTR_ERR(dt_obj));
1319
1320                 info->mti_u.update.mti_dt_object = dt_obj;
1321                 info->mti_u.update.mti_update = update;
1322                 info->mti_u.update.mti_update_reply_index = i;
1323
1324                 h = mdt_handler_find(update->u_type, out_handlers);
1325                 if (likely(h != NULL)) {
1326                         /* For real modification RPC, check if the update
1327                          * has been executed */
1328                         if (h->mh_flags & MUTABOR) {
1329                                 struct ptlrpc_request *req = mdt_info_req(info);
1330
1331                                 if (out_check_resent(env, dt, dt_obj, req,
1332                                                      out_reconstruct,
1333                                                      update_reply, i))
1334                                         GOTO(next, rc);
1335                         }
1336
1337                         rc = h->mh_act(info);
1338                 } else {
1339                         CERROR("%s: The unsupported opc: 0x%x\n",
1340                                mdt_obd_name(mdt), update->u_type);
1341                         lu_object_put(env, &dt_obj->do_lu);
1342                         GOTO(out, rc = -ENOTSUPP);
1343                 }
1344 next:
1345                 lu_object_put(env, &dt_obj->do_lu);
1346                 if (rc < 0)
1347                         GOTO(out, rc);
1348                 off += cfs_size_round(update_size(update));
1349         }
1350 out:
1351         rc1 = out_tx_end(env, th);
1352         rc = rc == 0 ? rc1 : rc;
1353         RETURN(rc);
1354 }