Whamcloud - gitweb
LU-1347 style: removes obsolete EXPORT_SYMTAB macros v2
[fs/lustre-release.git] / lustre / mdt / out_handler.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  *
25  * lustre/mdt/out_handler.c
26  *
27  * Object update handler between targets.
28  *
29  * Author: di.wang <di.wang@intel.com>
30  */
31
32 #define DEBUG_SUBSYSTEM S_MDS
33
34 #include "mdt_internal.h"
35 #include <lustre_update.h>
36
37 static const char dot[] = ".";
38 static const char dotdot[] = "..";
39
40 /* Current out and mdt shared the same thread info, but in the future,
41  * this should be decoupled with MDT XXX*/
42 #define out_thread_info         mdt_thread_info
43 #define out_thread_key          mdt_thread_key
44
45 struct out_thread_info *out_env_info(const struct lu_env *env)
46 {
47         struct out_thread_info *info;
48
49         info = lu_context_key_get(&env->le_ctx, &out_thread_key);
50         LASSERT(info != NULL);
51         return info;
52 }
53
54 static inline char *dt_obd_name(struct dt_device *dt)
55 {
56         return dt->dd_lu_dev.ld_obd->obd_name;
57 }
58
59 struct tx_arg *tx_add_exec(struct thandle_exec_args *ta, tx_exec_func_t func,
60                            tx_exec_func_t undo, char *file, int line)
61 {
62         int i;
63
64         LASSERT(ta);
65         LASSERT(func);
66
67         i = ta->ta_argno;
68         LASSERT(i < UPDATE_MAX_OPS);
69
70         ta->ta_argno++;
71
72         ta->ta_args[i].exec_fn = func;
73         ta->ta_args[i].undo_fn = undo;
74         ta->ta_args[i].file    = file;
75         ta->ta_args[i].line    = line;
76
77         return &ta->ta_args[i];
78 }
79
80 static int out_tx_start(const struct lu_env *env, struct dt_device *dt,
81                         struct thandle_exec_args *th)
82 {
83         memset(th, 0, sizeof(*th));
84         th->ta_handle = dt_trans_create(env, dt);
85         if (IS_ERR(th->ta_handle)) {
86                 CERROR("%s: start handle error: rc = %ld\n",
87                        dt_obd_name(dt), PTR_ERR(th->ta_handle));
88                 return PTR_ERR(th->ta_handle);
89         }
90         th->ta_dev = dt;
91         /*For phase I, sync for cross-ref operation*/
92         th->ta_handle->th_sync = 1;
93         return 0;
94 }
95
96 static int out_trans_start(const struct lu_env *env,
97                            struct thandle_exec_args *th)
98 {
99         /* Always do sync commit for Phase I */
100         LASSERT(th->ta_handle->th_sync != 0);
101         return dt_trans_start(env, th->ta_dev, th->ta_handle);
102 }
103
104 static int out_trans_stop(const struct lu_env *env,
105                           struct thandle_exec_args *th, int err)
106 {
107         int i;
108         int rc;
109
110         th->ta_handle->th_result = err;
111         LASSERT(th->ta_handle->th_sync != 0);
112         rc = dt_trans_stop(env, th->ta_dev, th->ta_handle);
113         for (i = 0; i < th->ta_argno; i++) {
114                 if (th->ta_args[i].object != NULL) {
115                         lu_object_put(env, &th->ta_args[i].object->do_lu);
116                         th->ta_args[i].object = NULL;
117                 }
118         }
119
120         return rc;
121 }
122
123 int out_tx_end(const struct lu_env *env, struct thandle_exec_args *th)
124 {
125         struct out_thread_info *info = out_env_info(env);
126         int i = 0, rc;
127
128         LASSERT(th->ta_dev);
129         LASSERT(th->ta_handle);
130
131         if (th->ta_err != 0 || th->ta_argno == 0)
132                 GOTO(stop, rc = th->ta_err);
133
134         rc = out_trans_start(env, th);
135         if (unlikely(rc))
136                 GOTO(stop, rc);
137
138         for (i = 0; i < th->ta_argno; i++) {
139                 rc = th->ta_args[i].exec_fn(env, th->ta_handle,
140                                             &th->ta_args[i]);
141                 if (unlikely(rc)) {
142                         CDEBUG(D_INFO, "error during execution of #%u from"
143                                " %s:%d: rc = %d\n", i, th->ta_args[i].file,
144                                th->ta_args[i].line, rc);
145                         while (--i >= 0) {
146                                 LASSERTF(th->ta_args[i].undo_fn != NULL,
147                                     "can't undo changes, hope for failover!\n");
148                                 th->ta_args[i].undo_fn(env, th->ta_handle,
149                                                        &th->ta_args[i]);
150                         }
151                         break;
152                 }
153         }
154
155         /* Only fail for real update */
156         info->mti_fail_id = OBD_FAIL_UPDATE_OBJ_NET_REP;
157 stop:
158         CDEBUG(D_INFO, "%s: executed %u/%u: rc = %d\n",
159                dt_obd_name(th->ta_dev), i, th->ta_argno, rc);
160         out_trans_stop(env, th, rc);
161         th->ta_handle = NULL;
162         th->ta_argno = 0;
163         th->ta_err = 0;
164
165         RETURN(rc);
166 }
167
168 static void out_reconstruct(const struct lu_env *env, struct dt_device *dt,
169                             struct dt_object *obj, struct update_reply *reply,
170                             int index)
171 {
172         CDEBUG(D_INFO, "%s: fork reply reply %p index %d: rc = %d\n",
173                dt_obd_name(dt), reply, index, 0);
174
175         update_insert_reply(reply, NULL, 0, index, 0);
176         return;
177 }
178
179 typedef void (*out_reconstruct_t)(const struct lu_env *env,
180                                   struct dt_device *dt,
181                                   struct dt_object *obj,
182                                   struct update_reply *reply,
183                                   int index);
184
185 static inline int out_check_resent(const struct lu_env *env,
186                                    struct dt_device *dt,
187                                    struct dt_object *obj,
188                                    struct ptlrpc_request *req,
189                                    out_reconstruct_t reconstruct,
190                                    struct update_reply *reply,
191                                    int index)
192 {
193         if (likely(!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)))
194                 return 0;
195
196         if (req_xid_is_last(req)) {
197                 reconstruct(env, dt, obj, reply, index);
198                 return 1;
199         }
200         DEBUG_REQ(D_HA, req, "no reply for RESENT req (have "LPD64")",
201                  req->rq_export->exp_target_data.ted_lcd->lcd_last_xid);
202         return 0;
203 }
204
205 static int out_obj_destroy(const struct lu_env *env, struct dt_object *dt_obj,
206                            struct thandle *th)
207 {
208         int rc;
209
210         CDEBUG(D_INFO, "%s: destroy "DFID"\n", dt_obd_name(th->th_dev),
211                PFID(lu_object_fid(&dt_obj->do_lu)));
212
213         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
214         rc = dt_destroy(env, dt_obj, th);
215         dt_write_unlock(env, dt_obj);
216
217         return rc;
218 }
219
220 /**
221  * All of the xxx_undo will be used once execution failed,
222  * But because all of the required resource has been reserved in
223  * declare phase, i.e. if declare succeed, it should make sure
224  * the following executing phase succeed in anyway, so these undo
225  * should be useless for most of the time in Phase I
226  */
227 int out_tx_create_undo(const struct lu_env *env, struct thandle *th,
228                        struct tx_arg *arg)
229 {
230         int rc;
231
232         rc = out_obj_destroy(env, arg->object, th);
233         if (rc != 0)
234                 CERROR("%s: undo failure, we are doomed!: rc = %d\n",
235                        dt_obd_name(th->th_dev), rc);
236         return rc;
237 }
238
239 int out_tx_create_exec(const struct lu_env *env, struct thandle *th,
240                        struct tx_arg *arg)
241 {
242         struct dt_object *dt_obj = arg->object;
243         int rc;
244
245         CDEBUG(D_OTHER, "%s: create "DFID": dof %u, mode %o\n",
246                dt_obd_name(th->th_dev),
247                PFID(lu_object_fid(&arg->object->do_lu)),
248                arg->u.create.dof.dof_type,
249                arg->u.create.attr.la_mode & S_IFMT);
250
251         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
252         rc = dt_create(env, dt_obj, &arg->u.create.attr,
253                        &arg->u.create.hint, &arg->u.create.dof, th);
254
255         dt_write_unlock(env, dt_obj);
256
257         CDEBUG(D_INFO, "%s: insert create reply %p index %d: rc = %d\n",
258                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
259
260         update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
261
262         return rc;
263 }
264
265 static int __out_tx_create(const struct lu_env *env, struct dt_object *obj,
266                            struct lu_attr *attr, struct lu_fid *parent_fid,
267                            struct dt_object_format *dof,
268                            struct thandle_exec_args *th,
269                            struct update_reply *reply,
270                            int index, char *file, int line)
271 {
272         struct tx_arg *arg;
273
274         LASSERT(th->ta_handle != NULL);
275         th->ta_err = dt_declare_create(env, obj, attr, NULL, dof,
276                                        th->ta_handle);
277         if (th->ta_err != 0)
278                 return th->ta_err;
279
280         arg = tx_add_exec(th, out_tx_create_exec, out_tx_create_undo, file,
281                           line);
282         LASSERT(arg);
283
284         /* release the object in out_trans_stop */
285         lu_object_get(&obj->do_lu);
286         arg->object = obj;
287         arg->u.create.attr = *attr;
288         if (parent_fid)
289                 arg->u.create.fid = *parent_fid;
290         memset(&arg->u.create.hint, 0, sizeof(arg->u.create.hint));
291         arg->u.create.dof  = *dof;
292         arg->reply = reply;
293         arg->index = index;
294
295         return 0;
296 }
297
298 static int out_create(struct out_thread_info *info)
299 {
300         struct update           *update = info->mti_u.update.mti_update;
301         struct dt_object        *obj = info->mti_u.update.mti_dt_object;
302         struct dt_object_format *dof = &info->mti_u.update.mti_update_dof;
303         struct obdo             *lobdo = &info->mti_u.update.mti_obdo;
304         struct lu_attr          *attr = &info->mti_attr.ma_attr;
305         struct lu_fid           *fid = NULL;
306         struct obdo             *wobdo;
307         int                     size;
308         int                     rc;
309
310         ENTRY;
311
312         wobdo = update_param_buf(update, 0, &size);
313         if (wobdo == NULL || size != sizeof(*wobdo)) {
314                 CERROR("%s: obdo is NULL, invalid RPC: rc = %d\n",
315                        mdt_obd_name(info->mti_mdt), -EPROTO);
316                 RETURN(err_serious(-EPROTO));
317         }
318
319         obdo_le_to_cpu(wobdo, wobdo);
320         lustre_get_wire_obdo(NULL, lobdo, wobdo);
321         la_from_obdo(attr, lobdo, lobdo->o_valid);
322
323         dof->dof_type = dt_mode_to_dft(attr->la_mode);
324         if (S_ISDIR(attr->la_mode)) {
325                 int size;
326
327                 fid = update_param_buf(update, 1, &size);
328                 if (fid == NULL || size != sizeof(*fid)) {
329                         CERROR("%s: invalid fid: rc = %d\n",
330                                mdt_obd_name(info->mti_mdt), -EPROTO);
331                         RETURN(err_serious(-EPROTO));
332                 }
333                 fid_le_to_cpu(fid, fid);
334                 if (!fid_is_sane(fid)) {
335                         CERROR("%s: invalid fid "DFID": rc = %d\n",
336                                mdt_obd_name(info->mti_mdt),
337                                PFID(fid), -EPROTO);
338                         RETURN(err_serious(-EPROTO));
339                 }
340         }
341
342         if (lu_object_exists(&obj->do_lu))
343                 RETURN(-EEXIST);
344
345         rc = out_tx_create(info->mti_env, obj, attr, fid, dof,
346                            &info->mti_handle,
347                            info->mti_u.update.mti_update_reply,
348                            info->mti_u.update.mti_update_reply_index);
349
350         RETURN(rc);
351 }
352
353 static int out_tx_attr_set_undo(const struct lu_env *env,
354                                 struct thandle *th, struct tx_arg *arg)
355 {
356         CERROR("%s: attr set undo "DFID" unimplemented yet!: rc = %d\n",
357                dt_obd_name(th->th_dev),
358                PFID(lu_object_fid(&arg->object->do_lu)), -ENOTSUPP);
359
360         return -ENOTSUPP;
361 }
362
363 static int out_tx_attr_set_exec(const struct lu_env *env, struct thandle *th,
364                                 struct tx_arg *arg)
365 {
366         struct dt_object        *dt_obj = arg->object;
367         int                     rc;
368
369         CDEBUG(D_OTHER, "%s: attr set "DFID"\n", dt_obd_name(th->th_dev),
370                PFID(lu_object_fid(&dt_obj->do_lu)));
371
372         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
373         rc = dt_attr_set(env, dt_obj, &arg->u.attr_set.attr,
374                          th, NULL);
375         dt_write_unlock(env, dt_obj);
376
377         CDEBUG(D_INFO, "%s: insert attr_set reply %p index %d: rc = %d\n",
378                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
379
380         update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
381
382         return rc;
383 }
384
385 static int __out_tx_attr_set(const struct lu_env *env,
386                              struct dt_object *dt_obj,
387                              const struct lu_attr *attr,
388                              struct thandle_exec_args *th,
389                              struct update_reply *reply, int index,
390                              char *file, int line)
391 {
392         struct tx_arg           *arg;
393
394         LASSERT(th->ta_handle != NULL);
395         th->ta_err = dt_declare_attr_set(env, dt_obj, attr, th->ta_handle);
396         if (th->ta_err != 0)
397                 return th->ta_err;
398
399         arg = tx_add_exec(th, out_tx_attr_set_exec, out_tx_attr_set_undo,
400                           file, line);
401         LASSERT(arg);
402         lu_object_get(&dt_obj->do_lu);
403         arg->object = dt_obj;
404         arg->u.attr_set.attr = *attr;
405         arg->reply = reply;
406         arg->index = index;
407         return 0;
408 }
409
410 static int out_attr_set(struct out_thread_info *info)
411 {
412         struct update           *update = info->mti_u.update.mti_update;
413         struct lu_attr          *attr = &info->mti_attr.ma_attr;
414         struct dt_object        *obj = info->mti_u.update.mti_dt_object;
415         struct obdo             *lobdo = &info->mti_u.update.mti_obdo;
416         struct obdo             *wobdo;
417         int                     size;
418         int                     rc;
419
420         ENTRY;
421
422         wobdo = update_param_buf(update, 0, &size);
423         if (wobdo == NULL || size != sizeof(*wobdo)) {
424                 CERROR("%s: empty obdo in the update: rc = %d\n",
425                        mdt_obd_name(info->mti_mdt), -EPROTO);
426                 RETURN(err_serious(-EPROTO));
427         }
428
429         attr->la_valid = 0;
430         attr->la_valid = 0;
431         obdo_le_to_cpu(wobdo, wobdo);
432         lustre_get_wire_obdo(NULL, lobdo, wobdo);
433         la_from_obdo(attr, lobdo, lobdo->o_valid);
434
435         rc = out_tx_attr_set(info->mti_env, obj, attr, &info->mti_handle,
436                              info->mti_u.update.mti_update_reply,
437                              info->mti_u.update.mti_update_reply_index);
438
439         RETURN(rc);
440 }
441
442 static int out_attr_get(struct out_thread_info *info)
443 {
444         struct obdo             *obdo = &info->mti_u.update.mti_obdo;
445         const struct lu_env     *env = info->mti_env;
446         struct lu_attr          *la = &info->mti_attr.ma_attr;
447         struct dt_object        *obj = info->mti_u.update.mti_dt_object;
448         int                     rc;
449
450         ENTRY;
451
452         if (!lu_object_exists(&obj->do_lu))
453                 RETURN(-ENOENT);
454
455         dt_read_lock(env, obj, MOR_TGT_CHILD);
456         rc = dt_attr_get(env, obj, la, NULL);
457         if (rc)
458                 GOTO(out_unlock, rc);
459         /*
460          * If it is a directory, we will also check whether the
461          * directory is empty.
462          * la_flags = 0 : Empty.
463          *          = 1 : Not empty.
464          */
465         la->la_flags = 0;
466         if (S_ISDIR(la->la_mode)) {
467                 struct dt_it     *it;
468                 const struct dt_it_ops *iops;
469
470                 if (!dt_try_as_dir(env, obj))
471                         GOTO(out_unlock, rc = -ENOTDIR);
472
473                 iops = &obj->do_index_ops->dio_it;
474                 it = iops->init(env, obj, LUDA_64BITHASH, BYPASS_CAPA);
475                 if (!IS_ERR(it)) {
476                         int  result;
477                         result = iops->get(env, it, (const void *)"");
478                         if (result > 0) {
479                                 int i;
480                                 for (result = 0, i = 0; result == 0 && i < 3;
481                                      ++i)
482                                         result = iops->next(env, it);
483                                 if (result == 0)
484                                         la->la_flags = 1;
485                         } else if (result == 0)
486                                 /*
487                                  * Huh? Index contains no zero key?
488                                  */
489                                 rc = -EIO;
490
491                         iops->put(env, it);
492                         iops->fini(env, it);
493                 }
494         }
495
496         obdo->o_valid = 0;
497         obdo_from_la(obdo, la, la->la_valid);
498         obdo_cpu_to_le(obdo, obdo);
499         lustre_set_wire_obdo(NULL, obdo, obdo);
500
501 out_unlock:
502         dt_read_unlock(env, obj);
503
504         CDEBUG(D_INFO, "%s: insert attr get reply %p index %d: rc = %d\n",
505                mdt_obd_name(info->mti_mdt),
506                info->mti_u.update.mti_update_reply, 0, rc);
507
508         update_insert_reply(info->mti_u.update.mti_update_reply, obdo,
509                             sizeof(*obdo), 0, rc);
510         RETURN(rc);
511 }
512
513 static int out_xattr_get(struct out_thread_info *info)
514 {
515         struct update           *update = info->mti_u.update.mti_update;
516         const struct lu_env     *env = info->mti_env;
517         struct lu_buf           *lbuf = &info->mti_buf;
518         struct update_reply     *reply = info->mti_u.update.mti_update_reply;
519         struct dt_object        *obj = info->mti_u.update.mti_dt_object;
520         char                    *name;
521         void                    *ptr;
522         int                     rc;
523
524         ENTRY;
525
526         name = (char *)update_param_buf(update, 0, NULL);
527         if (name == NULL) {
528                 CERROR("%s: empty name for xattr get: rc = %d\n",
529                        mdt_obd_name(info->mti_mdt), -EPROTO);
530                 RETURN(err_serious(-EPROTO));
531         }
532
533         ptr = update_get_buf_internal(reply, 0, NULL);
534         LASSERT(ptr != NULL);
535
536         /* The first 4 bytes(int) are used to store the result */
537         lbuf->lb_buf = (char *)ptr + sizeof(int);
538         lbuf->lb_len = UPDATE_BUFFER_SIZE - sizeof(struct update_reply);
539         dt_read_lock(env, obj, MOR_TGT_CHILD);
540         rc = dt_xattr_get(env, obj, lbuf, name, NULL);
541         dt_read_unlock(env, obj);
542         if (rc < 0) {
543                 lbuf->lb_len = 0;
544                 GOTO(out, rc);
545         }
546         if (rc == 0) {
547                 lbuf->lb_len = 0;
548                 GOTO(out, rc = -ENOENT);
549         }
550         lbuf->lb_len = rc;
551         rc = 0;
552         CDEBUG(D_INFO, "%s: "DFID" get xattr %s len %d\n",
553                mdt_obd_name(info->mti_mdt), PFID(lu_object_fid(&obj->do_lu)),
554                name, (int)lbuf->lb_len);
555 out:
556         *(int *)ptr = rc;
557         reply->ur_lens[0] = lbuf->lb_len + sizeof(int);
558         RETURN(rc);
559 }
560
561 static int out_index_lookup(struct out_thread_info *info)
562 {
563         struct update           *update = info->mti_u.update.mti_update;
564         const struct lu_env     *env = info->mti_env;
565         struct dt_object        *obj = info->mti_u.update.mti_dt_object;
566         char                    *name;
567         int                     rc;
568
569         ENTRY;
570
571         if (!lu_object_exists(&obj->do_lu))
572                 RETURN(-ENOENT);
573
574         name = (char *)update_param_buf(update, 0, NULL);
575         if (name == NULL) {
576                 CERROR("%s: empty name for lookup: rc = %d\n",
577                        mdt_obd_name(info->mti_mdt), -EPROTO);
578                 RETURN(err_serious(-EPROTO));
579         }
580
581         dt_read_lock(env, obj, MOR_TGT_CHILD);
582         if (!dt_try_as_dir(env, obj))
583                 GOTO(out_unlock, rc = -ENOTDIR);
584
585         rc = dt_lookup(env, obj, (struct dt_rec *)&info->mti_tmp_fid1,
586                 (struct dt_key *)name, NULL);
587
588         if (rc < 0)
589                 GOTO(out_unlock, rc);
590
591         if (rc == 0)
592                 rc += 1;
593
594         CDEBUG(D_INFO, "lookup "DFID" %s get "DFID" rc %d\n",
595                PFID(lu_object_fid(&obj->do_lu)), name,
596                PFID(&info->mti_tmp_fid1), rc);
597         fid_cpu_to_le(&info->mti_tmp_fid1, &info->mti_tmp_fid1);
598
599 out_unlock:
600         dt_read_unlock(env, obj);
601
602         CDEBUG(D_INFO, "%s: insert lookup reply %p index %d: rc = %d\n",
603                mdt_obd_name(info->mti_mdt),
604                info->mti_u.update.mti_update_reply, 0, rc);
605
606         update_insert_reply(info->mti_u.update.mti_update_reply,
607                             &info->mti_tmp_fid1, sizeof(info->mti_tmp_fid1),
608                             0, rc);
609         RETURN(rc);
610 }
611
612 static int out_tx_xattr_set_exec(const struct lu_env *env,
613                                  struct thandle *th,
614                                  struct tx_arg *arg)
615 {
616         struct dt_object *dt_obj = arg->object;
617         int rc;
618
619         CDEBUG(D_INFO, "%s: set xattr buf %p name %s flag %d\n",
620                dt_obd_name(th->th_dev), arg->u.xattr_set.buf.lb_buf,
621                arg->u.xattr_set.name, arg->u.xattr_set.flags);
622
623         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
624         rc = dt_xattr_set(env, dt_obj, &arg->u.xattr_set.buf,
625                           arg->u.xattr_set.name, arg->u.xattr_set.flags,
626                           th, NULL);
627         dt_write_unlock(env, dt_obj);
628         /**
629          * Ignore errors if this is LINK EA
630          **/
631         if (unlikely(rc && !strncmp(arg->u.xattr_set.name, XATTR_NAME_LINK,
632                                     strlen(XATTR_NAME_LINK))))
633                 rc = 0;
634
635         CDEBUG(D_INFO, "%s: insert xattr set reply %p index %d: rc = %d\n",
636                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
637
638         update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
639
640         return rc;
641 }
642
643 static int __out_tx_xattr_set(const struct lu_env *env,
644                               struct dt_object *dt_obj,
645                               const struct lu_buf *buf,
646                               const char *name, int flags,
647                               struct thandle_exec_args *th,
648                               struct update_reply *reply, int index,
649                               char *file, int line)
650 {
651         struct tx_arg           *arg;
652
653         LASSERT(th->ta_handle != NULL);
654         th->ta_err = dt_declare_xattr_set(env, dt_obj, buf, name,
655                                           flags, th->ta_handle);
656         if (th->ta_err != 0)
657                 return th->ta_err;
658
659         arg = tx_add_exec(th, out_tx_xattr_set_exec, NULL, file, line);
660         LASSERT(arg);
661         lu_object_get(&dt_obj->do_lu);
662         arg->object = dt_obj;
663         arg->u.xattr_set.name = name;
664         arg->u.xattr_set.flags = flags;
665         arg->u.xattr_set.buf = *buf;
666         arg->reply = reply;
667         arg->index = index;
668         arg->u.xattr_set.csum = 0;
669         return 0;
670 }
671
672 static int out_xattr_set(struct out_thread_info *info)
673 {
674         struct update           *update = info->mti_u.update.mti_update;
675         struct dt_object        *obj = info->mti_u.update.mti_dt_object;
676         struct lu_buf           *lbuf = &info->mti_buf;
677         char                    *name;
678         char                    *buf;
679         char                    *tmp;
680         int                     buf_len = 0;
681         int                     flag;
682         int                     rc;
683         ENTRY;
684
685         name = update_param_buf(update, 0, NULL);
686         if (name == NULL) {
687                 CERROR("%s: empty name for xattr set: rc = %d\n",
688                        mdt_obd_name(info->mti_mdt), -EPROTO);
689                 RETURN(err_serious(-EPROTO));
690         }
691
692         buf = (char *)update_param_buf(update, 1, &buf_len);
693         if (buf == NULL || buf_len == 0) {
694                 CERROR("%s: empty buf for xattr set: rc = %d\n",
695                        mdt_obd_name(info->mti_mdt), -EPROTO);
696                 RETURN(err_serious(-EPROTO));
697         }
698
699         lbuf->lb_buf = buf;
700         lbuf->lb_len = buf_len;
701
702         tmp = (char *)update_param_buf(update, 2, NULL);
703         if (tmp == NULL) {
704                 CERROR("%s: empty flag for xattr set: rc = %d\n",
705                        mdt_obd_name(info->mti_mdt), -EPROTO);
706                 RETURN(err_serious(-EPROTO));
707         }
708
709         flag = le32_to_cpu(*(int *)tmp);
710
711         rc = out_tx_xattr_set(info->mti_env, obj, lbuf, name, flag,
712                               &info->mti_handle,
713                               info->mti_u.update.mti_update_reply,
714                               info->mti_u.update.mti_update_reply_index);
715         RETURN(rc);
716 }
717
718 static int out_obj_ref_add(const struct lu_env *env,
719                            struct dt_object *dt_obj,
720                            struct thandle *th)
721 {
722         int rc;
723
724         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
725         rc = dt_ref_add(env, dt_obj, th);
726         dt_write_unlock(env, dt_obj);
727
728         return rc;
729 }
730
731 static int out_obj_ref_del(const struct lu_env *env,
732                            struct dt_object *dt_obj,
733                            struct thandle *th)
734 {
735         int rc;
736
737         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
738         rc = dt_ref_del(env, dt_obj, th);
739         dt_write_unlock(env, dt_obj);
740
741         return rc;
742 }
743
744 static int out_tx_ref_add_exec(const struct lu_env *env, struct thandle *th,
745                                struct tx_arg *arg)
746 {
747         struct dt_object *dt_obj = arg->object;
748         int rc;
749
750         rc = out_obj_ref_add(env, dt_obj, th);
751
752         CDEBUG(D_INFO, "%s: insert ref_add reply %p index %d: rc = %d\n",
753                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
754
755         update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
756         return rc;
757 }
758
759 static int out_tx_ref_add_undo(const struct lu_env *env, struct thandle *th,
760                                struct tx_arg *arg)
761 {
762         return out_obj_ref_del(env, arg->object, th);
763 }
764
765 static int __out_tx_ref_add(const struct lu_env *env,
766                             struct dt_object *dt_obj,
767                             struct thandle_exec_args *th,
768                             struct update_reply *reply,
769                             int index, char *file, int line)
770 {
771         struct tx_arg           *arg;
772
773         LASSERT(th->ta_handle != NULL);
774         th->ta_err = dt_declare_ref_add(env, dt_obj, th->ta_handle);
775         if (th->ta_err != 0)
776                 return th->ta_err;
777
778         arg = tx_add_exec(th, out_tx_ref_add_exec, out_tx_ref_add_undo, file,
779                           line);
780         LASSERT(arg);
781         lu_object_get(&dt_obj->do_lu);
782         arg->object = dt_obj;
783         arg->reply = reply;
784         arg->index = index;
785         return 0;
786 }
787
788 /**
789  * increase ref of the object
790  **/
791 static int out_ref_add(struct out_thread_info *info)
792 {
793         struct dt_object  *obj = info->mti_u.update.mti_dt_object;
794         int               rc;
795
796         ENTRY;
797
798         rc = out_tx_ref_add(info->mti_env, obj, &info->mti_handle,
799                             info->mti_u.update.mti_update_reply,
800                             info->mti_u.update.mti_update_reply_index);
801         RETURN(rc);
802 }
803
804 static int out_tx_ref_del_exec(const struct lu_env *env, struct thandle *th,
805                                struct tx_arg *arg)
806 {
807         struct dt_object *dt_obj = arg->object;
808         int rc;
809
810         rc = out_obj_ref_del(env, dt_obj, th);
811
812         CDEBUG(D_INFO, "%s: insert ref_del reply %p index %d: rc = %d\n",
813                dt_obd_name(th->th_dev), arg->reply, arg->index, 0);
814
815         update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
816
817         return rc;
818 }
819
820 static int out_tx_ref_del_undo(const struct lu_env *env, struct thandle *th,
821                                struct tx_arg *arg)
822 {
823         return out_obj_ref_add(env, arg->object, th);
824 }
825
826 static int __out_tx_ref_del(const struct lu_env *env,
827                             struct dt_object *dt_obj,
828                             struct thandle_exec_args *th,
829                             struct update_reply *reply,
830                             int index, char *file, int line)
831 {
832         struct tx_arg           *arg;
833
834         LASSERT(th->ta_handle != NULL);
835         th->ta_err = dt_declare_ref_del(env, dt_obj, th->ta_handle);
836         if (th->ta_err != 0)
837                 return th->ta_err;
838
839         arg = tx_add_exec(th, out_tx_ref_del_exec, out_tx_ref_del_undo, file,
840                           line);
841         LASSERT(arg);
842         lu_object_get(&dt_obj->do_lu);
843         arg->object = dt_obj;
844         arg->reply = reply;
845         arg->index = index;
846         return 0;
847 }
848
849 static int out_ref_del(struct out_thread_info *info)
850 {
851         struct dt_object  *obj = info->mti_u.update.mti_dt_object;
852         int               rc;
853
854         ENTRY;
855
856         if (!lu_object_exists(&obj->do_lu))
857                 RETURN(-ENOENT);
858
859         rc = out_tx_ref_del(info->mti_env, obj, &info->mti_handle,
860                             info->mti_u.update.mti_update_reply,
861                             info->mti_u.update.mti_update_reply_index);
862         RETURN(rc);
863 }
864
865 static int out_obj_index_insert(const struct lu_env *env,
866                                 struct dt_object *dt_obj,
867                                 const struct dt_rec *rec,
868                                 const struct dt_key *key,
869                                 struct thandle *th)
870 {
871         int rc;
872
873         CDEBUG(D_INFO, "%s: index insert "DFID" name: %s fid "DFID"\n",
874                dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
875                (char *)key, PFID((struct lu_fid *)rec));
876
877         if (dt_try_as_dir(env, dt_obj) == 0)
878                 return -ENOTDIR;
879
880         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
881         rc = dt_insert(env, dt_obj, rec, key, th, NULL, 0);
882         dt_write_unlock(env, dt_obj);
883
884         return rc;
885 }
886
887 static int out_obj_index_delete(const struct lu_env *env,
888                                 struct dt_object *dt_obj,
889                                 const struct dt_key *key,
890                                 struct thandle *th)
891 {
892         int rc;
893
894         CDEBUG(D_INFO, "%s: index delete "DFID" name: %s\n",
895                dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
896                (char *)key);
897
898         if (dt_try_as_dir(env, dt_obj) == 0)
899                 return -ENOTDIR;
900
901         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
902         rc = dt_delete(env, dt_obj, key, th, NULL);
903         dt_write_unlock(env, dt_obj);
904
905         return rc;
906 }
907
908 static int out_tx_index_insert_exec(const struct lu_env *env,
909                                     struct thandle *th, struct tx_arg *arg)
910 {
911         struct dt_object *dt_obj = arg->object;
912         int rc;
913
914         rc = out_obj_index_insert(env, dt_obj, arg->u.insert.rec,
915                                   arg->u.insert.key, th);
916
917         CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
918                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
919
920         update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
921
922         return rc;
923 }
924
925 static int out_tx_index_insert_undo(const struct lu_env *env,
926                                     struct thandle *th, struct tx_arg *arg)
927 {
928         return out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
929 }
930
931 static int __out_tx_index_insert(const struct lu_env *env,
932                                  struct dt_object *dt_obj,
933                                  char *name, struct lu_fid *fid,
934                                  struct thandle_exec_args *th,
935                                  struct update_reply *reply,
936                                  int index, char *file, int line)
937 {
938         struct tx_arg *arg;
939
940         LASSERT(th->ta_handle != NULL);
941
942         if (lu_object_exists(&dt_obj->do_lu)) {
943                 if (dt_try_as_dir(env, dt_obj) == 0) {
944                         th->ta_err = -ENOTDIR;
945                         return th->ta_err;
946                 }
947                 th->ta_err = dt_declare_insert(env, dt_obj,
948                                                (struct dt_rec *)fid,
949                                                (struct dt_key *)name,
950                                                th->ta_handle);
951         }
952
953         if (th->ta_err != 0)
954                 return th->ta_err;
955
956         arg = tx_add_exec(th, out_tx_index_insert_exec,
957                           out_tx_index_insert_undo, file,
958                           line);
959         LASSERT(arg);
960         lu_object_get(&dt_obj->do_lu);
961         arg->object = dt_obj;
962         arg->reply = reply;
963         arg->index = index;
964         arg->u.insert.rec = (struct dt_rec *)fid;
965         arg->u.insert.key = (struct dt_key *)name;
966
967         return 0;
968 }
969
970 static int out_index_insert(struct out_thread_info *info)
971 {
972         struct update     *update = info->mti_u.update.mti_update;
973         struct dt_object  *obj = info->mti_u.update.mti_dt_object;
974         struct lu_fid     *fid;
975         char              *name;
976         int               rc = 0;
977         int               size;
978         ENTRY;
979
980         name = (char *)update_param_buf(update, 0, NULL);
981         if (name == NULL) {
982                 CERROR("%s: empty name for index insert: rc = %d\n",
983                        mdt_obd_name(info->mti_mdt), -EPROTO);
984                 RETURN(err_serious(-EPROTO));
985         }
986
987         fid = (struct lu_fid *)update_param_buf(update, 1, &size);
988         if (fid == NULL || size != sizeof(*fid)) {
989                 CERROR("%s: invalid fid: rc = %d\n",
990                        mdt_obd_name(info->mti_mdt), -EPROTO);
991                        RETURN(err_serious(-EPROTO));
992         }
993
994         fid_le_to_cpu(fid, fid);
995         if (!fid_is_sane(fid)) {
996                 CERROR("%s: invalid FID "DFID": rc = %d\n",
997                        mdt_obd_name(info->mti_mdt), PFID(fid),
998                        -EPROTO);
999                 RETURN(err_serious(-EPROTO));
1000         }
1001
1002         rc = out_tx_index_insert(info->mti_env, obj, name, fid,
1003                                  &info->mti_handle,
1004                                  info->mti_u.update.mti_update_reply,
1005                                  info->mti_u.update.mti_update_reply_index);
1006         RETURN(rc);
1007 }
1008
1009 static int out_tx_index_delete_exec(const struct lu_env *env,
1010                                     struct thandle *th,
1011                                     struct tx_arg *arg)
1012 {
1013         int rc;
1014
1015         rc = out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
1016
1017         CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
1018                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
1019
1020         update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
1021
1022         return rc;
1023 }
1024
1025 static int out_tx_index_delete_undo(const struct lu_env *env,
1026                                     struct thandle *th,
1027                                     struct tx_arg *arg)
1028 {
1029         CERROR("%s: Oops, can not rollback index_delete yet: rc = %d\n",
1030                dt_obd_name(th->th_dev), -ENOTSUPP);
1031         return -ENOTSUPP;
1032 }
1033
1034 static int __out_tx_index_delete(const struct lu_env *env,
1035                                  struct dt_object *dt_obj, char *name,
1036                                  struct thandle_exec_args *th,
1037                                  struct update_reply *reply,
1038                                  int index, char *file, int line)
1039 {
1040         struct tx_arg *arg;
1041
1042         if (dt_try_as_dir(env, dt_obj) == 0) {
1043                 th->ta_err = -ENOTDIR;
1044                 return th->ta_err;
1045         }
1046
1047         LASSERT(th->ta_handle != NULL);
1048         th->ta_err = dt_declare_delete(env, dt_obj,
1049                                        (struct dt_key *)name,
1050                                        th->ta_handle);
1051         if (th->ta_err != 0)
1052                 return th->ta_err;
1053
1054         arg = tx_add_exec(th, out_tx_index_delete_exec,
1055                           out_tx_index_delete_undo, file,
1056                           line);
1057         LASSERT(arg);
1058         lu_object_get(&dt_obj->do_lu);
1059         arg->object = dt_obj;
1060         arg->reply = reply;
1061         arg->index = index;
1062         arg->u.insert.key = (struct dt_key *)name;
1063         return 0;
1064 }
1065
1066 static int out_index_delete(struct out_thread_info *info)
1067 {
1068         struct update           *update = info->mti_u.update.mti_update;
1069         struct dt_object        *obj = info->mti_u.update.mti_dt_object;
1070         char                    *name;
1071         int                     rc = 0;
1072
1073         if (!lu_object_exists(&obj->do_lu))
1074                 RETURN(-ENOENT);
1075         name = (char *)update_param_buf(update, 0, NULL);
1076         if (name == NULL) {
1077                 CERROR("%s: empty name for index delete: rc = %d\n",
1078                        mdt_obd_name(info->mti_mdt), -EPROTO);
1079                 RETURN(err_serious(-EPROTO));
1080         }
1081
1082         rc = out_tx_index_delete(info->mti_env, obj, name, &info->mti_handle,
1083                                  info->mti_u.update.mti_update_reply,
1084                                  info->mti_u.update.mti_update_reply_index);
1085         RETURN(rc);
1086 }
1087
1088 static int out_tx_destroy_exec(const struct lu_env *env, struct thandle *th,
1089                                struct tx_arg *arg)
1090 {
1091         struct dt_object *dt_obj = arg->object;
1092         int rc;
1093
1094         rc = out_obj_destroy(env, dt_obj, th);
1095
1096         CDEBUG(D_INFO, "%s: insert destroy reply %p index %d: rc = %d\n",
1097                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
1098
1099         update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
1100
1101         RETURN(rc);
1102 }
1103
1104 static int out_tx_destroy_undo(const struct lu_env *env, struct thandle *th,
1105                                struct tx_arg *arg)
1106 {
1107         CERROR("%s: not support destroy undo yet!: rc = %d\n",
1108                dt_obd_name(th->th_dev), -ENOTSUPP);
1109         return -ENOTSUPP;
1110 }
1111
1112 static int __out_tx_destroy(const struct lu_env *env, struct dt_object *dt_obj,
1113                              struct thandle_exec_args *th,
1114                              struct update_reply *reply,
1115                              int index, char *file, int line)
1116 {
1117         struct tx_arg *arg;
1118
1119         LASSERT(th->ta_handle != NULL);
1120         th->ta_err = dt_declare_destroy(env, dt_obj, th->ta_handle);
1121         if (th->ta_err)
1122                 return th->ta_err;
1123
1124         arg = tx_add_exec(th, out_tx_destroy_exec, out_tx_destroy_undo,
1125                           file, line);
1126         LASSERT(arg);
1127         lu_object_get(&dt_obj->do_lu);
1128         arg->object = dt_obj;
1129         arg->reply = reply;
1130         arg->index = index;
1131         return 0;
1132 }
1133
1134 static int out_destroy(struct out_thread_info *info)
1135 {
1136         struct update           *update = info->mti_u.update.mti_update;
1137         struct dt_object        *obj = info->mti_u.update.mti_dt_object;
1138         struct lu_fid           *fid;
1139         int                     rc;
1140         ENTRY;
1141
1142         fid = &update->u_fid;
1143         fid_le_to_cpu(fid, fid);
1144         if (!fid_is_sane(fid)) {
1145                 CERROR("%s: invalid FID "DFID": rc = %d\n",
1146                        mdt_obd_name(info->mti_mdt), PFID(fid), -EPROTO);
1147                 RETURN(err_serious(-EPROTO));
1148         }
1149
1150         if (!lu_object_exists(&obj->do_lu))
1151                 RETURN(-ENOENT);
1152
1153         rc = out_tx_destroy(info->mti_env, obj, &info->mti_handle,
1154                             info->mti_u.update.mti_update_reply,
1155                             info->mti_u.update.mti_update_reply_index);
1156
1157         RETURN(rc);
1158 }
1159
1160 #define DEF_OUT_HNDL(opc, name, fail_id, flags, fn)     \
1161 [opc - OBJ_CREATE] = {                                  \
1162         .mh_name    = name,                             \
1163         .mh_fail_id = fail_id,                          \
1164         .mh_opc     = opc,                              \
1165         .mh_flags   = flags,                            \
1166         .mh_act     = fn,                               \
1167         .mh_fmt     = NULL                              \
1168 }
1169
1170 #define out_handler mdt_handler
1171 static struct out_handler out_update_ops[] = {
1172         DEF_OUT_HNDL(OBJ_CREATE, "obj_create", 0, MUTABOR | HABEO_REFERO,
1173                      out_create),
1174         DEF_OUT_HNDL(OBJ_DESTROY, "obj_create", 0, MUTABOR | HABEO_REFERO,
1175                      out_destroy),
1176         DEF_OUT_HNDL(OBJ_REF_ADD, "obj_ref_add", 0, MUTABOR | HABEO_REFERO,
1177                      out_ref_add),
1178         DEF_OUT_HNDL(OBJ_REF_DEL, "obj_ref_del", 0, MUTABOR | HABEO_REFERO,
1179                      out_ref_del),
1180         DEF_OUT_HNDL(OBJ_ATTR_SET, "obj_attr_set", 0,  MUTABOR | HABEO_REFERO,
1181                      out_attr_set),
1182         DEF_OUT_HNDL(OBJ_ATTR_GET, "obj_attr_get", 0,  HABEO_REFERO,
1183                      out_attr_get),
1184         DEF_OUT_HNDL(OBJ_XATTR_SET, "obj_xattr_set", 0, MUTABOR | HABEO_REFERO,
1185                      out_xattr_set),
1186         DEF_OUT_HNDL(OBJ_XATTR_GET, "obj_xattr_get", 0, HABEO_REFERO,
1187                      out_xattr_get),
1188         DEF_OUT_HNDL(OBJ_INDEX_LOOKUP, "obj_index_lookup", 0, HABEO_REFERO,
1189                      out_index_lookup),
1190         DEF_OUT_HNDL(OBJ_INDEX_INSERT, "obj_index_insert", 0,
1191                      MUTABOR | HABEO_REFERO, out_index_insert),
1192         DEF_OUT_HNDL(OBJ_INDEX_DELETE, "obj_index_delete", 0,
1193                      MUTABOR | HABEO_REFERO, out_index_delete),
1194 };
1195
1196 #define out_opc_slice mdt_opc_slice
1197 static struct out_opc_slice out_handlers[] = {
1198         {
1199                 .mos_opc_start = OBJ_CREATE,
1200                 .mos_opc_end   = OBJ_LAST,
1201                 .mos_hs = out_update_ops
1202         },
1203 };
1204
1205 /**
1206  * Object updates between Targets. Because all the updates has been
1207  * dis-assemblied into object updates in master MDD layer, so out
1208  * will skip MDD layer, and call OSD API directly to execute these
1209  * updates.
1210  *
1211  * In phase I, all of the updates in the request need to be executed
1212  * in one transaction, and the transaction has to be synchronously.
1213  *
1214  * Please refer to lustre/include/lustre/lustre_idl.h for req/reply
1215  * format.
1216  */
1217 int out_handle(struct out_thread_info *info)
1218 {
1219         struct thandle_exec_args        *th = &info->mti_handle;
1220         struct req_capsule              *pill = info->mti_pill;
1221         struct mdt_device               *mdt = info->mti_mdt;
1222         struct dt_device                *dt = mdt->mdt_bottom;
1223         const struct lu_env             *env = info->mti_env;
1224         struct update_buf               *ubuf;
1225         struct update                   *update;
1226         struct update_reply             *update_reply;
1227         int                             bufsize;
1228         int                             count;
1229         int                             old_batchid = -1;
1230         unsigned                        off;
1231         int                             i;
1232         int                             rc = 0;
1233         int                             rc1 = 0;
1234         ENTRY;
1235
1236         req_capsule_set(pill, &RQF_UPDATE_OBJ);
1237         bufsize = req_capsule_get_size(pill, &RMF_UPDATE, RCL_CLIENT);
1238         if (bufsize != UPDATE_BUFFER_SIZE) {
1239                 CERROR("%s: invalid bufsize %d: rc = %d\n",
1240                        mdt_obd_name(mdt), bufsize, -EPROTO);
1241                 RETURN(err_serious(-EPROTO));
1242         }
1243
1244         ubuf = req_capsule_client_get(pill, &RMF_UPDATE);
1245         if (ubuf == NULL) {
1246                 CERROR("%s: No buf!: rc = %d\n", mdt_obd_name(mdt),
1247                        -EPROTO);
1248                 RETURN(err_serious(-EPROTO));
1249         }
1250
1251         if (le32_to_cpu(ubuf->ub_magic) != UPDATE_BUFFER_MAGIC) {
1252                 CERROR("%s: invalid magic %x expect %x: rc = %d\n",
1253                        mdt_obd_name(mdt), le32_to_cpu(ubuf->ub_magic),
1254                        UPDATE_BUFFER_MAGIC, -EPROTO);
1255                 RETURN(err_serious(-EPROTO));
1256         }
1257
1258         count = le32_to_cpu(ubuf->ub_count);
1259         if (count <= 0) {
1260                 CERROR("%s: No update!: rc = %d\n",
1261                        mdt_obd_name(mdt), -EPROTO);
1262                 RETURN(err_serious(-EPROTO));
1263         }
1264
1265         req_capsule_set_size(pill, &RMF_UPDATE_REPLY, RCL_SERVER,
1266                              UPDATE_BUFFER_SIZE);
1267         rc = req_capsule_server_pack(pill);
1268         if (rc != 0) {
1269                 CERROR("%s: Can't pack response: rc = %d\n",
1270                        mdt_obd_name(mdt), rc);
1271                 RETURN(rc);
1272         }
1273
1274         /* Prepare the update reply buffer */
1275         update_reply = req_capsule_server_get(pill, &RMF_UPDATE_REPLY);
1276         update_init_reply_buf(update_reply, count);
1277         info->mti_u.update.mti_update_reply = update_reply;
1278
1279         rc = out_tx_start(env, dt, th);
1280         if (rc != 0)
1281                 RETURN(rc);
1282
1283         /* Walk through updates in the request to execute them synchronously */
1284         off = cfs_size_round(offsetof(struct update_buf, ub_bufs[0]));
1285         for (i = 0; i < count; i++) {
1286                 struct out_handler *h;
1287                 struct dt_object   *dt_obj;
1288
1289                 update = (struct update *)((char *)ubuf + off);
1290                 if (old_batchid == -1) {
1291                         old_batchid = update->u_batchid;
1292                 } else if (old_batchid != update->u_batchid) {
1293                         /* Stop the current update transaction,
1294                          * create a new one */
1295                         rc = out_tx_end(env, th);
1296                         if (rc != 0)
1297                                 RETURN(rc);
1298
1299                         rc = out_tx_start(env, dt, th);
1300                         if (rc != 0)
1301                                 RETURN(rc);
1302                         old_batchid = update->u_batchid;
1303                 }
1304
1305                 fid_le_to_cpu(&update->u_fid, &update->u_fid);
1306                 if (!fid_is_sane(&update->u_fid)) {
1307                         CERROR("%s: invalid FID "DFID": rc = %d\n",
1308                                mdt_obd_name(mdt), PFID(&update->u_fid),
1309                                -EPROTO);
1310                         GOTO(out, rc = err_serious(-EPROTO));
1311                 }
1312
1313                 dt_obj = dt_locate(env, dt, &update->u_fid);
1314                 if (IS_ERR(dt_obj))
1315                         GOTO(out, rc = PTR_ERR(dt_obj));
1316
1317                 info->mti_u.update.mti_dt_object = dt_obj;
1318                 info->mti_u.update.mti_update = update;
1319                 info->mti_u.update.mti_update_reply_index = i;
1320
1321                 h = mdt_handler_find(update->u_type, out_handlers);
1322                 if (likely(h != NULL)) {
1323                         /* For real modification RPC, check if the update
1324                          * has been executed */
1325                         if (h->mh_flags & MUTABOR) {
1326                                 struct ptlrpc_request *req = mdt_info_req(info);
1327
1328                                 if (out_check_resent(env, dt, dt_obj, req,
1329                                                      out_reconstruct,
1330                                                      update_reply, i))
1331                                         GOTO(next, rc);
1332                         }
1333
1334                         rc = h->mh_act(info);
1335                 } else {
1336                         CERROR("%s: The unsupported opc: 0x%x\n",
1337                                mdt_obd_name(mdt), update->u_type);
1338                         lu_object_put(env, &dt_obj->do_lu);
1339                         GOTO(out, rc = -ENOTSUPP);
1340                 }
1341 next:
1342                 lu_object_put(env, &dt_obj->do_lu);
1343                 if (rc < 0)
1344                         GOTO(out, rc);
1345                 off += cfs_size_round(update_size(update));
1346         }
1347 out:
1348         rc1 = out_tx_end(env, th);
1349         rc = rc == 0 ? rc1 : rc;
1350         RETURN(rc);
1351 }