Whamcloud - gitweb
LU-4353 strncmp: Replace incorrect strncmp()s with strcmp()
[fs/lustre-release.git] / lustre / target / out_handler.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  *
25  * lustre/mdt/out_handler.c
26  *
27  * Object update handler between targets.
28  *
29  * Author: di.wang <di.wang@intel.com>
30  */
31
32 #define DEBUG_SUBSYSTEM S_CLASS
33
34 #include <obd_class.h>
35 #include <md_object.h>
36 #include "tgt_internal.h"
37 #include <lustre_update.h>
38
39 struct tx_arg *tx_add_exec(struct thandle_exec_args *ta, tx_exec_func_t func,
40                            tx_exec_func_t undo, char *file, int line)
41 {
42         int i;
43
44         LASSERT(ta);
45         LASSERT(func);
46
47         i = ta->ta_argno;
48         LASSERT(i < UPDATE_MAX_OPS);
49
50         ta->ta_argno++;
51
52         ta->ta_args[i].exec_fn = func;
53         ta->ta_args[i].undo_fn = undo;
54         ta->ta_args[i].file    = file;
55         ta->ta_args[i].line    = line;
56
57         return &ta->ta_args[i];
58 }
59
60 static int out_tx_start(const struct lu_env *env, struct dt_device *dt,
61                         struct thandle_exec_args *ta)
62 {
63         memset(ta, 0, sizeof(*ta));
64         ta->ta_handle = dt_trans_create(env, dt);
65         if (IS_ERR(ta->ta_handle)) {
66                 CERROR("%s: start handle error: rc = %ld\n",
67                        dt_obd_name(dt), PTR_ERR(ta->ta_handle));
68                 return PTR_ERR(ta->ta_handle);
69         }
70         ta->ta_dev = dt;
71         /*For phase I, sync for cross-ref operation*/
72         ta->ta_handle->th_sync = 1;
73         return 0;
74 }
75
76 static int out_trans_start(const struct lu_env *env,
77                            struct thandle_exec_args *ta)
78 {
79         /* Always do sync commit for Phase I */
80         LASSERT(ta->ta_handle->th_sync != 0);
81         return dt_trans_start(env, ta->ta_dev, ta->ta_handle);
82 }
83
84 static int out_trans_stop(const struct lu_env *env,
85                           struct thandle_exec_args *ta, int err)
86 {
87         int i;
88         int rc;
89
90         ta->ta_handle->th_result = err;
91         LASSERT(ta->ta_handle->th_sync != 0);
92         rc = dt_trans_stop(env, ta->ta_dev, ta->ta_handle);
93         for (i = 0; i < ta->ta_argno; i++) {
94                 if (ta->ta_args[i].object != NULL) {
95                         lu_object_put(env, &ta->ta_args[i].object->do_lu);
96                         ta->ta_args[i].object = NULL;
97                 }
98         }
99
100         return rc;
101 }
102
103 int out_tx_end(const struct lu_env *env, struct thandle_exec_args *ta)
104 {
105         struct tgt_session_info *tsi = tgt_ses_info(env);
106         int i = 0, rc;
107
108         LASSERT(ta->ta_dev);
109         LASSERT(ta->ta_handle);
110
111         if (ta->ta_err != 0 || ta->ta_argno == 0)
112                 GOTO(stop, rc = ta->ta_err);
113
114         rc = out_trans_start(env, ta);
115         if (unlikely(rc))
116                 GOTO(stop, rc);
117
118         for (i = 0; i < ta->ta_argno; i++) {
119                 rc = ta->ta_args[i].exec_fn(env, ta->ta_handle,
120                                             &ta->ta_args[i]);
121                 if (unlikely(rc)) {
122                         CDEBUG(D_INFO, "error during execution of #%u from"
123                                " %s:%d: rc = %d\n", i, ta->ta_args[i].file,
124                                ta->ta_args[i].line, rc);
125                         while (--i >= 0) {
126                                 LASSERTF(ta->ta_args[i].undo_fn != NULL,
127                                     "can't undo changes, hope for failover!\n");
128                                 ta->ta_args[i].undo_fn(env, ta->ta_handle,
129                                                        &ta->ta_args[i]);
130                         }
131                         break;
132                 }
133         }
134
135         /* Only fail for real update */
136         tsi->tsi_reply_fail_id = OBD_FAIL_UPDATE_OBJ_NET_REP;
137 stop:
138         CDEBUG(D_INFO, "%s: executed %u/%u: rc = %d\n",
139                dt_obd_name(ta->ta_dev), i, ta->ta_argno, rc);
140         out_trans_stop(env, ta, rc);
141         ta->ta_handle = NULL;
142         ta->ta_argno = 0;
143         ta->ta_err = 0;
144
145         RETURN(rc);
146 }
147
148 static void out_reconstruct(const struct lu_env *env, struct dt_device *dt,
149                             struct dt_object *obj, struct update_reply *reply,
150                             int index)
151 {
152         CDEBUG(D_INFO, "%s: fork reply reply %p index %d: rc = %d\n",
153                dt_obd_name(dt), reply, index, 0);
154
155         update_insert_reply(reply, NULL, 0, index, 0);
156         return;
157 }
158
159 typedef void (*out_reconstruct_t)(const struct lu_env *env,
160                                   struct dt_device *dt,
161                                   struct dt_object *obj,
162                                   struct update_reply *reply,
163                                   int index);
164
165 static inline int out_check_resent(const struct lu_env *env,
166                                    struct dt_device *dt,
167                                    struct dt_object *obj,
168                                    struct ptlrpc_request *req,
169                                    out_reconstruct_t reconstruct,
170                                    struct update_reply *reply,
171                                    int index)
172 {
173         if (likely(!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)))
174                 return 0;
175
176         if (req_xid_is_last(req)) {
177                 reconstruct(env, dt, obj, reply, index);
178                 return 1;
179         }
180         DEBUG_REQ(D_HA, req, "no reply for RESENT req (have "LPD64")",
181                  req->rq_export->exp_target_data.ted_lcd->lcd_last_xid);
182         return 0;
183 }
184
185 static int out_obj_destroy(const struct lu_env *env, struct dt_object *dt_obj,
186                            struct thandle *th)
187 {
188         int rc;
189
190         CDEBUG(D_INFO, "%s: destroy "DFID"\n", dt_obd_name(th->th_dev),
191                PFID(lu_object_fid(&dt_obj->do_lu)));
192
193         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
194         rc = dt_destroy(env, dt_obj, th);
195         dt_write_unlock(env, dt_obj);
196
197         return rc;
198 }
199
200 /**
201  * All of the xxx_undo will be used once execution failed,
202  * But because all of the required resource has been reserved in
203  * declare phase, i.e. if declare succeed, it should make sure
204  * the following executing phase succeed in anyway, so these undo
205  * should be useless for most of the time in Phase I
206  */
207 int out_tx_create_undo(const struct lu_env *env, struct thandle *th,
208                        struct tx_arg *arg)
209 {
210         int rc;
211
212         rc = out_obj_destroy(env, arg->object, th);
213         if (rc != 0)
214                 CERROR("%s: undo failure, we are doomed!: rc = %d\n",
215                        dt_obd_name(th->th_dev), rc);
216         return rc;
217 }
218
219 int out_tx_create_exec(const struct lu_env *env, struct thandle *th,
220                        struct tx_arg *arg)
221 {
222         struct dt_object        *dt_obj = arg->object;
223         int                      rc;
224
225         CDEBUG(D_OTHER, "%s: create "DFID": dof %u, mode %o\n",
226                dt_obd_name(th->th_dev),
227                PFID(lu_object_fid(&arg->object->do_lu)),
228                arg->u.create.dof.dof_type,
229                arg->u.create.attr.la_mode & S_IFMT);
230
231         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
232         rc = dt_create(env, dt_obj, &arg->u.create.attr,
233                        &arg->u.create.hint, &arg->u.create.dof, th);
234
235         dt_write_unlock(env, dt_obj);
236
237         CDEBUG(D_INFO, "%s: insert create reply %p index %d: rc = %d\n",
238                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
239
240         update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
241
242         return rc;
243 }
244
245 static int __out_tx_create(const struct lu_env *env, struct dt_object *obj,
246                            struct lu_attr *attr, struct lu_fid *parent_fid,
247                            struct dt_object_format *dof,
248                            struct thandle_exec_args *ta,
249                            struct update_reply *reply,
250                            int index, char *file, int line)
251 {
252         struct tx_arg *arg;
253
254         LASSERT(ta->ta_handle != NULL);
255         ta->ta_err = dt_declare_create(env, obj, attr, NULL, dof,
256                                        ta->ta_handle);
257         if (ta->ta_err != 0)
258                 return ta->ta_err;
259
260         arg = tx_add_exec(ta, out_tx_create_exec, out_tx_create_undo, file,
261                           line);
262         LASSERT(arg);
263
264         /* release the object in out_trans_stop */
265         lu_object_get(&obj->do_lu);
266         arg->object = obj;
267         arg->u.create.attr = *attr;
268         if (parent_fid)
269                 arg->u.create.fid = *parent_fid;
270         memset(&arg->u.create.hint, 0, sizeof(arg->u.create.hint));
271         arg->u.create.dof  = *dof;
272         arg->reply = reply;
273         arg->index = index;
274
275         return 0;
276 }
277
278 static int out_create(struct tgt_session_info *tsi)
279 {
280         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
281         struct update           *update = tti->tti_u.update.tti_update;
282         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
283         struct dt_object_format *dof = &tti->tti_u.update.tti_update_dof;
284         struct obdo             *lobdo = &tti->tti_u.update.tti_obdo;
285         struct lu_attr          *attr = &tti->tti_attr;
286         struct lu_fid           *fid = NULL;
287         struct obdo             *wobdo;
288         int                     size;
289         int                     rc;
290
291         ENTRY;
292
293         wobdo = update_param_buf(update, 0, &size);
294         if (wobdo == NULL || size != sizeof(*wobdo)) {
295                 CERROR("%s: obdo is NULL, invalid RPC: rc = %d\n",
296                        tgt_name(tsi->tsi_tgt), -EPROTO);
297                 RETURN(err_serious(-EPROTO));
298         }
299
300         obdo_le_to_cpu(wobdo, wobdo);
301         lustre_get_wire_obdo(NULL, lobdo, wobdo);
302         la_from_obdo(attr, lobdo, lobdo->o_valid);
303
304         dof->dof_type = dt_mode_to_dft(attr->la_mode);
305         if (S_ISDIR(attr->la_mode)) {
306                 int size;
307
308                 fid = update_param_buf(update, 1, &size);
309                 if (fid == NULL || size != sizeof(*fid)) {
310                         CERROR("%s: invalid fid: rc = %d\n",
311                                tgt_name(tsi->tsi_tgt), -EPROTO);
312                         RETURN(err_serious(-EPROTO));
313                 }
314                 fid_le_to_cpu(fid, fid);
315                 if (!fid_is_sane(fid)) {
316                         CERROR("%s: invalid fid "DFID": rc = %d\n",
317                                tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
318                         RETURN(err_serious(-EPROTO));
319                 }
320         }
321
322         if (lu_object_exists(&obj->do_lu))
323                 RETURN(-EEXIST);
324
325         rc = out_tx_create(tsi->tsi_env, obj, attr, fid, dof,
326                            &tti->tti_tea,
327                            tti->tti_u.update.tti_update_reply,
328                            tti->tti_u.update.tti_update_reply_index);
329
330         RETURN(rc);
331 }
332
333 static int out_tx_attr_set_undo(const struct lu_env *env,
334                                 struct thandle *th, struct tx_arg *arg)
335 {
336         CERROR("%s: attr set undo "DFID" unimplemented yet!: rc = %d\n",
337                dt_obd_name(th->th_dev),
338                PFID(lu_object_fid(&arg->object->do_lu)), -ENOTSUPP);
339
340         return -ENOTSUPP;
341 }
342
343 static int out_tx_attr_set_exec(const struct lu_env *env, struct thandle *th,
344                                 struct tx_arg *arg)
345 {
346         struct dt_object        *dt_obj = arg->object;
347         int                     rc;
348
349         CDEBUG(D_OTHER, "%s: attr set "DFID"\n", dt_obd_name(th->th_dev),
350                PFID(lu_object_fid(&dt_obj->do_lu)));
351
352         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
353         rc = dt_attr_set(env, dt_obj, &arg->u.attr_set.attr, th, NULL);
354         dt_write_unlock(env, dt_obj);
355
356         CDEBUG(D_INFO, "%s: insert attr_set reply %p index %d: rc = %d\n",
357                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
358
359         update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
360
361         return rc;
362 }
363
364 static int __out_tx_attr_set(const struct lu_env *env,
365                              struct dt_object *dt_obj,
366                              const struct lu_attr *attr,
367                              struct thandle_exec_args *th,
368                              struct update_reply *reply, int index,
369                              char *file, int line)
370 {
371         struct tx_arg           *arg;
372
373         LASSERT(th->ta_handle != NULL);
374         th->ta_err = dt_declare_attr_set(env, dt_obj, attr, th->ta_handle);
375         if (th->ta_err != 0)
376                 return th->ta_err;
377
378         arg = tx_add_exec(th, out_tx_attr_set_exec, out_tx_attr_set_undo,
379                           file, line);
380         LASSERT(arg);
381         lu_object_get(&dt_obj->do_lu);
382         arg->object = dt_obj;
383         arg->u.attr_set.attr = *attr;
384         arg->reply = reply;
385         arg->index = index;
386         return 0;
387 }
388
389 static int out_attr_set(struct tgt_session_info *tsi)
390 {
391         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
392         struct update           *update = tti->tti_u.update.tti_update;
393         struct lu_attr          *attr = &tti->tti_attr;
394         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
395         struct obdo             *lobdo = &tti->tti_u.update.tti_obdo;
396         struct obdo             *wobdo;
397         int                      size;
398         int                      rc;
399
400         ENTRY;
401
402         wobdo = update_param_buf(update, 0, &size);
403         if (wobdo == NULL || size != sizeof(*wobdo)) {
404                 CERROR("%s: empty obdo in the update: rc = %d\n",
405                        tgt_name(tsi->tsi_tgt), -EPROTO);
406                 RETURN(err_serious(-EPROTO));
407         }
408
409         attr->la_valid = 0;
410         attr->la_valid = 0;
411         obdo_le_to_cpu(wobdo, wobdo);
412         lustre_get_wire_obdo(NULL, lobdo, wobdo);
413         la_from_obdo(attr, lobdo, lobdo->o_valid);
414
415         rc = out_tx_attr_set(tsi->tsi_env, obj, attr, &tti->tti_tea,
416                              tti->tti_u.update.tti_update_reply,
417                              tti->tti_u.update.tti_update_reply_index);
418
419         RETURN(rc);
420 }
421
422 static int out_attr_get(struct tgt_session_info *tsi)
423 {
424         const struct lu_env     *env = tsi->tsi_env;
425         struct tgt_thread_info  *tti = tgt_th_info(env);
426         struct obdo             *obdo = &tti->tti_u.update.tti_obdo;
427         struct lu_attr          *la = &tti->tti_attr;
428         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
429         int                     rc;
430
431         ENTRY;
432
433         if (!lu_object_exists(&obj->do_lu))
434                 RETURN(-ENOENT);
435
436         dt_read_lock(env, obj, MOR_TGT_CHILD);
437         rc = dt_attr_get(env, obj, la, NULL);
438         if (rc)
439                 GOTO(out_unlock, rc);
440         /*
441          * If it is a directory, we will also check whether the
442          * directory is empty.
443          * la_flags = 0 : Empty.
444          *          = 1 : Not empty.
445          */
446         la->la_flags = 0;
447         if (S_ISDIR(la->la_mode)) {
448                 struct dt_it            *it;
449                 const struct dt_it_ops  *iops;
450
451                 if (!dt_try_as_dir(env, obj))
452                         GOTO(out_unlock, rc = -ENOTDIR);
453
454                 iops = &obj->do_index_ops->dio_it;
455                 it = iops->init(env, obj, LUDA_64BITHASH, BYPASS_CAPA);
456                 if (!IS_ERR(it)) {
457                         int  result;
458                         result = iops->get(env, it, (const void *)"");
459                         if (result > 0) {
460                                 int i;
461                                 for (result = 0, i = 0; result == 0 && i < 3;
462                                      ++i)
463                                         result = iops->next(env, it);
464                                 if (result == 0)
465                                         la->la_flags = 1;
466                         } else if (result == 0)
467                                 /*
468                                  * Huh? Index contains no zero key?
469                                  */
470                                 rc = -EIO;
471
472                         iops->put(env, it);
473                         iops->fini(env, it);
474                 }
475         }
476
477         obdo->o_valid = 0;
478         obdo_from_la(obdo, la, la->la_valid);
479         obdo_cpu_to_le(obdo, obdo);
480         lustre_set_wire_obdo(NULL, obdo, obdo);
481
482 out_unlock:
483         dt_read_unlock(env, obj);
484
485         CDEBUG(D_INFO, "%s: insert attr get reply %p index %d: rc = %d\n",
486                tgt_name(tsi->tsi_tgt), tti->tti_u.update.tti_update_reply,
487                0, rc);
488
489         update_insert_reply(tti->tti_u.update.tti_update_reply, obdo,
490                             sizeof(*obdo), 0, rc);
491         RETURN(rc);
492 }
493
494 static int out_xattr_get(struct tgt_session_info *tsi)
495 {
496         const struct lu_env     *env = tsi->tsi_env;
497         struct tgt_thread_info  *tti = tgt_th_info(env);
498         struct update           *update = tti->tti_u.update.tti_update;
499         struct lu_buf           *lbuf = &tti->tti_buf;
500         struct update_reply     *reply = tti->tti_u.update.tti_update_reply;
501         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
502         char                    *name;
503         void                    *ptr;
504         int                      rc;
505
506         ENTRY;
507
508         name = (char *)update_param_buf(update, 0, NULL);
509         if (name == NULL) {
510                 CERROR("%s: empty name for xattr get: rc = %d\n",
511                        tgt_name(tsi->tsi_tgt), -EPROTO);
512                 RETURN(err_serious(-EPROTO));
513         }
514
515         ptr = update_get_buf_internal(reply, 0, NULL);
516         LASSERT(ptr != NULL);
517
518         /* The first 4 bytes(int) are used to store the result */
519         lbuf->lb_buf = (char *)ptr + sizeof(int);
520         lbuf->lb_len = UPDATE_BUFFER_SIZE - sizeof(struct update_reply);
521         dt_read_lock(env, obj, MOR_TGT_CHILD);
522         rc = dt_xattr_get(env, obj, lbuf, name, NULL);
523         dt_read_unlock(env, obj);
524         if (rc < 0) {
525                 lbuf->lb_len = 0;
526                 GOTO(out, rc);
527         }
528         if (rc == 0) {
529                 lbuf->lb_len = 0;
530                 GOTO(out, rc = -ENOENT);
531         }
532         lbuf->lb_len = rc;
533         rc = 0;
534         CDEBUG(D_INFO, "%s: "DFID" get xattr %s len %d\n",
535                tgt_name(tsi->tsi_tgt), PFID(lu_object_fid(&obj->do_lu)),
536                name, (int)lbuf->lb_len);
537 out:
538         *(int *)ptr = rc;
539         reply->ur_lens[0] = lbuf->lb_len + sizeof(int);
540         RETURN(rc);
541 }
542
543 static int out_index_lookup(struct tgt_session_info *tsi)
544 {
545         const struct lu_env     *env = tsi->tsi_env;
546         struct tgt_thread_info  *tti = tgt_th_info(env);
547         struct update           *update = tti->tti_u.update.tti_update;
548         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
549         char                    *name;
550         int                      rc;
551
552         ENTRY;
553
554         if (!lu_object_exists(&obj->do_lu))
555                 RETURN(-ENOENT);
556
557         name = (char *)update_param_buf(update, 0, NULL);
558         if (name == NULL) {
559                 CERROR("%s: empty name for lookup: rc = %d\n",
560                        tgt_name(tsi->tsi_tgt), -EPROTO);
561                 RETURN(err_serious(-EPROTO));
562         }
563
564         dt_read_lock(env, obj, MOR_TGT_CHILD);
565         if (!dt_try_as_dir(env, obj))
566                 GOTO(out_unlock, rc = -ENOTDIR);
567
568         rc = dt_lookup(env, obj, (struct dt_rec *)&tti->tti_fid1,
569                 (struct dt_key *)name, NULL);
570
571         if (rc < 0)
572                 GOTO(out_unlock, rc);
573
574         if (rc == 0)
575                 rc += 1;
576
577         CDEBUG(D_INFO, "lookup "DFID" %s get "DFID" rc %d\n",
578                PFID(lu_object_fid(&obj->do_lu)), name,
579                PFID(&tti->tti_fid1), rc);
580         fid_cpu_to_le(&tti->tti_fid1, &tti->tti_fid1);
581
582 out_unlock:
583         dt_read_unlock(env, obj);
584
585         CDEBUG(D_INFO, "%s: insert lookup reply %p index %d: rc = %d\n",
586                tgt_name(tsi->tsi_tgt), tti->tti_u.update.tti_update_reply,
587                0, rc);
588
589         update_insert_reply(tti->tti_u.update.tti_update_reply,
590                             &tti->tti_fid1, sizeof(tti->tti_fid1), 0, rc);
591         RETURN(rc);
592 }
593
594 static int out_tx_xattr_set_exec(const struct lu_env *env,
595                                  struct thandle *th,
596                                  struct tx_arg *arg)
597 {
598         struct dt_object *dt_obj = arg->object;
599         int rc;
600
601         CDEBUG(D_INFO, "%s: set xattr buf %p name %s flag %d\n",
602                dt_obd_name(th->th_dev), arg->u.xattr_set.buf.lb_buf,
603                arg->u.xattr_set.name, arg->u.xattr_set.flags);
604
605         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
606         rc = dt_xattr_set(env, dt_obj, &arg->u.xattr_set.buf,
607                           arg->u.xattr_set.name, arg->u.xattr_set.flags,
608                           th, NULL);
609         dt_write_unlock(env, dt_obj);
610         /**
611          * Ignore errors if this is LINK EA
612          **/
613         if (unlikely(rc && !strcmp(arg->u.xattr_set.name, XATTR_NAME_LINK)))
614                 rc = 0;
615
616         CDEBUG(D_INFO, "%s: insert xattr set reply %p index %d: rc = %d\n",
617                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
618
619         update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
620
621         return rc;
622 }
623
624 static int __out_tx_xattr_set(const struct lu_env *env,
625                               struct dt_object *dt_obj,
626                               const struct lu_buf *buf,
627                               const char *name, int flags,
628                               struct thandle_exec_args *ta,
629                               struct update_reply *reply, int index,
630                               char *file, int line)
631 {
632         struct tx_arg           *arg;
633
634         LASSERT(ta->ta_handle != NULL);
635         ta->ta_err = dt_declare_xattr_set(env, dt_obj, buf, name,
636                                           flags, ta->ta_handle);
637         if (ta->ta_err != 0)
638                 return ta->ta_err;
639
640         arg = tx_add_exec(ta, out_tx_xattr_set_exec, NULL, file, line);
641         LASSERT(arg);
642         lu_object_get(&dt_obj->do_lu);
643         arg->object = dt_obj;
644         arg->u.xattr_set.name = name;
645         arg->u.xattr_set.flags = flags;
646         arg->u.xattr_set.buf = *buf;
647         arg->reply = reply;
648         arg->index = index;
649         arg->u.xattr_set.csum = 0;
650         return 0;
651 }
652
653 static int out_xattr_set(struct tgt_session_info *tsi)
654 {
655         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
656         struct update           *update = tti->tti_u.update.tti_update;
657         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
658         struct lu_buf           *lbuf = &tti->tti_buf;
659         char                    *name;
660         char                    *buf;
661         char                    *tmp;
662         int                      buf_len = 0;
663         int                      flag;
664         int                      rc;
665         ENTRY;
666
667         name = update_param_buf(update, 0, NULL);
668         if (name == NULL) {
669                 CERROR("%s: empty name for xattr set: rc = %d\n",
670                        tgt_name(tsi->tsi_tgt), -EPROTO);
671                 RETURN(err_serious(-EPROTO));
672         }
673
674         buf = (char *)update_param_buf(update, 1, &buf_len);
675         if (buf == NULL || buf_len == 0) {
676                 CERROR("%s: empty buf for xattr set: rc = %d\n",
677                        tgt_name(tsi->tsi_tgt), -EPROTO);
678                 RETURN(err_serious(-EPROTO));
679         }
680
681         lbuf->lb_buf = buf;
682         lbuf->lb_len = buf_len;
683
684         tmp = (char *)update_param_buf(update, 2, NULL);
685         if (tmp == NULL) {
686                 CERROR("%s: empty flag for xattr set: rc = %d\n",
687                        tgt_name(tsi->tsi_tgt), -EPROTO);
688                 RETURN(err_serious(-EPROTO));
689         }
690
691         flag = le32_to_cpu(*(int *)tmp);
692
693         rc = out_tx_xattr_set(tsi->tsi_env, obj, lbuf, name, flag,
694                               &tti->tti_tea,
695                               tti->tti_u.update.tti_update_reply,
696                               tti->tti_u.update.tti_update_reply_index);
697         RETURN(rc);
698 }
699
700 static int out_obj_ref_add(const struct lu_env *env,
701                            struct dt_object *dt_obj,
702                            struct thandle *th)
703 {
704         int rc;
705
706         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
707         rc = dt_ref_add(env, dt_obj, th);
708         dt_write_unlock(env, dt_obj);
709
710         return rc;
711 }
712
713 static int out_obj_ref_del(const struct lu_env *env,
714                            struct dt_object *dt_obj,
715                            struct thandle *th)
716 {
717         int rc;
718
719         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
720         rc = dt_ref_del(env, dt_obj, th);
721         dt_write_unlock(env, dt_obj);
722
723         return rc;
724 }
725
726 static int out_tx_ref_add_exec(const struct lu_env *env, struct thandle *th,
727                                struct tx_arg *arg)
728 {
729         struct dt_object *dt_obj = arg->object;
730         int rc;
731
732         rc = out_obj_ref_add(env, dt_obj, th);
733
734         CDEBUG(D_INFO, "%s: insert ref_add reply %p index %d: rc = %d\n",
735                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
736
737         update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
738         return rc;
739 }
740
741 static int out_tx_ref_add_undo(const struct lu_env *env, struct thandle *th,
742                                struct tx_arg *arg)
743 {
744         return out_obj_ref_del(env, arg->object, th);
745 }
746
747 static int __out_tx_ref_add(const struct lu_env *env,
748                             struct dt_object *dt_obj,
749                             struct thandle_exec_args *ta,
750                             struct update_reply *reply,
751                             int index, char *file, int line)
752 {
753         struct tx_arg   *arg;
754
755         LASSERT(ta->ta_handle != NULL);
756         ta->ta_err = dt_declare_ref_add(env, dt_obj, ta->ta_handle);
757         if (ta->ta_err != 0)
758                 return ta->ta_err;
759
760         arg = tx_add_exec(ta, out_tx_ref_add_exec, out_tx_ref_add_undo, file,
761                           line);
762         LASSERT(arg);
763         lu_object_get(&dt_obj->do_lu);
764         arg->object = dt_obj;
765         arg->reply = reply;
766         arg->index = index;
767         return 0;
768 }
769
770 /**
771  * increase ref of the object
772  **/
773 static int out_ref_add(struct tgt_session_info *tsi)
774 {
775         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
776         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
777         int                      rc;
778
779         ENTRY;
780
781         rc = out_tx_ref_add(tsi->tsi_env, obj, &tti->tti_tea,
782                             tti->tti_u.update.tti_update_reply,
783                             tti->tti_u.update.tti_update_reply_index);
784         RETURN(rc);
785 }
786
787 static int out_tx_ref_del_exec(const struct lu_env *env, struct thandle *th,
788                                struct tx_arg *arg)
789 {
790         struct dt_object        *dt_obj = arg->object;
791         int                      rc;
792
793         rc = out_obj_ref_del(env, dt_obj, th);
794
795         CDEBUG(D_INFO, "%s: insert ref_del reply %p index %d: rc = %d\n",
796                dt_obd_name(th->th_dev), arg->reply, arg->index, 0);
797
798         update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
799
800         return rc;
801 }
802
803 static int out_tx_ref_del_undo(const struct lu_env *env, struct thandle *th,
804                                struct tx_arg *arg)
805 {
806         return out_obj_ref_add(env, arg->object, th);
807 }
808
809 static int __out_tx_ref_del(const struct lu_env *env,
810                             struct dt_object *dt_obj,
811                             struct thandle_exec_args *ta,
812                             struct update_reply *reply,
813                             int index, char *file, int line)
814 {
815         struct tx_arg   *arg;
816
817         LASSERT(ta->ta_handle != NULL);
818         ta->ta_err = dt_declare_ref_del(env, dt_obj, ta->ta_handle);
819         if (ta->ta_err != 0)
820                 return ta->ta_err;
821
822         arg = tx_add_exec(ta, out_tx_ref_del_exec, out_tx_ref_del_undo, file,
823                           line);
824         LASSERT(arg);
825         lu_object_get(&dt_obj->do_lu);
826         arg->object = dt_obj;
827         arg->reply = reply;
828         arg->index = index;
829         return 0;
830 }
831
832 static int out_ref_del(struct tgt_session_info *tsi)
833 {
834         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
835         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
836         int                      rc;
837
838         ENTRY;
839
840         if (!lu_object_exists(&obj->do_lu))
841                 RETURN(-ENOENT);
842
843         rc = out_tx_ref_del(tsi->tsi_env, obj, &tti->tti_tea,
844                             tti->tti_u.update.tti_update_reply,
845                             tti->tti_u.update.tti_update_reply_index);
846         RETURN(rc);
847 }
848
849 static int out_obj_index_insert(const struct lu_env *env,
850                                 struct dt_object *dt_obj,
851                                 const struct dt_rec *rec,
852                                 const struct dt_key *key,
853                                 struct thandle *th)
854 {
855         int rc;
856
857         CDEBUG(D_INFO, "%s: index insert "DFID" name: %s fid "DFID"\n",
858                dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
859                (char *)key, PFID((struct lu_fid *)rec));
860
861         if (dt_try_as_dir(env, dt_obj) == 0)
862                 return -ENOTDIR;
863
864         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
865         rc = dt_insert(env, dt_obj, rec, key, th, NULL, 0);
866         dt_write_unlock(env, dt_obj);
867
868         return rc;
869 }
870
871 static int out_obj_index_delete(const struct lu_env *env,
872                                 struct dt_object *dt_obj,
873                                 const struct dt_key *key,
874                                 struct thandle *th)
875 {
876         int rc;
877
878         CDEBUG(D_INFO, "%s: index delete "DFID" name: %s\n",
879                dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
880                (char *)key);
881
882         if (dt_try_as_dir(env, dt_obj) == 0)
883                 return -ENOTDIR;
884
885         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
886         rc = dt_delete(env, dt_obj, key, th, NULL);
887         dt_write_unlock(env, dt_obj);
888
889         return rc;
890 }
891
892 static int out_tx_index_insert_exec(const struct lu_env *env,
893                                     struct thandle *th, struct tx_arg *arg)
894 {
895         struct dt_object *dt_obj = arg->object;
896         int rc;
897
898         rc = out_obj_index_insert(env, dt_obj, arg->u.insert.rec,
899                                   arg->u.insert.key, th);
900
901         CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
902                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
903
904         update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
905
906         return rc;
907 }
908
909 static int out_tx_index_insert_undo(const struct lu_env *env,
910                                     struct thandle *th, struct tx_arg *arg)
911 {
912         return out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
913 }
914
915 static int __out_tx_index_insert(const struct lu_env *env,
916                                  struct dt_object *dt_obj,
917                                  char *name, struct lu_fid *fid,
918                                  struct thandle_exec_args *ta,
919                                  struct update_reply *reply,
920                                  int index, char *file, int line)
921 {
922         struct tx_arg *arg;
923
924         LASSERT(ta->ta_handle != NULL);
925
926         if (lu_object_exists(&dt_obj->do_lu)) {
927                 if (dt_try_as_dir(env, dt_obj) == 0) {
928                         ta->ta_err = -ENOTDIR;
929                         return ta->ta_err;
930                 }
931                 ta->ta_err = dt_declare_insert(env, dt_obj,
932                                                (struct dt_rec *)fid,
933                                                (struct dt_key *)name,
934                                                ta->ta_handle);
935         }
936
937         if (ta->ta_err != 0)
938                 return ta->ta_err;
939
940         arg = tx_add_exec(ta, out_tx_index_insert_exec,
941                           out_tx_index_insert_undo, file,
942                           line);
943         LASSERT(arg);
944         lu_object_get(&dt_obj->do_lu);
945         arg->object = dt_obj;
946         arg->reply = reply;
947         arg->index = index;
948         arg->u.insert.rec = (struct dt_rec *)fid;
949         arg->u.insert.key = (struct dt_key *)name;
950
951         return 0;
952 }
953
954 static int out_index_insert(struct tgt_session_info *tsi)
955 {
956         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
957         struct update     *update = tti->tti_u.update.tti_update;
958         struct dt_object  *obj = tti->tti_u.update.tti_dt_object;
959         struct lu_fid     *fid;
960         char              *name;
961         int                rc = 0;
962         int                size;
963
964         ENTRY;
965
966         name = (char *)update_param_buf(update, 0, NULL);
967         if (name == NULL) {
968                 CERROR("%s: empty name for index insert: rc = %d\n",
969                        tgt_name(tsi->tsi_tgt), -EPROTO);
970                 RETURN(err_serious(-EPROTO));
971         }
972
973         fid = (struct lu_fid *)update_param_buf(update, 1, &size);
974         if (fid == NULL || size != sizeof(*fid)) {
975                 CERROR("%s: invalid fid: rc = %d\n",
976                        tgt_name(tsi->tsi_tgt), -EPROTO);
977                        RETURN(err_serious(-EPROTO));
978         }
979
980         fid_le_to_cpu(fid, fid);
981         if (!fid_is_sane(fid)) {
982                 CERROR("%s: invalid FID "DFID": rc = %d\n",
983                        tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
984                 RETURN(err_serious(-EPROTO));
985         }
986
987         rc = out_tx_index_insert(tsi->tsi_env, obj, name, fid,
988                                  &tti->tti_tea,
989                                  tti->tti_u.update.tti_update_reply,
990                                  tti->tti_u.update.tti_update_reply_index);
991         RETURN(rc);
992 }
993
994 static int out_tx_index_delete_exec(const struct lu_env *env,
995                                     struct thandle *th,
996                                     struct tx_arg *arg)
997 {
998         int rc;
999
1000         rc = out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
1001
1002         CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
1003                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
1004
1005         update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
1006
1007         return rc;
1008 }
1009
1010 static int out_tx_index_delete_undo(const struct lu_env *env,
1011                                     struct thandle *th,
1012                                     struct tx_arg *arg)
1013 {
1014         CERROR("%s: Oops, can not rollback index_delete yet: rc = %d\n",
1015                dt_obd_name(th->th_dev), -ENOTSUPP);
1016         return -ENOTSUPP;
1017 }
1018
1019 static int __out_tx_index_delete(const struct lu_env *env,
1020                                  struct dt_object *dt_obj, char *name,
1021                                  struct thandle_exec_args *ta,
1022                                  struct update_reply *reply,
1023                                  int index, char *file, int line)
1024 {
1025         struct tx_arg *arg;
1026
1027         if (dt_try_as_dir(env, dt_obj) == 0) {
1028                 ta->ta_err = -ENOTDIR;
1029                 return ta->ta_err;
1030         }
1031
1032         LASSERT(ta->ta_handle != NULL);
1033         ta->ta_err = dt_declare_delete(env, dt_obj,
1034                                        (struct dt_key *)name,
1035                                        ta->ta_handle);
1036         if (ta->ta_err != 0)
1037                 return ta->ta_err;
1038
1039         arg = tx_add_exec(ta, out_tx_index_delete_exec,
1040                           out_tx_index_delete_undo, file,
1041                           line);
1042         LASSERT(arg);
1043         lu_object_get(&dt_obj->do_lu);
1044         arg->object = dt_obj;
1045         arg->reply = reply;
1046         arg->index = index;
1047         arg->u.insert.key = (struct dt_key *)name;
1048         return 0;
1049 }
1050
1051 static int out_index_delete(struct tgt_session_info *tsi)
1052 {
1053         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
1054         struct update           *update = tti->tti_u.update.tti_update;
1055         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
1056         char                    *name;
1057         int                      rc = 0;
1058
1059         if (!lu_object_exists(&obj->do_lu))
1060                 RETURN(-ENOENT);
1061
1062         name = (char *)update_param_buf(update, 0, NULL);
1063         if (name == NULL) {
1064                 CERROR("%s: empty name for index delete: rc = %d\n",
1065                        tgt_name(tsi->tsi_tgt), -EPROTO);
1066                 RETURN(err_serious(-EPROTO));
1067         }
1068
1069         rc = out_tx_index_delete(tsi->tsi_env, obj, name, &tti->tti_tea,
1070                                  tti->tti_u.update.tti_update_reply,
1071                                  tti->tti_u.update.tti_update_reply_index);
1072         RETURN(rc);
1073 }
1074
1075 static int out_tx_destroy_exec(const struct lu_env *env, struct thandle *th,
1076                                struct tx_arg *arg)
1077 {
1078         struct dt_object *dt_obj = arg->object;
1079         int rc;
1080
1081         rc = out_obj_destroy(env, dt_obj, th);
1082
1083         CDEBUG(D_INFO, "%s: insert destroy reply %p index %d: rc = %d\n",
1084                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
1085
1086         update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
1087
1088         RETURN(rc);
1089 }
1090
1091 static int out_tx_destroy_undo(const struct lu_env *env, struct thandle *th,
1092                                struct tx_arg *arg)
1093 {
1094         CERROR("%s: not support destroy undo yet!: rc = %d\n",
1095                dt_obd_name(th->th_dev), -ENOTSUPP);
1096         return -ENOTSUPP;
1097 }
1098
1099 static int __out_tx_destroy(const struct lu_env *env, struct dt_object *dt_obj,
1100                              struct thandle_exec_args *ta,
1101                              struct update_reply *reply,
1102                              int index, char *file, int line)
1103 {
1104         struct tx_arg *arg;
1105
1106         LASSERT(ta->ta_handle != NULL);
1107         ta->ta_err = dt_declare_destroy(env, dt_obj, ta->ta_handle);
1108         if (ta->ta_err)
1109                 return ta->ta_err;
1110
1111         arg = tx_add_exec(ta, out_tx_destroy_exec, out_tx_destroy_undo,
1112                           file, line);
1113         LASSERT(arg);
1114         lu_object_get(&dt_obj->do_lu);
1115         arg->object = dt_obj;
1116         arg->reply = reply;
1117         arg->index = index;
1118         return 0;
1119 }
1120
1121 static int out_destroy(struct tgt_session_info *tsi)
1122 {
1123         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
1124         struct update           *update = tti->tti_u.update.tti_update;
1125         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
1126         struct lu_fid           *fid;
1127         int                      rc;
1128         ENTRY;
1129
1130         fid = &update->u_fid;
1131         fid_le_to_cpu(fid, fid);
1132         if (!fid_is_sane(fid)) {
1133                 CERROR("%s: invalid FID "DFID": rc = %d\n",
1134                        tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
1135                 RETURN(err_serious(-EPROTO));
1136         }
1137
1138         if (!lu_object_exists(&obj->do_lu))
1139                 RETURN(-ENOENT);
1140
1141         rc = out_tx_destroy(tsi->tsi_env, obj, &tti->tti_tea,
1142                             tti->tti_u.update.tti_update_reply,
1143                             tti->tti_u.update.tti_update_reply_index);
1144
1145         RETURN(rc);
1146 }
1147
1148 #define DEF_OUT_HNDL(opc, name, flags, fn)     \
1149 [opc - OBJ_CREATE] = {                                  \
1150         .th_name    = name,                             \
1151         .th_fail_id = 0,                                \
1152         .th_opc     = opc,                              \
1153         .th_flags   = flags,                            \
1154         .th_act     = fn,                               \
1155         .th_fmt     = NULL,                             \
1156         .th_version = 0,                                \
1157 }
1158
1159 #define out_handler mdt_handler
1160 static struct tgt_handler out_update_ops[] = {
1161         DEF_OUT_HNDL(OBJ_CREATE, "obj_create", MUTABOR | HABEO_REFERO,
1162                      out_create),
1163         DEF_OUT_HNDL(OBJ_DESTROY, "obj_create", MUTABOR | HABEO_REFERO,
1164                      out_destroy),
1165         DEF_OUT_HNDL(OBJ_REF_ADD, "obj_ref_add", MUTABOR | HABEO_REFERO,
1166                      out_ref_add),
1167         DEF_OUT_HNDL(OBJ_REF_DEL, "obj_ref_del", MUTABOR | HABEO_REFERO,
1168                      out_ref_del),
1169         DEF_OUT_HNDL(OBJ_ATTR_SET, "obj_attr_set",  MUTABOR | HABEO_REFERO,
1170                      out_attr_set),
1171         DEF_OUT_HNDL(OBJ_ATTR_GET, "obj_attr_get",  HABEO_REFERO,
1172                      out_attr_get),
1173         DEF_OUT_HNDL(OBJ_XATTR_SET, "obj_xattr_set", MUTABOR | HABEO_REFERO,
1174                      out_xattr_set),
1175         DEF_OUT_HNDL(OBJ_XATTR_GET, "obj_xattr_get", HABEO_REFERO,
1176                      out_xattr_get),
1177         DEF_OUT_HNDL(OBJ_INDEX_LOOKUP, "obj_index_lookup", HABEO_REFERO,
1178                      out_index_lookup),
1179         DEF_OUT_HNDL(OBJ_INDEX_INSERT, "obj_index_insert",
1180                      MUTABOR | HABEO_REFERO, out_index_insert),
1181         DEF_OUT_HNDL(OBJ_INDEX_DELETE, "obj_index_delete",
1182                      MUTABOR | HABEO_REFERO, out_index_delete),
1183 };
1184
1185 struct tgt_handler *out_handler_find(__u32 opc)
1186 {
1187         struct tgt_handler *h;
1188
1189         h = NULL;
1190         if (OBJ_CREATE <= opc && opc < OBJ_LAST) {
1191                 h = &out_update_ops[opc - OBJ_CREATE];
1192                 LASSERTF(h->th_opc == opc, "opcode mismatch %d != %d\n",
1193                          h->th_opc, opc);
1194         } else {
1195                 h = NULL; /* unsupported opc */
1196         }
1197         return h;
1198 }
1199
1200 /**
1201  * Object updates between Targets. Because all the updates has been
1202  * dis-assemblied into object updates at sender side, so OUT will
1203  * call OSD API directly to execute these updates.
1204  *
1205  * In DNE phase I all of the updates in the request need to be executed
1206  * in one transaction, and the transaction has to be synchronously.
1207  *
1208  * Please refer to lustre/include/lustre/lustre_idl.h for req/reply
1209  * format.
1210  */
1211 int out_handle(struct tgt_session_info *tsi)
1212 {
1213         const struct lu_env             *env = tsi->tsi_env;
1214         struct tgt_thread_info          *tti = tgt_th_info(env);
1215         struct thandle_exec_args        *ta = &tti->tti_tea;
1216         struct req_capsule              *pill = tsi->tsi_pill;
1217         struct dt_device                *dt = tsi->tsi_tgt->lut_bottom;
1218         struct update_buf               *ubuf;
1219         struct update                   *update;
1220         struct update_reply             *update_reply;
1221         int                              bufsize;
1222         int                              count;
1223         int                              old_batchid = -1;
1224         unsigned                         off;
1225         int                              i;
1226         int                              rc = 0;
1227         int                              rc1 = 0;
1228
1229         ENTRY;
1230
1231         req_capsule_set(pill, &RQF_UPDATE_OBJ);
1232         bufsize = req_capsule_get_size(pill, &RMF_UPDATE, RCL_CLIENT);
1233         if (bufsize != UPDATE_BUFFER_SIZE) {
1234                 CERROR("%s: invalid bufsize %d: rc = %d\n",
1235                        tgt_name(tsi->tsi_tgt), bufsize, -EPROTO);
1236                 RETURN(err_serious(-EPROTO));
1237         }
1238
1239         ubuf = req_capsule_client_get(pill, &RMF_UPDATE);
1240         if (ubuf == NULL) {
1241                 CERROR("%s: No buf!: rc = %d\n", tgt_name(tsi->tsi_tgt),
1242                        -EPROTO);
1243                 RETURN(err_serious(-EPROTO));
1244         }
1245
1246         if (le32_to_cpu(ubuf->ub_magic) != UPDATE_BUFFER_MAGIC) {
1247                 CERROR("%s: invalid magic %x expect %x: rc = %d\n",
1248                        tgt_name(tsi->tsi_tgt), le32_to_cpu(ubuf->ub_magic),
1249                        UPDATE_BUFFER_MAGIC, -EPROTO);
1250                 RETURN(err_serious(-EPROTO));
1251         }
1252
1253         count = le32_to_cpu(ubuf->ub_count);
1254         if (count <= 0) {
1255                 CERROR("%s: No update!: rc = %d\n",
1256                        tgt_name(tsi->tsi_tgt), -EPROTO);
1257                 RETURN(err_serious(-EPROTO));
1258         }
1259
1260         req_capsule_set_size(pill, &RMF_UPDATE_REPLY, RCL_SERVER,
1261                              UPDATE_BUFFER_SIZE);
1262         rc = req_capsule_server_pack(pill);
1263         if (rc != 0) {
1264                 CERROR("%s: Can't pack response: rc = %d\n",
1265                        tgt_name(tsi->tsi_tgt), rc);
1266                 RETURN(rc);
1267         }
1268
1269         /* Prepare the update reply buffer */
1270         update_reply = req_capsule_server_get(pill, &RMF_UPDATE_REPLY);
1271         if (update_reply == NULL)
1272                 RETURN(err_serious(-EPROTO));
1273         update_init_reply_buf(update_reply, count);
1274         tti->tti_u.update.tti_update_reply = update_reply;
1275
1276         rc = out_tx_start(env, dt, ta);
1277         if (rc != 0)
1278                 RETURN(rc);
1279
1280         tti->tti_mult_trans = !req_is_replay(tgt_ses_req(tsi));
1281
1282         /* Walk through updates in the request to execute them synchronously */
1283         off = cfs_size_round(offsetof(struct update_buf, ub_bufs[0]));
1284         for (i = 0; i < count; i++) {
1285                 struct tgt_handler      *h;
1286                 struct dt_object        *dt_obj;
1287
1288                 update = (struct update *)((char *)ubuf + off);
1289                 if (old_batchid == -1) {
1290                         old_batchid = update->u_batchid;
1291                 } else if (old_batchid != update->u_batchid) {
1292                         /* Stop the current update transaction,
1293                          * create a new one */
1294                         rc = out_tx_end(env, ta);
1295                         if (rc != 0)
1296                                 RETURN(rc);
1297
1298                         rc = out_tx_start(env, dt, ta);
1299                         if (rc != 0)
1300                                 RETURN(rc);
1301                         old_batchid = update->u_batchid;
1302                 }
1303
1304                 fid_le_to_cpu(&update->u_fid, &update->u_fid);
1305                 if (!fid_is_sane(&update->u_fid)) {
1306                         CERROR("%s: invalid FID "DFID": rc = %d\n",
1307                                tgt_name(tsi->tsi_tgt), PFID(&update->u_fid),
1308                                -EPROTO);
1309                         GOTO(out, rc = err_serious(-EPROTO));
1310                 }
1311
1312                 dt_obj = dt_locate(env, dt, &update->u_fid);
1313                 if (IS_ERR(dt_obj))
1314                         GOTO(out, rc = PTR_ERR(dt_obj));
1315
1316                 tti->tti_u.update.tti_dt_object = dt_obj;
1317                 tti->tti_u.update.tti_update = update;
1318                 tti->tti_u.update.tti_update_reply_index = i;
1319
1320                 h = out_handler_find(update->u_type);
1321                 if (likely(h != NULL)) {
1322                         /* For real modification RPC, check if the update
1323                          * has been executed */
1324                         if (h->th_flags & MUTABOR) {
1325                                 struct ptlrpc_request *req = tgt_ses_req(tsi);
1326
1327                                 if (out_check_resent(env, dt, dt_obj, req,
1328                                                      out_reconstruct,
1329                                                      update_reply, i))
1330                                         GOTO(next, rc);
1331                         }
1332
1333                         rc = h->th_act(tsi);
1334                 } else {
1335                         CERROR("%s: The unsupported opc: 0x%x\n",
1336                                tgt_name(tsi->tsi_tgt), update->u_type);
1337                         lu_object_put(env, &dt_obj->do_lu);
1338                         GOTO(out, rc = -ENOTSUPP);
1339                 }
1340 next:
1341                 lu_object_put(env, &dt_obj->do_lu);
1342                 if (rc < 0)
1343                         GOTO(out, rc);
1344                 off += cfs_size_round(update_size(update));
1345         }
1346 out:
1347         rc1 = out_tx_end(env, ta);
1348         if (rc == 0)
1349                 rc = rc1;
1350         RETURN(rc);
1351 }
1352
1353 struct tgt_handler tgt_out_handlers[] = {
1354 TGT_UPDATE_HDL(MUTABOR, UPDATE_OBJ,     out_handle),
1355 };
1356 EXPORT_SYMBOL(tgt_out_handlers);
1357