Whamcloud - gitweb
ab757689f8f1a6b615e1de1333941c603a02e208
[fs/lustre-release.git] / lustre / target / out_handler.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  *
25  * lustre/mdt/out_handler.c
26  *
27  * Object update handler between targets.
28  *
29  * Author: di.wang <di.wang@intel.com>
30  */
31
32 #define DEBUG_SUBSYSTEM S_CLASS
33
34 #include <obd_class.h>
35 #include <md_object.h>
36 #include "tgt_internal.h"
37 #include <lustre_update.h>
38
39 struct tx_arg *tx_add_exec(struct thandle_exec_args *ta, tx_exec_func_t func,
40                            tx_exec_func_t undo, char *file, int line)
41 {
42         int i;
43
44         LASSERT(ta);
45         LASSERT(func);
46
47         i = ta->ta_argno;
48         LASSERT(i < UPDATE_MAX_OPS);
49
50         ta->ta_argno++;
51
52         ta->ta_args[i].exec_fn = func;
53         ta->ta_args[i].undo_fn = undo;
54         ta->ta_args[i].file    = file;
55         ta->ta_args[i].line    = line;
56
57         return &ta->ta_args[i];
58 }
59
60 static int out_tx_start(const struct lu_env *env, struct dt_device *dt,
61                         struct thandle_exec_args *ta)
62 {
63         memset(ta, 0, sizeof(*ta));
64         ta->ta_handle = dt_trans_create(env, dt);
65         if (IS_ERR(ta->ta_handle)) {
66                 CERROR("%s: start handle error: rc = %ld\n",
67                        dt_obd_name(dt), PTR_ERR(ta->ta_handle));
68                 return PTR_ERR(ta->ta_handle);
69         }
70         ta->ta_dev = dt;
71         /*For phase I, sync for cross-ref operation*/
72         ta->ta_handle->th_sync = 1;
73         return 0;
74 }
75
76 static int out_trans_start(const struct lu_env *env,
77                            struct thandle_exec_args *ta)
78 {
79         /* Always do sync commit for Phase I */
80         LASSERT(ta->ta_handle->th_sync != 0);
81         return dt_trans_start(env, ta->ta_dev, ta->ta_handle);
82 }
83
84 static int out_trans_stop(const struct lu_env *env,
85                           struct thandle_exec_args *ta, int err)
86 {
87         int i;
88         int rc;
89
90         ta->ta_handle->th_result = err;
91         LASSERT(ta->ta_handle->th_sync != 0);
92         rc = dt_trans_stop(env, ta->ta_dev, ta->ta_handle);
93         for (i = 0; i < ta->ta_argno; i++) {
94                 if (ta->ta_args[i].object != NULL) {
95                         lu_object_put(env, &ta->ta_args[i].object->do_lu);
96                         ta->ta_args[i].object = NULL;
97                 }
98         }
99
100         return rc;
101 }
102
103 int out_tx_end(const struct lu_env *env, struct thandle_exec_args *ta)
104 {
105         struct tgt_session_info *tsi = tgt_ses_info(env);
106         int i = 0, rc;
107
108         LASSERT(ta->ta_dev);
109         LASSERT(ta->ta_handle);
110
111         if (ta->ta_err != 0 || ta->ta_argno == 0)
112                 GOTO(stop, rc = ta->ta_err);
113
114         rc = out_trans_start(env, ta);
115         if (unlikely(rc))
116                 GOTO(stop, rc);
117
118         for (i = 0; i < ta->ta_argno; i++) {
119                 rc = ta->ta_args[i].exec_fn(env, ta->ta_handle,
120                                             &ta->ta_args[i]);
121                 if (unlikely(rc)) {
122                         CDEBUG(D_INFO, "error during execution of #%u from"
123                                " %s:%d: rc = %d\n", i, ta->ta_args[i].file,
124                                ta->ta_args[i].line, rc);
125                         while (--i >= 0) {
126                                 LASSERTF(ta->ta_args[i].undo_fn != NULL,
127                                     "can't undo changes, hope for failover!\n");
128                                 ta->ta_args[i].undo_fn(env, ta->ta_handle,
129                                                        &ta->ta_args[i]);
130                         }
131                         break;
132                 }
133         }
134
135         /* Only fail for real update */
136         tsi->tsi_reply_fail_id = OBD_FAIL_UPDATE_OBJ_NET_REP;
137 stop:
138         CDEBUG(D_INFO, "%s: executed %u/%u: rc = %d\n",
139                dt_obd_name(ta->ta_dev), i, ta->ta_argno, rc);
140         out_trans_stop(env, ta, rc);
141         ta->ta_handle = NULL;
142         ta->ta_argno = 0;
143         ta->ta_err = 0;
144
145         RETURN(rc);
146 }
147
148 static void out_reconstruct(const struct lu_env *env, struct dt_device *dt,
149                             struct dt_object *obj, struct update_reply *reply,
150                             int index)
151 {
152         CDEBUG(D_INFO, "%s: fork reply reply %p index %d: rc = %d\n",
153                dt_obd_name(dt), reply, index, 0);
154
155         update_insert_reply(reply, NULL, 0, index, 0);
156         return;
157 }
158
159 typedef void (*out_reconstruct_t)(const struct lu_env *env,
160                                   struct dt_device *dt,
161                                   struct dt_object *obj,
162                                   struct update_reply *reply,
163                                   int index);
164
165 static inline int out_check_resent(const struct lu_env *env,
166                                    struct dt_device *dt,
167                                    struct dt_object *obj,
168                                    struct ptlrpc_request *req,
169                                    out_reconstruct_t reconstruct,
170                                    struct update_reply *reply,
171                                    int index)
172 {
173         if (likely(!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)))
174                 return 0;
175
176         if (req_xid_is_last(req)) {
177                 reconstruct(env, dt, obj, reply, index);
178                 return 1;
179         }
180         DEBUG_REQ(D_HA, req, "no reply for RESENT req (have "LPD64")",
181                  req->rq_export->exp_target_data.ted_lcd->lcd_last_xid);
182         return 0;
183 }
184
185 static int out_obj_destroy(const struct lu_env *env, struct dt_object *dt_obj,
186                            struct thandle *th)
187 {
188         int rc;
189
190         CDEBUG(D_INFO, "%s: destroy "DFID"\n", dt_obd_name(th->th_dev),
191                PFID(lu_object_fid(&dt_obj->do_lu)));
192
193         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
194         rc = dt_destroy(env, dt_obj, th);
195         dt_write_unlock(env, dt_obj);
196
197         return rc;
198 }
199
200 /**
201  * All of the xxx_undo will be used once execution failed,
202  * But because all of the required resource has been reserved in
203  * declare phase, i.e. if declare succeed, it should make sure
204  * the following executing phase succeed in anyway, so these undo
205  * should be useless for most of the time in Phase I
206  */
207 int out_tx_create_undo(const struct lu_env *env, struct thandle *th,
208                        struct tx_arg *arg)
209 {
210         int rc;
211
212         rc = out_obj_destroy(env, arg->object, th);
213         if (rc != 0)
214                 CERROR("%s: undo failure, we are doomed!: rc = %d\n",
215                        dt_obd_name(th->th_dev), rc);
216         return rc;
217 }
218
219 int out_tx_create_exec(const struct lu_env *env, struct thandle *th,
220                        struct tx_arg *arg)
221 {
222         struct dt_object        *dt_obj = arg->object;
223         int                      rc;
224
225         CDEBUG(D_OTHER, "%s: create "DFID": dof %u, mode %o\n",
226                dt_obd_name(th->th_dev),
227                PFID(lu_object_fid(&arg->object->do_lu)),
228                arg->u.create.dof.dof_type,
229                arg->u.create.attr.la_mode & S_IFMT);
230
231         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
232         rc = dt_create(env, dt_obj, &arg->u.create.attr,
233                        &arg->u.create.hint, &arg->u.create.dof, th);
234
235         dt_write_unlock(env, dt_obj);
236
237         CDEBUG(D_INFO, "%s: insert create reply %p index %d: rc = %d\n",
238                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
239
240         update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
241
242         return rc;
243 }
244
245 static int __out_tx_create(const struct lu_env *env, struct dt_object *obj,
246                            struct lu_attr *attr, struct lu_fid *parent_fid,
247                            struct dt_object_format *dof,
248                            struct thandle_exec_args *ta,
249                            struct update_reply *reply,
250                            int index, char *file, int line)
251 {
252         struct tx_arg *arg;
253
254         LASSERT(ta->ta_handle != NULL);
255         ta->ta_err = dt_declare_create(env, obj, attr, NULL, dof,
256                                        ta->ta_handle);
257         if (ta->ta_err != 0)
258                 return ta->ta_err;
259
260         arg = tx_add_exec(ta, out_tx_create_exec, out_tx_create_undo, file,
261                           line);
262         LASSERT(arg);
263
264         /* release the object in out_trans_stop */
265         lu_object_get(&obj->do_lu);
266         arg->object = obj;
267         arg->u.create.attr = *attr;
268         if (parent_fid)
269                 arg->u.create.fid = *parent_fid;
270         memset(&arg->u.create.hint, 0, sizeof(arg->u.create.hint));
271         arg->u.create.dof  = *dof;
272         arg->reply = reply;
273         arg->index = index;
274
275         return 0;
276 }
277
278 static int out_create(struct tgt_session_info *tsi)
279 {
280         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
281         struct update           *update = tti->tti_u.update.tti_update;
282         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
283         struct dt_object_format *dof = &tti->tti_u.update.tti_update_dof;
284         struct obdo             *lobdo = &tti->tti_u.update.tti_obdo;
285         struct lu_attr          *attr = &tti->tti_attr;
286         struct lu_fid           *fid = NULL;
287         struct obdo             *wobdo;
288         int                     size;
289         int                     rc;
290
291         ENTRY;
292
293         wobdo = update_param_buf(update, 0, &size);
294         if (wobdo == NULL || size != sizeof(*wobdo)) {
295                 CERROR("%s: obdo is NULL, invalid RPC: rc = %d\n",
296                        tgt_name(tsi->tsi_tgt), -EPROTO);
297                 RETURN(err_serious(-EPROTO));
298         }
299
300         obdo_le_to_cpu(wobdo, wobdo);
301         lustre_get_wire_obdo(NULL, lobdo, wobdo);
302         la_from_obdo(attr, lobdo, lobdo->o_valid);
303
304         dof->dof_type = dt_mode_to_dft(attr->la_mode);
305         if (S_ISDIR(attr->la_mode)) {
306                 int size;
307
308                 fid = update_param_buf(update, 1, &size);
309                 if (fid == NULL || size != sizeof(*fid)) {
310                         CERROR("%s: invalid fid: rc = %d\n",
311                                tgt_name(tsi->tsi_tgt), -EPROTO);
312                         RETURN(err_serious(-EPROTO));
313                 }
314                 fid_le_to_cpu(fid, fid);
315                 if (!fid_is_sane(fid)) {
316                         CERROR("%s: invalid fid "DFID": rc = %d\n",
317                                tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
318                         RETURN(err_serious(-EPROTO));
319                 }
320         }
321
322         if (lu_object_exists(&obj->do_lu))
323                 RETURN(-EEXIST);
324
325         rc = out_tx_create(tsi->tsi_env, obj, attr, fid, dof,
326                            &tti->tti_tea,
327                            tti->tti_u.update.tti_update_reply,
328                            tti->tti_u.update.tti_update_reply_index);
329
330         RETURN(rc);
331 }
332
333 static int out_tx_attr_set_undo(const struct lu_env *env,
334                                 struct thandle *th, struct tx_arg *arg)
335 {
336         CERROR("%s: attr set undo "DFID" unimplemented yet!: rc = %d\n",
337                dt_obd_name(th->th_dev),
338                PFID(lu_object_fid(&arg->object->do_lu)), -ENOTSUPP);
339
340         return -ENOTSUPP;
341 }
342
343 static int out_tx_attr_set_exec(const struct lu_env *env, struct thandle *th,
344                                 struct tx_arg *arg)
345 {
346         struct dt_object        *dt_obj = arg->object;
347         int                     rc;
348
349         CDEBUG(D_OTHER, "%s: attr set "DFID"\n", dt_obd_name(th->th_dev),
350                PFID(lu_object_fid(&dt_obj->do_lu)));
351
352         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
353         rc = dt_attr_set(env, dt_obj, &arg->u.attr_set.attr, th, NULL);
354         dt_write_unlock(env, dt_obj);
355
356         CDEBUG(D_INFO, "%s: insert attr_set reply %p index %d: rc = %d\n",
357                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
358
359         update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
360
361         return rc;
362 }
363
364 static int __out_tx_attr_set(const struct lu_env *env,
365                              struct dt_object *dt_obj,
366                              const struct lu_attr *attr,
367                              struct thandle_exec_args *th,
368                              struct update_reply *reply, int index,
369                              char *file, int line)
370 {
371         struct tx_arg           *arg;
372
373         LASSERT(th->ta_handle != NULL);
374         th->ta_err = dt_declare_attr_set(env, dt_obj, attr, th->ta_handle);
375         if (th->ta_err != 0)
376                 return th->ta_err;
377
378         arg = tx_add_exec(th, out_tx_attr_set_exec, out_tx_attr_set_undo,
379                           file, line);
380         LASSERT(arg);
381         lu_object_get(&dt_obj->do_lu);
382         arg->object = dt_obj;
383         arg->u.attr_set.attr = *attr;
384         arg->reply = reply;
385         arg->index = index;
386         return 0;
387 }
388
389 static int out_attr_set(struct tgt_session_info *tsi)
390 {
391         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
392         struct update           *update = tti->tti_u.update.tti_update;
393         struct lu_attr          *attr = &tti->tti_attr;
394         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
395         struct obdo             *lobdo = &tti->tti_u.update.tti_obdo;
396         struct obdo             *wobdo;
397         int                      size;
398         int                      rc;
399
400         ENTRY;
401
402         wobdo = update_param_buf(update, 0, &size);
403         if (wobdo == NULL || size != sizeof(*wobdo)) {
404                 CERROR("%s: empty obdo in the update: rc = %d\n",
405                        tgt_name(tsi->tsi_tgt), -EPROTO);
406                 RETURN(err_serious(-EPROTO));
407         }
408
409         attr->la_valid = 0;
410         attr->la_valid = 0;
411         obdo_le_to_cpu(wobdo, wobdo);
412         lustre_get_wire_obdo(NULL, lobdo, wobdo);
413         la_from_obdo(attr, lobdo, lobdo->o_valid);
414
415         rc = out_tx_attr_set(tsi->tsi_env, obj, attr, &tti->tti_tea,
416                              tti->tti_u.update.tti_update_reply,
417                              tti->tti_u.update.tti_update_reply_index);
418
419         RETURN(rc);
420 }
421
422 static int out_attr_get(struct tgt_session_info *tsi)
423 {
424         const struct lu_env     *env = tsi->tsi_env;
425         struct tgt_thread_info  *tti = tgt_th_info(env);
426         struct obdo             *obdo = &tti->tti_u.update.tti_obdo;
427         struct lu_attr          *la = &tti->tti_attr;
428         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
429         int                     rc;
430
431         ENTRY;
432
433         if (!lu_object_exists(&obj->do_lu))
434                 RETURN(-ENOENT);
435
436         dt_read_lock(env, obj, MOR_TGT_CHILD);
437         rc = dt_attr_get(env, obj, la, NULL);
438         if (rc)
439                 GOTO(out_unlock, rc);
440         /*
441          * If it is a directory, we will also check whether the
442          * directory is empty.
443          * la_flags = 0 : Empty.
444          *          = 1 : Not empty.
445          */
446         la->la_flags = 0;
447         if (S_ISDIR(la->la_mode)) {
448                 struct dt_it            *it;
449                 const struct dt_it_ops  *iops;
450
451                 if (!dt_try_as_dir(env, obj))
452                         GOTO(out_unlock, rc = -ENOTDIR);
453
454                 iops = &obj->do_index_ops->dio_it;
455                 it = iops->init(env, obj, LUDA_64BITHASH, BYPASS_CAPA);
456                 if (!IS_ERR(it)) {
457                         int  result;
458                         result = iops->get(env, it, (const void *)"");
459                         if (result > 0) {
460                                 int i;
461                                 for (result = 0, i = 0; result == 0 && i < 3;
462                                      ++i)
463                                         result = iops->next(env, it);
464                                 if (result == 0)
465                                         la->la_flags = 1;
466                         } else if (result == 0)
467                                 /*
468                                  * Huh? Index contains no zero key?
469                                  */
470                                 rc = -EIO;
471
472                         iops->put(env, it);
473                         iops->fini(env, it);
474                 }
475         }
476
477         obdo->o_valid = 0;
478         obdo_from_la(obdo, la, la->la_valid);
479         obdo_cpu_to_le(obdo, obdo);
480         lustre_set_wire_obdo(NULL, obdo, obdo);
481
482 out_unlock:
483         dt_read_unlock(env, obj);
484
485         CDEBUG(D_INFO, "%s: insert attr get reply %p index %d: rc = %d\n",
486                tgt_name(tsi->tsi_tgt), tti->tti_u.update.tti_update_reply,
487                0, rc);
488
489         update_insert_reply(tti->tti_u.update.tti_update_reply, obdo,
490                             sizeof(*obdo), 0, rc);
491         RETURN(rc);
492 }
493
494 static int out_xattr_get(struct tgt_session_info *tsi)
495 {
496         const struct lu_env     *env = tsi->tsi_env;
497         struct tgt_thread_info  *tti = tgt_th_info(env);
498         struct update           *update = tti->tti_u.update.tti_update;
499         struct lu_buf           *lbuf = &tti->tti_buf;
500         struct update_reply     *reply = tti->tti_u.update.tti_update_reply;
501         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
502         char                    *name;
503         void                    *ptr;
504         int                      rc;
505
506         ENTRY;
507
508         name = (char *)update_param_buf(update, 0, NULL);
509         if (name == NULL) {
510                 CERROR("%s: empty name for xattr get: rc = %d\n",
511                        tgt_name(tsi->tsi_tgt), -EPROTO);
512                 RETURN(err_serious(-EPROTO));
513         }
514
515         ptr = update_get_buf_internal(reply, 0, NULL);
516         LASSERT(ptr != NULL);
517
518         /* The first 4 bytes(int) are used to store the result */
519         lbuf->lb_buf = (char *)ptr + sizeof(int);
520         lbuf->lb_len = UPDATE_BUFFER_SIZE - sizeof(struct update_reply);
521         dt_read_lock(env, obj, MOR_TGT_CHILD);
522         rc = dt_xattr_get(env, obj, lbuf, name, NULL);
523         dt_read_unlock(env, obj);
524         if (rc < 0) {
525                 lbuf->lb_len = 0;
526                 GOTO(out, rc);
527         }
528         if (rc == 0) {
529                 lbuf->lb_len = 0;
530                 GOTO(out, rc = -ENOENT);
531         }
532         lbuf->lb_len = rc;
533         rc = 0;
534         CDEBUG(D_INFO, "%s: "DFID" get xattr %s len %d\n",
535                tgt_name(tsi->tsi_tgt), PFID(lu_object_fid(&obj->do_lu)),
536                name, (int)lbuf->lb_len);
537 out:
538         *(int *)ptr = rc;
539         reply->ur_lens[0] = lbuf->lb_len + sizeof(int);
540         RETURN(rc);
541 }
542
543 static int out_index_lookup(struct tgt_session_info *tsi)
544 {
545         const struct lu_env     *env = tsi->tsi_env;
546         struct tgt_thread_info  *tti = tgt_th_info(env);
547         struct update           *update = tti->tti_u.update.tti_update;
548         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
549         char                    *name;
550         int                      rc;
551
552         ENTRY;
553
554         if (!lu_object_exists(&obj->do_lu))
555                 RETURN(-ENOENT);
556
557         name = (char *)update_param_buf(update, 0, NULL);
558         if (name == NULL) {
559                 CERROR("%s: empty name for lookup: rc = %d\n",
560                        tgt_name(tsi->tsi_tgt), -EPROTO);
561                 RETURN(err_serious(-EPROTO));
562         }
563
564         dt_read_lock(env, obj, MOR_TGT_CHILD);
565         if (!dt_try_as_dir(env, obj))
566                 GOTO(out_unlock, rc = -ENOTDIR);
567
568         rc = dt_lookup(env, obj, (struct dt_rec *)&tti->tti_fid1,
569                 (struct dt_key *)name, NULL);
570
571         if (rc < 0)
572                 GOTO(out_unlock, rc);
573
574         if (rc == 0)
575                 rc += 1;
576
577         CDEBUG(D_INFO, "lookup "DFID" %s get "DFID" rc %d\n",
578                PFID(lu_object_fid(&obj->do_lu)), name,
579                PFID(&tti->tti_fid1), rc);
580         fid_cpu_to_le(&tti->tti_fid1, &tti->tti_fid1);
581
582 out_unlock:
583         dt_read_unlock(env, obj);
584
585         CDEBUG(D_INFO, "%s: insert lookup reply %p index %d: rc = %d\n",
586                tgt_name(tsi->tsi_tgt), tti->tti_u.update.tti_update_reply,
587                0, rc);
588
589         update_insert_reply(tti->tti_u.update.tti_update_reply,
590                             &tti->tti_fid1, sizeof(tti->tti_fid1), 0, rc);
591         RETURN(rc);
592 }
593
594 static int out_tx_xattr_set_exec(const struct lu_env *env,
595                                  struct thandle *th,
596                                  struct tx_arg *arg)
597 {
598         struct dt_object *dt_obj = arg->object;
599         int rc;
600
601         CDEBUG(D_INFO, "%s: set xattr buf %p name %s flag %d\n",
602                dt_obd_name(th->th_dev), arg->u.xattr_set.buf.lb_buf,
603                arg->u.xattr_set.name, arg->u.xattr_set.flags);
604
605         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
606         rc = dt_xattr_set(env, dt_obj, &arg->u.xattr_set.buf,
607                           arg->u.xattr_set.name, arg->u.xattr_set.flags,
608                           th, NULL);
609         dt_write_unlock(env, dt_obj);
610         /**
611          * Ignore errors if this is LINK EA
612          **/
613         if (unlikely(rc && !strncmp(arg->u.xattr_set.name, XATTR_NAME_LINK,
614                                     strlen(XATTR_NAME_LINK))))
615                 rc = 0;
616
617         CDEBUG(D_INFO, "%s: insert xattr set reply %p index %d: rc = %d\n",
618                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
619
620         update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
621
622         return rc;
623 }
624
625 static int __out_tx_xattr_set(const struct lu_env *env,
626                               struct dt_object *dt_obj,
627                               const struct lu_buf *buf,
628                               const char *name, int flags,
629                               struct thandle_exec_args *ta,
630                               struct update_reply *reply, int index,
631                               char *file, int line)
632 {
633         struct tx_arg           *arg;
634
635         LASSERT(ta->ta_handle != NULL);
636         ta->ta_err = dt_declare_xattr_set(env, dt_obj, buf, name,
637                                           flags, ta->ta_handle);
638         if (ta->ta_err != 0)
639                 return ta->ta_err;
640
641         arg = tx_add_exec(ta, out_tx_xattr_set_exec, NULL, file, line);
642         LASSERT(arg);
643         lu_object_get(&dt_obj->do_lu);
644         arg->object = dt_obj;
645         arg->u.xattr_set.name = name;
646         arg->u.xattr_set.flags = flags;
647         arg->u.xattr_set.buf = *buf;
648         arg->reply = reply;
649         arg->index = index;
650         arg->u.xattr_set.csum = 0;
651         return 0;
652 }
653
654 static int out_xattr_set(struct tgt_session_info *tsi)
655 {
656         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
657         struct update           *update = tti->tti_u.update.tti_update;
658         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
659         struct lu_buf           *lbuf = &tti->tti_buf;
660         char                    *name;
661         char                    *buf;
662         char                    *tmp;
663         int                      buf_len = 0;
664         int                      flag;
665         int                      rc;
666         ENTRY;
667
668         name = update_param_buf(update, 0, NULL);
669         if (name == NULL) {
670                 CERROR("%s: empty name for xattr set: rc = %d\n",
671                        tgt_name(tsi->tsi_tgt), -EPROTO);
672                 RETURN(err_serious(-EPROTO));
673         }
674
675         buf = (char *)update_param_buf(update, 1, &buf_len);
676         if (buf == NULL || buf_len == 0) {
677                 CERROR("%s: empty buf for xattr set: rc = %d\n",
678                        tgt_name(tsi->tsi_tgt), -EPROTO);
679                 RETURN(err_serious(-EPROTO));
680         }
681
682         lbuf->lb_buf = buf;
683         lbuf->lb_len = buf_len;
684
685         tmp = (char *)update_param_buf(update, 2, NULL);
686         if (tmp == NULL) {
687                 CERROR("%s: empty flag for xattr set: rc = %d\n",
688                        tgt_name(tsi->tsi_tgt), -EPROTO);
689                 RETURN(err_serious(-EPROTO));
690         }
691
692         flag = le32_to_cpu(*(int *)tmp);
693
694         rc = out_tx_xattr_set(tsi->tsi_env, obj, lbuf, name, flag,
695                               &tti->tti_tea,
696                               tti->tti_u.update.tti_update_reply,
697                               tti->tti_u.update.tti_update_reply_index);
698         RETURN(rc);
699 }
700
701 static int out_obj_ref_add(const struct lu_env *env,
702                            struct dt_object *dt_obj,
703                            struct thandle *th)
704 {
705         int rc;
706
707         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
708         rc = dt_ref_add(env, dt_obj, th);
709         dt_write_unlock(env, dt_obj);
710
711         return rc;
712 }
713
714 static int out_obj_ref_del(const struct lu_env *env,
715                            struct dt_object *dt_obj,
716                            struct thandle *th)
717 {
718         int rc;
719
720         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
721         rc = dt_ref_del(env, dt_obj, th);
722         dt_write_unlock(env, dt_obj);
723
724         return rc;
725 }
726
727 static int out_tx_ref_add_exec(const struct lu_env *env, struct thandle *th,
728                                struct tx_arg *arg)
729 {
730         struct dt_object *dt_obj = arg->object;
731         int rc;
732
733         rc = out_obj_ref_add(env, dt_obj, th);
734
735         CDEBUG(D_INFO, "%s: insert ref_add reply %p index %d: rc = %d\n",
736                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
737
738         update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
739         return rc;
740 }
741
742 static int out_tx_ref_add_undo(const struct lu_env *env, struct thandle *th,
743                                struct tx_arg *arg)
744 {
745         return out_obj_ref_del(env, arg->object, th);
746 }
747
748 static int __out_tx_ref_add(const struct lu_env *env,
749                             struct dt_object *dt_obj,
750                             struct thandle_exec_args *ta,
751                             struct update_reply *reply,
752                             int index, char *file, int line)
753 {
754         struct tx_arg   *arg;
755
756         LASSERT(ta->ta_handle != NULL);
757         ta->ta_err = dt_declare_ref_add(env, dt_obj, ta->ta_handle);
758         if (ta->ta_err != 0)
759                 return ta->ta_err;
760
761         arg = tx_add_exec(ta, out_tx_ref_add_exec, out_tx_ref_add_undo, file,
762                           line);
763         LASSERT(arg);
764         lu_object_get(&dt_obj->do_lu);
765         arg->object = dt_obj;
766         arg->reply = reply;
767         arg->index = index;
768         return 0;
769 }
770
771 /**
772  * increase ref of the object
773  **/
774 static int out_ref_add(struct tgt_session_info *tsi)
775 {
776         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
777         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
778         int                      rc;
779
780         ENTRY;
781
782         rc = out_tx_ref_add(tsi->tsi_env, obj, &tti->tti_tea,
783                             tti->tti_u.update.tti_update_reply,
784                             tti->tti_u.update.tti_update_reply_index);
785         RETURN(rc);
786 }
787
788 static int out_tx_ref_del_exec(const struct lu_env *env, struct thandle *th,
789                                struct tx_arg *arg)
790 {
791         struct dt_object        *dt_obj = arg->object;
792         int                      rc;
793
794         rc = out_obj_ref_del(env, dt_obj, th);
795
796         CDEBUG(D_INFO, "%s: insert ref_del reply %p index %d: rc = %d\n",
797                dt_obd_name(th->th_dev), arg->reply, arg->index, 0);
798
799         update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
800
801         return rc;
802 }
803
804 static int out_tx_ref_del_undo(const struct lu_env *env, struct thandle *th,
805                                struct tx_arg *arg)
806 {
807         return out_obj_ref_add(env, arg->object, th);
808 }
809
810 static int __out_tx_ref_del(const struct lu_env *env,
811                             struct dt_object *dt_obj,
812                             struct thandle_exec_args *ta,
813                             struct update_reply *reply,
814                             int index, char *file, int line)
815 {
816         struct tx_arg   *arg;
817
818         LASSERT(ta->ta_handle != NULL);
819         ta->ta_err = dt_declare_ref_del(env, dt_obj, ta->ta_handle);
820         if (ta->ta_err != 0)
821                 return ta->ta_err;
822
823         arg = tx_add_exec(ta, out_tx_ref_del_exec, out_tx_ref_del_undo, file,
824                           line);
825         LASSERT(arg);
826         lu_object_get(&dt_obj->do_lu);
827         arg->object = dt_obj;
828         arg->reply = reply;
829         arg->index = index;
830         return 0;
831 }
832
833 static int out_ref_del(struct tgt_session_info *tsi)
834 {
835         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
836         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
837         int                      rc;
838
839         ENTRY;
840
841         if (!lu_object_exists(&obj->do_lu))
842                 RETURN(-ENOENT);
843
844         rc = out_tx_ref_del(tsi->tsi_env, obj, &tti->tti_tea,
845                             tti->tti_u.update.tti_update_reply,
846                             tti->tti_u.update.tti_update_reply_index);
847         RETURN(rc);
848 }
849
850 static int out_obj_index_insert(const struct lu_env *env,
851                                 struct dt_object *dt_obj,
852                                 const struct dt_rec *rec,
853                                 const struct dt_key *key,
854                                 struct thandle *th)
855 {
856         int rc;
857
858         CDEBUG(D_INFO, "%s: index insert "DFID" name: %s fid "DFID"\n",
859                dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
860                (char *)key, PFID((struct lu_fid *)rec));
861
862         if (dt_try_as_dir(env, dt_obj) == 0)
863                 return -ENOTDIR;
864
865         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
866         rc = dt_insert(env, dt_obj, rec, key, th, NULL, 0);
867         dt_write_unlock(env, dt_obj);
868
869         return rc;
870 }
871
872 static int out_obj_index_delete(const struct lu_env *env,
873                                 struct dt_object *dt_obj,
874                                 const struct dt_key *key,
875                                 struct thandle *th)
876 {
877         int rc;
878
879         CDEBUG(D_INFO, "%s: index delete "DFID" name: %s\n",
880                dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
881                (char *)key);
882
883         if (dt_try_as_dir(env, dt_obj) == 0)
884                 return -ENOTDIR;
885
886         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
887         rc = dt_delete(env, dt_obj, key, th, NULL);
888         dt_write_unlock(env, dt_obj);
889
890         return rc;
891 }
892
893 static int out_tx_index_insert_exec(const struct lu_env *env,
894                                     struct thandle *th, struct tx_arg *arg)
895 {
896         struct dt_object *dt_obj = arg->object;
897         int rc;
898
899         rc = out_obj_index_insert(env, dt_obj, arg->u.insert.rec,
900                                   arg->u.insert.key, th);
901
902         CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
903                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
904
905         update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
906
907         return rc;
908 }
909
910 static int out_tx_index_insert_undo(const struct lu_env *env,
911                                     struct thandle *th, struct tx_arg *arg)
912 {
913         return out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
914 }
915
916 static int __out_tx_index_insert(const struct lu_env *env,
917                                  struct dt_object *dt_obj,
918                                  char *name, struct lu_fid *fid,
919                                  struct thandle_exec_args *ta,
920                                  struct update_reply *reply,
921                                  int index, char *file, int line)
922 {
923         struct tx_arg *arg;
924
925         LASSERT(ta->ta_handle != NULL);
926
927         if (lu_object_exists(&dt_obj->do_lu)) {
928                 if (dt_try_as_dir(env, dt_obj) == 0) {
929                         ta->ta_err = -ENOTDIR;
930                         return ta->ta_err;
931                 }
932                 ta->ta_err = dt_declare_insert(env, dt_obj,
933                                                (struct dt_rec *)fid,
934                                                (struct dt_key *)name,
935                                                ta->ta_handle);
936         }
937
938         if (ta->ta_err != 0)
939                 return ta->ta_err;
940
941         arg = tx_add_exec(ta, out_tx_index_insert_exec,
942                           out_tx_index_insert_undo, file,
943                           line);
944         LASSERT(arg);
945         lu_object_get(&dt_obj->do_lu);
946         arg->object = dt_obj;
947         arg->reply = reply;
948         arg->index = index;
949         arg->u.insert.rec = (struct dt_rec *)fid;
950         arg->u.insert.key = (struct dt_key *)name;
951
952         return 0;
953 }
954
955 static int out_index_insert(struct tgt_session_info *tsi)
956 {
957         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
958         struct update     *update = tti->tti_u.update.tti_update;
959         struct dt_object  *obj = tti->tti_u.update.tti_dt_object;
960         struct lu_fid     *fid;
961         char              *name;
962         int                rc = 0;
963         int                size;
964
965         ENTRY;
966
967         name = (char *)update_param_buf(update, 0, NULL);
968         if (name == NULL) {
969                 CERROR("%s: empty name for index insert: rc = %d\n",
970                        tgt_name(tsi->tsi_tgt), -EPROTO);
971                 RETURN(err_serious(-EPROTO));
972         }
973
974         fid = (struct lu_fid *)update_param_buf(update, 1, &size);
975         if (fid == NULL || size != sizeof(*fid)) {
976                 CERROR("%s: invalid fid: rc = %d\n",
977                        tgt_name(tsi->tsi_tgt), -EPROTO);
978                        RETURN(err_serious(-EPROTO));
979         }
980
981         fid_le_to_cpu(fid, fid);
982         if (!fid_is_sane(fid)) {
983                 CERROR("%s: invalid FID "DFID": rc = %d\n",
984                        tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
985                 RETURN(err_serious(-EPROTO));
986         }
987
988         rc = out_tx_index_insert(tsi->tsi_env, obj, name, fid,
989                                  &tti->tti_tea,
990                                  tti->tti_u.update.tti_update_reply,
991                                  tti->tti_u.update.tti_update_reply_index);
992         RETURN(rc);
993 }
994
995 static int out_tx_index_delete_exec(const struct lu_env *env,
996                                     struct thandle *th,
997                                     struct tx_arg *arg)
998 {
999         int rc;
1000
1001         rc = out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
1002
1003         CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
1004                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
1005
1006         update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
1007
1008         return rc;
1009 }
1010
1011 static int out_tx_index_delete_undo(const struct lu_env *env,
1012                                     struct thandle *th,
1013                                     struct tx_arg *arg)
1014 {
1015         CERROR("%s: Oops, can not rollback index_delete yet: rc = %d\n",
1016                dt_obd_name(th->th_dev), -ENOTSUPP);
1017         return -ENOTSUPP;
1018 }
1019
1020 static int __out_tx_index_delete(const struct lu_env *env,
1021                                  struct dt_object *dt_obj, char *name,
1022                                  struct thandle_exec_args *ta,
1023                                  struct update_reply *reply,
1024                                  int index, char *file, int line)
1025 {
1026         struct tx_arg *arg;
1027
1028         if (dt_try_as_dir(env, dt_obj) == 0) {
1029                 ta->ta_err = -ENOTDIR;
1030                 return ta->ta_err;
1031         }
1032
1033         LASSERT(ta->ta_handle != NULL);
1034         ta->ta_err = dt_declare_delete(env, dt_obj,
1035                                        (struct dt_key *)name,
1036                                        ta->ta_handle);
1037         if (ta->ta_err != 0)
1038                 return ta->ta_err;
1039
1040         arg = tx_add_exec(ta, out_tx_index_delete_exec,
1041                           out_tx_index_delete_undo, file,
1042                           line);
1043         LASSERT(arg);
1044         lu_object_get(&dt_obj->do_lu);
1045         arg->object = dt_obj;
1046         arg->reply = reply;
1047         arg->index = index;
1048         arg->u.insert.key = (struct dt_key *)name;
1049         return 0;
1050 }
1051
1052 static int out_index_delete(struct tgt_session_info *tsi)
1053 {
1054         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
1055         struct update           *update = tti->tti_u.update.tti_update;
1056         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
1057         char                    *name;
1058         int                      rc = 0;
1059
1060         if (!lu_object_exists(&obj->do_lu))
1061                 RETURN(-ENOENT);
1062
1063         name = (char *)update_param_buf(update, 0, NULL);
1064         if (name == NULL) {
1065                 CERROR("%s: empty name for index delete: rc = %d\n",
1066                        tgt_name(tsi->tsi_tgt), -EPROTO);
1067                 RETURN(err_serious(-EPROTO));
1068         }
1069
1070         rc = out_tx_index_delete(tsi->tsi_env, obj, name, &tti->tti_tea,
1071                                  tti->tti_u.update.tti_update_reply,
1072                                  tti->tti_u.update.tti_update_reply_index);
1073         RETURN(rc);
1074 }
1075
1076 static int out_tx_destroy_exec(const struct lu_env *env, struct thandle *th,
1077                                struct tx_arg *arg)
1078 {
1079         struct dt_object *dt_obj = arg->object;
1080         int rc;
1081
1082         rc = out_obj_destroy(env, dt_obj, th);
1083
1084         CDEBUG(D_INFO, "%s: insert destroy reply %p index %d: rc = %d\n",
1085                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
1086
1087         update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
1088
1089         RETURN(rc);
1090 }
1091
1092 static int out_tx_destroy_undo(const struct lu_env *env, struct thandle *th,
1093                                struct tx_arg *arg)
1094 {
1095         CERROR("%s: not support destroy undo yet!: rc = %d\n",
1096                dt_obd_name(th->th_dev), -ENOTSUPP);
1097         return -ENOTSUPP;
1098 }
1099
1100 static int __out_tx_destroy(const struct lu_env *env, struct dt_object *dt_obj,
1101                              struct thandle_exec_args *ta,
1102                              struct update_reply *reply,
1103                              int index, char *file, int line)
1104 {
1105         struct tx_arg *arg;
1106
1107         LASSERT(ta->ta_handle != NULL);
1108         ta->ta_err = dt_declare_destroy(env, dt_obj, ta->ta_handle);
1109         if (ta->ta_err)
1110                 return ta->ta_err;
1111
1112         arg = tx_add_exec(ta, out_tx_destroy_exec, out_tx_destroy_undo,
1113                           file, line);
1114         LASSERT(arg);
1115         lu_object_get(&dt_obj->do_lu);
1116         arg->object = dt_obj;
1117         arg->reply = reply;
1118         arg->index = index;
1119         return 0;
1120 }
1121
1122 static int out_destroy(struct tgt_session_info *tsi)
1123 {
1124         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
1125         struct update           *update = tti->tti_u.update.tti_update;
1126         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
1127         struct lu_fid           *fid;
1128         int                      rc;
1129         ENTRY;
1130
1131         fid = &update->u_fid;
1132         fid_le_to_cpu(fid, fid);
1133         if (!fid_is_sane(fid)) {
1134                 CERROR("%s: invalid FID "DFID": rc = %d\n",
1135                        tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
1136                 RETURN(err_serious(-EPROTO));
1137         }
1138
1139         if (!lu_object_exists(&obj->do_lu))
1140                 RETURN(-ENOENT);
1141
1142         rc = out_tx_destroy(tsi->tsi_env, obj, &tti->tti_tea,
1143                             tti->tti_u.update.tti_update_reply,
1144                             tti->tti_u.update.tti_update_reply_index);
1145
1146         RETURN(rc);
1147 }
1148
1149 #define DEF_OUT_HNDL(opc, name, flags, fn)     \
1150 [opc - OBJ_CREATE] = {                                  \
1151         .th_name    = name,                             \
1152         .th_fail_id = 0,                                \
1153         .th_opc     = opc,                              \
1154         .th_flags   = flags,                            \
1155         .th_act     = fn,                               \
1156         .th_fmt     = NULL,                             \
1157         .th_version = 0,                                \
1158 }
1159
1160 #define out_handler mdt_handler
1161 static struct tgt_handler out_update_ops[] = {
1162         DEF_OUT_HNDL(OBJ_CREATE, "obj_create", MUTABOR | HABEO_REFERO,
1163                      out_create),
1164         DEF_OUT_HNDL(OBJ_DESTROY, "obj_create", MUTABOR | HABEO_REFERO,
1165                      out_destroy),
1166         DEF_OUT_HNDL(OBJ_REF_ADD, "obj_ref_add", MUTABOR | HABEO_REFERO,
1167                      out_ref_add),
1168         DEF_OUT_HNDL(OBJ_REF_DEL, "obj_ref_del", MUTABOR | HABEO_REFERO,
1169                      out_ref_del),
1170         DEF_OUT_HNDL(OBJ_ATTR_SET, "obj_attr_set",  MUTABOR | HABEO_REFERO,
1171                      out_attr_set),
1172         DEF_OUT_HNDL(OBJ_ATTR_GET, "obj_attr_get",  HABEO_REFERO,
1173                      out_attr_get),
1174         DEF_OUT_HNDL(OBJ_XATTR_SET, "obj_xattr_set", MUTABOR | HABEO_REFERO,
1175                      out_xattr_set),
1176         DEF_OUT_HNDL(OBJ_XATTR_GET, "obj_xattr_get", HABEO_REFERO,
1177                      out_xattr_get),
1178         DEF_OUT_HNDL(OBJ_INDEX_LOOKUP, "obj_index_lookup", HABEO_REFERO,
1179                      out_index_lookup),
1180         DEF_OUT_HNDL(OBJ_INDEX_INSERT, "obj_index_insert",
1181                      MUTABOR | HABEO_REFERO, out_index_insert),
1182         DEF_OUT_HNDL(OBJ_INDEX_DELETE, "obj_index_delete",
1183                      MUTABOR | HABEO_REFERO, out_index_delete),
1184 };
1185
1186 struct tgt_handler *out_handler_find(__u32 opc)
1187 {
1188         struct tgt_handler *h;
1189
1190         h = NULL;
1191         if (OBJ_CREATE <= opc && opc < OBJ_LAST) {
1192                 h = &out_update_ops[opc - OBJ_CREATE];
1193                 LASSERTF(h->th_opc == opc, "opcode mismatch %d != %d\n",
1194                          h->th_opc, opc);
1195         } else {
1196                 h = NULL; /* unsupported opc */
1197         }
1198         return h;
1199 }
1200
1201 /**
1202  * Object updates between Targets. Because all the updates has been
1203  * dis-assemblied into object updates at sender side, so OUT will
1204  * call OSD API directly to execute these updates.
1205  *
1206  * In DNE phase I all of the updates in the request need to be executed
1207  * in one transaction, and the transaction has to be synchronously.
1208  *
1209  * Please refer to lustre/include/lustre/lustre_idl.h for req/reply
1210  * format.
1211  */
1212 int out_handle(struct tgt_session_info *tsi)
1213 {
1214         const struct lu_env             *env = tsi->tsi_env;
1215         struct tgt_thread_info          *tti = tgt_th_info(env);
1216         struct thandle_exec_args        *ta = &tti->tti_tea;
1217         struct req_capsule              *pill = tsi->tsi_pill;
1218         struct dt_device                *dt = tsi->tsi_tgt->lut_bottom;
1219         struct update_buf               *ubuf;
1220         struct update                   *update;
1221         struct update_reply             *update_reply;
1222         int                              bufsize;
1223         int                              count;
1224         int                              old_batchid = -1;
1225         unsigned                         off;
1226         int                              i;
1227         int                              rc = 0;
1228         int                              rc1 = 0;
1229
1230         ENTRY;
1231
1232         req_capsule_set(pill, &RQF_UPDATE_OBJ);
1233         bufsize = req_capsule_get_size(pill, &RMF_UPDATE, RCL_CLIENT);
1234         if (bufsize != UPDATE_BUFFER_SIZE) {
1235                 CERROR("%s: invalid bufsize %d: rc = %d\n",
1236                        tgt_name(tsi->tsi_tgt), bufsize, -EPROTO);
1237                 RETURN(err_serious(-EPROTO));
1238         }
1239
1240         ubuf = req_capsule_client_get(pill, &RMF_UPDATE);
1241         if (ubuf == NULL) {
1242                 CERROR("%s: No buf!: rc = %d\n", tgt_name(tsi->tsi_tgt),
1243                        -EPROTO);
1244                 RETURN(err_serious(-EPROTO));
1245         }
1246
1247         if (le32_to_cpu(ubuf->ub_magic) != UPDATE_BUFFER_MAGIC) {
1248                 CERROR("%s: invalid magic %x expect %x: rc = %d\n",
1249                        tgt_name(tsi->tsi_tgt), le32_to_cpu(ubuf->ub_magic),
1250                        UPDATE_BUFFER_MAGIC, -EPROTO);
1251                 RETURN(err_serious(-EPROTO));
1252         }
1253
1254         count = le32_to_cpu(ubuf->ub_count);
1255         if (count <= 0) {
1256                 CERROR("%s: No update!: rc = %d\n",
1257                        tgt_name(tsi->tsi_tgt), -EPROTO);
1258                 RETURN(err_serious(-EPROTO));
1259         }
1260
1261         req_capsule_set_size(pill, &RMF_UPDATE_REPLY, RCL_SERVER,
1262                              UPDATE_BUFFER_SIZE);
1263         rc = req_capsule_server_pack(pill);
1264         if (rc != 0) {
1265                 CERROR("%s: Can't pack response: rc = %d\n",
1266                        tgt_name(tsi->tsi_tgt), rc);
1267                 RETURN(rc);
1268         }
1269
1270         /* Prepare the update reply buffer */
1271         update_reply = req_capsule_server_get(pill, &RMF_UPDATE_REPLY);
1272         update_init_reply_buf(update_reply, count);
1273         tti->tti_u.update.tti_update_reply = update_reply;
1274
1275         rc = out_tx_start(env, dt, ta);
1276         if (rc != 0)
1277                 RETURN(rc);
1278
1279         /* Walk through updates in the request to execute them synchronously */
1280         off = cfs_size_round(offsetof(struct update_buf, ub_bufs[0]));
1281         for (i = 0; i < count; i++) {
1282                 struct tgt_handler      *h;
1283                 struct dt_object        *dt_obj;
1284
1285                 update = (struct update *)((char *)ubuf + off);
1286                 if (old_batchid == -1) {
1287                         old_batchid = update->u_batchid;
1288                 } else if (old_batchid != update->u_batchid) {
1289                         /* Stop the current update transaction,
1290                          * create a new one */
1291                         rc = out_tx_end(env, ta);
1292                         if (rc != 0)
1293                                 RETURN(rc);
1294
1295                         rc = out_tx_start(env, dt, ta);
1296                         if (rc != 0)
1297                                 RETURN(rc);
1298                         old_batchid = update->u_batchid;
1299                 }
1300
1301                 fid_le_to_cpu(&update->u_fid, &update->u_fid);
1302                 if (!fid_is_sane(&update->u_fid)) {
1303                         CERROR("%s: invalid FID "DFID": rc = %d\n",
1304                                tgt_name(tsi->tsi_tgt), PFID(&update->u_fid),
1305                                -EPROTO);
1306                         GOTO(out, rc = err_serious(-EPROTO));
1307                 }
1308
1309                 dt_obj = dt_locate(env, dt, &update->u_fid);
1310                 if (IS_ERR(dt_obj))
1311                         GOTO(out, rc = PTR_ERR(dt_obj));
1312
1313                 tti->tti_u.update.tti_dt_object = dt_obj;
1314                 tti->tti_u.update.tti_update = update;
1315                 tti->tti_u.update.tti_update_reply_index = i;
1316
1317                 h = out_handler_find(update->u_type);
1318                 if (likely(h != NULL)) {
1319                         /* For real modification RPC, check if the update
1320                          * has been executed */
1321                         if (h->th_flags & MUTABOR) {
1322                                 struct ptlrpc_request *req = tgt_ses_req(tsi);
1323
1324                                 if (out_check_resent(env, dt, dt_obj, req,
1325                                                      out_reconstruct,
1326                                                      update_reply, i))
1327                                         GOTO(next, rc);
1328                         }
1329
1330                         rc = h->th_act(tsi);
1331                 } else {
1332                         CERROR("%s: The unsupported opc: 0x%x\n",
1333                                tgt_name(tsi->tsi_tgt), update->u_type);
1334                         lu_object_put(env, &dt_obj->do_lu);
1335                         GOTO(out, rc = -ENOTSUPP);
1336                 }
1337 next:
1338                 lu_object_put(env, &dt_obj->do_lu);
1339                 if (rc < 0)
1340                         GOTO(out, rc);
1341                 off += cfs_size_round(update_size(update));
1342         }
1343 out:
1344         rc1 = out_tx_end(env, ta);
1345         if (rc == 0)
1346                 rc = rc1;
1347         RETURN(rc);
1348 }
1349
1350 struct tgt_handler tgt_out_handlers[] = {
1351 TGT_UPDATE_HDL(MUTABOR, UPDATE_OBJ,     out_handle),
1352 };
1353 EXPORT_SYMBOL(tgt_out_handlers);
1354