Whamcloud - gitweb
LU-3539 protocol: Change UPDATE_OBJ RPC format
[fs/lustre-release.git] / lustre / target / out_handler.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  *
25  * lustre/mdt/out_handler.c
26  *
27  * Object update handler between targets.
28  *
29  * Author: di.wang <di.wang@intel.com>
30  */
31
32 #define DEBUG_SUBSYSTEM S_CLASS
33
34 #include <obd_class.h>
35 #include <md_object.h>
36 #include "tgt_internal.h"
37 #include <lustre_update.h>
38
39 struct tx_arg *tx_add_exec(struct thandle_exec_args *ta, tx_exec_func_t func,
40                            tx_exec_func_t undo, char *file, int line)
41 {
42         int i;
43
44         LASSERT(ta);
45         LASSERT(func);
46
47         LASSERTF(ta->ta_argno + 1 <= TX_MAX_OPS,
48                  "Too many updates(%d) in one trans\n", ta->ta_argno);
49         i = ta->ta_argno;
50
51         ta->ta_argno++;
52
53         ta->ta_args[i].exec_fn = func;
54         ta->ta_args[i].undo_fn = undo;
55         ta->ta_args[i].file    = file;
56         ta->ta_args[i].line    = line;
57
58         return &ta->ta_args[i];
59 }
60
61 static int out_tx_start(const struct lu_env *env, struct dt_device *dt,
62                         struct thandle_exec_args *ta, struct obd_export *exp)
63 {
64         memset(ta, 0, sizeof(*ta));
65         ta->ta_handle = dt_trans_create(env, dt);
66         if (IS_ERR(ta->ta_handle)) {
67                 int rc;
68
69                 CERROR("%s: start handle error: rc = %ld\n",
70                        dt_obd_name(dt), PTR_ERR(ta->ta_handle));
71                 rc = PTR_ERR(ta->ta_handle);
72                 ta->ta_handle = NULL;
73                 return rc;
74         }
75         ta->ta_dev = dt;
76         if (exp->exp_need_sync)
77                 ta->ta_handle->th_sync = 1;
78
79         return 0;
80 }
81
82 static int out_trans_start(const struct lu_env *env,
83                            struct thandle_exec_args *ta)
84 {
85         return dt_trans_start(env, ta->ta_dev, ta->ta_handle);
86 }
87
88 static int out_trans_stop(const struct lu_env *env,
89                           struct thandle_exec_args *ta, int err)
90 {
91         int i;
92         int rc;
93
94         ta->ta_handle->th_result = err;
95         rc = dt_trans_stop(env, ta->ta_dev, ta->ta_handle);
96         for (i = 0; i < ta->ta_argno; i++) {
97                 if (ta->ta_args[i].object != NULL) {
98                         lu_object_put(env, &ta->ta_args[i].object->do_lu);
99                         ta->ta_args[i].object = NULL;
100                 }
101         }
102
103         return rc;
104 }
105
106 int out_tx_end(const struct lu_env *env, struct thandle_exec_args *ta)
107 {
108         struct tgt_session_info *tsi = tgt_ses_info(env);
109         int i = 0, rc;
110
111         LASSERT(ta->ta_dev);
112         LASSERT(ta->ta_handle);
113
114         if (ta->ta_err != 0 || ta->ta_argno == 0)
115                 GOTO(stop, rc = ta->ta_err);
116
117         rc = out_trans_start(env, ta);
118         if (unlikely(rc))
119                 GOTO(stop, rc);
120
121         for (i = 0; i < ta->ta_argno; i++) {
122                 rc = ta->ta_args[i].exec_fn(env, ta->ta_handle,
123                                             &ta->ta_args[i]);
124                 if (unlikely(rc)) {
125                         CDEBUG(D_INFO, "error during execution of #%u from"
126                                " %s:%d: rc = %d\n", i, ta->ta_args[i].file,
127                                ta->ta_args[i].line, rc);
128                         while (--i >= 0) {
129                                 LASSERTF(ta->ta_args[i].undo_fn != NULL,
130                                     "can't undo changes, hope for failover!\n");
131                                 ta->ta_args[i].undo_fn(env, ta->ta_handle,
132                                                        &ta->ta_args[i]);
133                         }
134                         break;
135                 }
136         }
137
138         /* Only fail for real update */
139         tsi->tsi_reply_fail_id = OBD_FAIL_OUT_UPDATE_NET_REP;
140 stop:
141         CDEBUG(D_INFO, "%s: executed %u/%u: rc = %d\n",
142                dt_obd_name(ta->ta_dev), i, ta->ta_argno, rc);
143         out_trans_stop(env, ta, rc);
144         ta->ta_handle = NULL;
145         ta->ta_argno = 0;
146         ta->ta_err = 0;
147
148         RETURN(rc);
149 }
150
151 static void out_reconstruct(const struct lu_env *env, struct dt_device *dt,
152                             struct dt_object *obj,
153                             struct object_update_reply *reply,
154                             int index)
155 {
156         CDEBUG(D_INFO, "%s: fork reply reply %p index %d: rc = %d\n",
157                dt_obd_name(dt), reply, index, 0);
158
159         object_update_result_insert(reply, NULL, 0, index, 0);
160         return;
161 }
162
163 typedef void (*out_reconstruct_t)(const struct lu_env *env,
164                                   struct dt_device *dt,
165                                   struct dt_object *obj,
166                                   struct object_update_reply *reply,
167                                   int index);
168
169 static inline int out_check_resent(const struct lu_env *env,
170                                    struct dt_device *dt,
171                                    struct dt_object *obj,
172                                    struct ptlrpc_request *req,
173                                    out_reconstruct_t reconstruct,
174                                    struct object_update_reply *reply,
175                                    int index)
176 {
177         if (likely(!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)))
178                 return 0;
179
180         if (req_xid_is_last(req)) {
181                 reconstruct(env, dt, obj, reply, index);
182                 return 1;
183         }
184         DEBUG_REQ(D_HA, req, "no reply for RESENT req (have "LPD64")",
185                  req->rq_export->exp_target_data.ted_lcd->lcd_last_xid);
186         return 0;
187 }
188
189 static int out_obj_destroy(const struct lu_env *env, struct dt_object *dt_obj,
190                            struct thandle *th)
191 {
192         int rc;
193
194         CDEBUG(D_INFO, "%s: destroy "DFID"\n", dt_obd_name(th->th_dev),
195                PFID(lu_object_fid(&dt_obj->do_lu)));
196
197         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
198         rc = dt_destroy(env, dt_obj, th);
199         dt_write_unlock(env, dt_obj);
200
201         return rc;
202 }
203
204 /**
205  * All of the xxx_undo will be used once execution failed,
206  * But because all of the required resource has been reserved in
207  * declare phase, i.e. if declare succeed, it should make sure
208  * the following executing phase succeed in anyway, so these undo
209  * should be useless for most of the time in Phase I
210  */
211 int out_tx_create_undo(const struct lu_env *env, struct thandle *th,
212                        struct tx_arg *arg)
213 {
214         int rc;
215
216         rc = out_obj_destroy(env, arg->object, th);
217         if (rc != 0)
218                 CERROR("%s: undo failure, we are doomed!: rc = %d\n",
219                        dt_obd_name(th->th_dev), rc);
220         return rc;
221 }
222
223 int out_tx_create_exec(const struct lu_env *env, struct thandle *th,
224                        struct tx_arg *arg)
225 {
226         struct dt_object        *dt_obj = arg->object;
227         int                      rc;
228
229         CDEBUG(D_OTHER, "%s: create "DFID": dof %u, mode %o\n",
230                dt_obd_name(th->th_dev),
231                PFID(lu_object_fid(&arg->object->do_lu)),
232                arg->u.create.dof.dof_type,
233                arg->u.create.attr.la_mode & S_IFMT);
234
235         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
236         rc = dt_create(env, dt_obj, &arg->u.create.attr,
237                        &arg->u.create.hint, &arg->u.create.dof, th);
238
239         dt_write_unlock(env, dt_obj);
240
241         CDEBUG(D_INFO, "%s: insert create reply %p index %d: rc = %d\n",
242                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
243
244         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
245
246         return rc;
247 }
248
249 static int __out_tx_create(const struct lu_env *env, struct dt_object *obj,
250                            struct lu_attr *attr, struct lu_fid *parent_fid,
251                            struct dt_object_format *dof,
252                            struct thandle_exec_args *ta,
253                            struct object_update_reply *reply,
254                            int index, char *file, int line)
255 {
256         struct tx_arg *arg;
257
258         LASSERT(ta->ta_handle != NULL);
259         ta->ta_err = dt_declare_create(env, obj, attr, NULL, dof,
260                                        ta->ta_handle);
261         if (ta->ta_err != 0)
262                 return ta->ta_err;
263
264         arg = tx_add_exec(ta, out_tx_create_exec, out_tx_create_undo, file,
265                           line);
266         LASSERT(arg);
267
268         /* release the object in out_trans_stop */
269         lu_object_get(&obj->do_lu);
270         arg->object = obj;
271         arg->u.create.attr = *attr;
272         if (parent_fid != NULL)
273                 arg->u.create.fid = *parent_fid;
274         memset(&arg->u.create.hint, 0, sizeof(arg->u.create.hint));
275         arg->u.create.dof  = *dof;
276         arg->reply = reply;
277         arg->index = index;
278
279         return 0;
280 }
281
282 static int out_create(struct tgt_session_info *tsi)
283 {
284         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
285         struct object_update    *update = tti->tti_u.update.tti_update;
286         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
287         struct dt_object_format *dof = &tti->tti_u.update.tti_update_dof;
288         struct obdo             *lobdo = &tti->tti_u.update.tti_obdo;
289         struct lu_attr          *attr = &tti->tti_attr;
290         struct lu_fid           *fid = NULL;
291         struct obdo             *wobdo;
292         int                     size;
293         int                     rc;
294
295         ENTRY;
296
297         wobdo = object_update_param_get(update, 0, &size);
298         if (wobdo == NULL || size != sizeof(*wobdo)) {
299                 CERROR("%s: obdo is NULL, invalid RPC: rc = %d\n",
300                        tgt_name(tsi->tsi_tgt), -EPROTO);
301                 RETURN(err_serious(-EPROTO));
302         }
303
304         obdo_le_to_cpu(wobdo, wobdo);
305         lustre_get_wire_obdo(NULL, lobdo, wobdo);
306         la_from_obdo(attr, lobdo, lobdo->o_valid);
307
308         dof->dof_type = dt_mode_to_dft(attr->la_mode);
309         if (update->ou_params_count > 1) {
310                 int size;
311
312                 fid = object_update_param_get(update, 1, &size);
313                 if (fid == NULL || size != sizeof(*fid)) {
314                         CERROR("%s: invalid fid: rc = %d\n",
315                                tgt_name(tsi->tsi_tgt), -EPROTO);
316                         RETURN(err_serious(-EPROTO));
317                 }
318                 if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
319                         lustre_swab_lu_fid(fid);
320                 if (!fid_is_sane(fid)) {
321                         CERROR("%s: invalid fid "DFID": rc = %d\n",
322                                tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
323                         RETURN(err_serious(-EPROTO));
324                 }
325         }
326
327         if (lu_object_exists(&obj->do_lu))
328                 RETURN(-EEXIST);
329
330         rc = out_tx_create(tsi->tsi_env, obj, attr, fid, dof,
331                            &tti->tti_tea,
332                            tti->tti_u.update.tti_update_reply,
333                            tti->tti_u.update.tti_update_reply_index);
334
335         RETURN(rc);
336 }
337
338 static int out_tx_attr_set_undo(const struct lu_env *env,
339                                 struct thandle *th, struct tx_arg *arg)
340 {
341         CERROR("%s: attr set undo "DFID" unimplemented yet!: rc = %d\n",
342                dt_obd_name(th->th_dev),
343                PFID(lu_object_fid(&arg->object->do_lu)), -ENOTSUPP);
344
345         return -ENOTSUPP;
346 }
347
348 static int out_tx_attr_set_exec(const struct lu_env *env, struct thandle *th,
349                                 struct tx_arg *arg)
350 {
351         struct dt_object        *dt_obj = arg->object;
352         int                     rc;
353
354         CDEBUG(D_OTHER, "%s: attr set "DFID"\n", dt_obd_name(th->th_dev),
355                PFID(lu_object_fid(&dt_obj->do_lu)));
356
357         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
358         rc = dt_attr_set(env, dt_obj, &arg->u.attr_set.attr, th, NULL);
359         dt_write_unlock(env, dt_obj);
360
361         CDEBUG(D_INFO, "%s: insert attr_set reply %p index %d: rc = %d\n",
362                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
363
364         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
365
366         return rc;
367 }
368
369 static int __out_tx_attr_set(const struct lu_env *env,
370                              struct dt_object *dt_obj,
371                              const struct lu_attr *attr,
372                              struct thandle_exec_args *th,
373                              struct object_update_reply *reply,
374                              int index, char *file, int line)
375 {
376         struct tx_arg           *arg;
377
378         LASSERT(th->ta_handle != NULL);
379         th->ta_err = dt_declare_attr_set(env, dt_obj, attr, th->ta_handle);
380         if (th->ta_err != 0)
381                 return th->ta_err;
382
383         arg = tx_add_exec(th, out_tx_attr_set_exec, out_tx_attr_set_undo,
384                           file, line);
385         LASSERT(arg);
386         lu_object_get(&dt_obj->do_lu);
387         arg->object = dt_obj;
388         arg->u.attr_set.attr = *attr;
389         arg->reply = reply;
390         arg->index = index;
391         return 0;
392 }
393
394 static int out_attr_set(struct tgt_session_info *tsi)
395 {
396         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
397         struct object_update    *update = tti->tti_u.update.tti_update;
398         struct lu_attr          *attr = &tti->tti_attr;
399         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
400         struct obdo             *lobdo = &tti->tti_u.update.tti_obdo;
401         struct obdo             *wobdo;
402         int                      size;
403         int                      rc;
404
405         ENTRY;
406
407         wobdo = object_update_param_get(update, 0, &size);
408         if (wobdo == NULL || size != sizeof(*wobdo)) {
409                 CERROR("%s: empty obdo in the update: rc = %d\n",
410                        tgt_name(tsi->tsi_tgt), -EPROTO);
411                 RETURN(err_serious(-EPROTO));
412         }
413
414         attr->la_valid = 0;
415         attr->la_valid = 0;
416         obdo_le_to_cpu(wobdo, wobdo);
417         lustre_get_wire_obdo(NULL, lobdo, wobdo);
418         la_from_obdo(attr, lobdo, lobdo->o_valid);
419
420         rc = out_tx_attr_set(tsi->tsi_env, obj, attr, &tti->tti_tea,
421                              tti->tti_u.update.tti_update_reply,
422                              tti->tti_u.update.tti_update_reply_index);
423
424         RETURN(rc);
425 }
426
427 static int out_attr_get(struct tgt_session_info *tsi)
428 {
429         const struct lu_env     *env = tsi->tsi_env;
430         struct tgt_thread_info  *tti = tgt_th_info(env);
431         struct obdo             *obdo = &tti->tti_u.update.tti_obdo;
432         struct lu_attr          *la = &tti->tti_attr;
433         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
434         int                     idx = tti->tti_u.update.tti_update_reply_index;
435         int                     rc;
436
437         ENTRY;
438
439         if (!lu_object_exists(&obj->do_lu))
440                 RETURN(-ENOENT);
441
442         dt_read_lock(env, obj, MOR_TGT_CHILD);
443         rc = dt_attr_get(env, obj, la, NULL);
444         if (rc)
445                 GOTO(out_unlock, rc);
446         /*
447          * If it is a directory, we will also check whether the
448          * directory is empty.
449          * la_flags = 0 : Empty.
450          *          = 1 : Not empty.
451          */
452         la->la_flags = 0;
453         if (S_ISDIR(la->la_mode)) {
454                 struct dt_it            *it;
455                 const struct dt_it_ops  *iops;
456
457                 if (!dt_try_as_dir(env, obj))
458                         GOTO(out_unlock, rc = -ENOTDIR);
459
460                 iops = &obj->do_index_ops->dio_it;
461                 it = iops->init(env, obj, LUDA_64BITHASH, BYPASS_CAPA);
462                 if (!IS_ERR(it)) {
463                         int  result;
464                         result = iops->get(env, it, (const void *)"");
465                         if (result > 0) {
466                                 int i;
467                                 for (result = 0, i = 0; result == 0 && i < 3;
468                                      ++i)
469                                         result = iops->next(env, it);
470                                 if (result == 0)
471                                         la->la_flags = 1;
472                         } else if (result == 0)
473                                 /*
474                                  * Huh? Index contains no zero key?
475                                  */
476                                 rc = -EIO;
477
478                         iops->put(env, it);
479                         iops->fini(env, it);
480                 }
481         }
482
483         obdo->o_valid = 0;
484         obdo_from_la(obdo, la, la->la_valid);
485         obdo_cpu_to_le(obdo, obdo);
486         lustre_set_wire_obdo(NULL, obdo, obdo);
487
488 out_unlock:
489         dt_read_unlock(env, obj);
490
491         CDEBUG(D_INFO, "%s: insert attr get reply %p index %d: rc = %d\n",
492                tgt_name(tsi->tsi_tgt), tti->tti_u.update.tti_update_reply,
493                0, rc);
494
495         object_update_result_insert(tti->tti_u.update.tti_update_reply, obdo,
496                                     sizeof(*obdo), idx, rc);
497
498         RETURN(rc);
499 }
500
501 static int out_xattr_get(struct tgt_session_info *tsi)
502 {
503         const struct lu_env        *env = tsi->tsi_env;
504         struct tgt_thread_info     *tti = tgt_th_info(env);
505         struct object_update       *update = tti->tti_u.update.tti_update;
506         struct lu_buf              *lbuf = &tti->tti_buf;
507         struct object_update_reply *reply = tti->tti_u.update.tti_update_reply;
508         struct dt_object           *obj = tti->tti_u.update.tti_dt_object;
509         char                       *name;
510         struct object_update_result *update_result;
511         int                     idx = tti->tti_u.update.tti_update_reply_index;
512         int                        rc;
513
514         ENTRY;
515
516         name = object_update_param_get(update, 0, NULL);
517         if (name == NULL) {
518                 CERROR("%s: empty name for xattr get: rc = %d\n",
519                        tgt_name(tsi->tsi_tgt), -EPROTO);
520                 RETURN(err_serious(-EPROTO));
521         }
522
523         update_result = object_update_result_get(reply, 0, NULL);
524         if (update_result == NULL) {
525                 CERROR("%s: empty name for xattr get: rc = %d\n",
526                        tgt_name(tsi->tsi_tgt), -EPROTO);
527                 RETURN(err_serious(-EPROTO));
528         }
529
530         lbuf->lb_buf = update_result->our_data;
531         lbuf->lb_len = OUT_UPDATE_REPLY_SIZE -
532                        cfs_size_round((unsigned long)update_result->our_data -
533                                       (unsigned long)update_result);
534         dt_read_lock(env, obj, MOR_TGT_CHILD);
535         rc = dt_xattr_get(env, obj, lbuf, name, NULL);
536         dt_read_unlock(env, obj);
537         if (rc < 0) {
538                 lbuf->lb_len = 0;
539                 GOTO(out, rc);
540         }
541         if (rc == 0) {
542                 lbuf->lb_len = 0;
543                 GOTO(out, rc = -ENOENT);
544         }
545         lbuf->lb_len = rc;
546         rc = 0;
547         CDEBUG(D_INFO, "%s: "DFID" get xattr %s len %d\n",
548                tgt_name(tsi->tsi_tgt), PFID(lu_object_fid(&obj->do_lu)),
549                name, (int)lbuf->lb_len);
550
551         GOTO(out, rc);
552
553 out:
554         object_update_result_insert(reply, lbuf->lb_buf, lbuf->lb_len, idx, rc);
555         RETURN(rc);
556 }
557
558 static int out_index_lookup(struct tgt_session_info *tsi)
559 {
560         const struct lu_env     *env = tsi->tsi_env;
561         struct tgt_thread_info  *tti = tgt_th_info(env);
562         struct object_update    *update = tti->tti_u.update.tti_update;
563         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
564         char                    *name;
565         int                      rc;
566
567         ENTRY;
568
569         if (!lu_object_exists(&obj->do_lu))
570                 RETURN(-ENOENT);
571
572         name = object_update_param_get(update, 0, NULL);
573         if (name == NULL) {
574                 CERROR("%s: empty name for lookup: rc = %d\n",
575                        tgt_name(tsi->tsi_tgt), -EPROTO);
576                 RETURN(err_serious(-EPROTO));
577         }
578
579         dt_read_lock(env, obj, MOR_TGT_CHILD);
580         if (!dt_try_as_dir(env, obj))
581                 GOTO(out_unlock, rc = -ENOTDIR);
582
583         rc = dt_lookup(env, obj, (struct dt_rec *)&tti->tti_fid1,
584                 (struct dt_key *)name, NULL);
585
586         if (rc < 0)
587                 GOTO(out_unlock, rc);
588
589         if (rc == 0)
590                 rc += 1;
591
592         CDEBUG(D_INFO, "lookup "DFID" %s get "DFID" rc %d\n",
593                PFID(lu_object_fid(&obj->do_lu)), name,
594                PFID(&tti->tti_fid1), rc);
595
596 out_unlock:
597         dt_read_unlock(env, obj);
598
599         CDEBUG(D_INFO, "%s: insert lookup reply %p index %d: rc = %d\n",
600                tgt_name(tsi->tsi_tgt), tti->tti_u.update.tti_update_reply,
601                0, rc);
602
603         object_update_result_insert(tti->tti_u.update.tti_update_reply,
604                             &tti->tti_fid1, sizeof(tti->tti_fid1),
605                             tti->tti_u.update.tti_update_reply_index, rc);
606         RETURN(rc);
607 }
608
609 static int out_tx_xattr_set_exec(const struct lu_env *env,
610                                  struct thandle *th,
611                                  struct tx_arg *arg)
612 {
613         struct dt_object *dt_obj = arg->object;
614         int rc;
615
616         CDEBUG(D_INFO, "%s: set xattr buf %p name %s flag %d\n",
617                dt_obd_name(th->th_dev), arg->u.xattr_set.buf.lb_buf,
618                arg->u.xattr_set.name, arg->u.xattr_set.flags);
619
620         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
621         rc = dt_xattr_set(env, dt_obj, &arg->u.xattr_set.buf,
622                           arg->u.xattr_set.name, arg->u.xattr_set.flags,
623                           th, NULL);
624         dt_write_unlock(env, dt_obj);
625         /**
626          * Ignore errors if this is LINK EA
627          **/
628         if (unlikely(rc && !strcmp(arg->u.xattr_set.name, XATTR_NAME_LINK)))
629                 rc = 0;
630
631         CDEBUG(D_INFO, "%s: insert xattr set reply %p index %d: rc = %d\n",
632                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
633
634         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
635
636         return rc;
637 }
638
639 static int __out_tx_xattr_set(const struct lu_env *env,
640                               struct dt_object *dt_obj,
641                               const struct lu_buf *buf,
642                               const char *name, int flags,
643                               struct thandle_exec_args *ta,
644                               struct object_update_reply *reply,
645                               int index, char *file, int line)
646 {
647         struct tx_arg           *arg;
648
649         LASSERT(ta->ta_handle != NULL);
650         ta->ta_err = dt_declare_xattr_set(env, dt_obj, buf, name,
651                                           flags, ta->ta_handle);
652         if (ta->ta_err != 0)
653                 return ta->ta_err;
654
655         arg = tx_add_exec(ta, out_tx_xattr_set_exec, NULL, file, line);
656         LASSERT(arg);
657         lu_object_get(&dt_obj->do_lu);
658         arg->object = dt_obj;
659         arg->u.xattr_set.name = name;
660         arg->u.xattr_set.flags = flags;
661         arg->u.xattr_set.buf = *buf;
662         arg->reply = reply;
663         arg->index = index;
664         arg->u.xattr_set.csum = 0;
665         return 0;
666 }
667
668 static int out_xattr_set(struct tgt_session_info *tsi)
669 {
670         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
671         struct object_update    *update = tti->tti_u.update.tti_update;
672         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
673         struct lu_buf           *lbuf = &tti->tti_buf;
674         char                    *name;
675         char                    *buf;
676         char                    *tmp;
677         int                      buf_len = 0;
678         int                      flag;
679         int                      rc;
680         ENTRY;
681
682         name = object_update_param_get(update, 0, NULL);
683         if (name == NULL) {
684                 CERROR("%s: empty name for xattr set: rc = %d\n",
685                        tgt_name(tsi->tsi_tgt), -EPROTO);
686                 RETURN(err_serious(-EPROTO));
687         }
688
689         buf = object_update_param_get(update, 1, &buf_len);
690         if (buf == NULL || buf_len == 0) {
691                 CERROR("%s: empty buf for xattr set: rc = %d\n",
692                        tgt_name(tsi->tsi_tgt), -EPROTO);
693                 RETURN(err_serious(-EPROTO));
694         }
695
696         lbuf->lb_buf = buf;
697         lbuf->lb_len = buf_len;
698
699         tmp = (char *)object_update_param_get(update, 2, NULL);
700         if (tmp == NULL) {
701                 CERROR("%s: empty flag for xattr set: rc = %d\n",
702                        tgt_name(tsi->tsi_tgt), -EPROTO);
703                 RETURN(err_serious(-EPROTO));
704         }
705
706         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
707                 __swab32s((__u32 *)tmp);
708         flag = *(int *)tmp;
709
710         rc = out_tx_xattr_set(tsi->tsi_env, obj, lbuf, name, flag,
711                               &tti->tti_tea,
712                               tti->tti_u.update.tti_update_reply,
713                               tti->tti_u.update.tti_update_reply_index);
714         RETURN(rc);
715 }
716
717 static int out_obj_ref_add(const struct lu_env *env,
718                            struct dt_object *dt_obj,
719                            struct thandle *th)
720 {
721         int rc;
722
723         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
724         rc = dt_ref_add(env, dt_obj, th);
725         dt_write_unlock(env, dt_obj);
726
727         return rc;
728 }
729
730 static int out_obj_ref_del(const struct lu_env *env,
731                            struct dt_object *dt_obj,
732                            struct thandle *th)
733 {
734         int rc;
735
736         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
737         rc = dt_ref_del(env, dt_obj, th);
738         dt_write_unlock(env, dt_obj);
739
740         return rc;
741 }
742
743 static int out_tx_ref_add_exec(const struct lu_env *env, struct thandle *th,
744                                struct tx_arg *arg)
745 {
746         struct dt_object *dt_obj = arg->object;
747         int rc;
748
749         rc = out_obj_ref_add(env, dt_obj, th);
750
751         CDEBUG(D_INFO, "%s: insert ref_add reply %p index %d: rc = %d\n",
752                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
753
754         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
755         return rc;
756 }
757
758 static int out_tx_ref_add_undo(const struct lu_env *env, struct thandle *th,
759                                struct tx_arg *arg)
760 {
761         return out_obj_ref_del(env, arg->object, th);
762 }
763
764 static int __out_tx_ref_add(const struct lu_env *env,
765                             struct dt_object *dt_obj,
766                             struct thandle_exec_args *ta,
767                             struct object_update_reply *reply,
768                             int index, char *file, int line)
769 {
770         struct tx_arg   *arg;
771
772         LASSERT(ta->ta_handle != NULL);
773         ta->ta_err = dt_declare_ref_add(env, dt_obj, ta->ta_handle);
774         if (ta->ta_err != 0)
775                 return ta->ta_err;
776
777         arg = tx_add_exec(ta, out_tx_ref_add_exec, out_tx_ref_add_undo, file,
778                           line);
779         LASSERT(arg);
780         lu_object_get(&dt_obj->do_lu);
781         arg->object = dt_obj;
782         arg->reply = reply;
783         arg->index = index;
784         return 0;
785 }
786
787 /**
788  * increase ref of the object
789  **/
790 static int out_ref_add(struct tgt_session_info *tsi)
791 {
792         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
793         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
794         int                      rc;
795
796         ENTRY;
797
798         rc = out_tx_ref_add(tsi->tsi_env, obj, &tti->tti_tea,
799                             tti->tti_u.update.tti_update_reply,
800                             tti->tti_u.update.tti_update_reply_index);
801         RETURN(rc);
802 }
803
804 static int out_tx_ref_del_exec(const struct lu_env *env, struct thandle *th,
805                                struct tx_arg *arg)
806 {
807         struct dt_object        *dt_obj = arg->object;
808         int                      rc;
809
810         rc = out_obj_ref_del(env, dt_obj, th);
811
812         CDEBUG(D_INFO, "%s: insert ref_del reply %p index %d: rc = %d\n",
813                dt_obd_name(th->th_dev), arg->reply, arg->index, 0);
814
815         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
816
817         return rc;
818 }
819
820 static int out_tx_ref_del_undo(const struct lu_env *env, struct thandle *th,
821                                struct tx_arg *arg)
822 {
823         return out_obj_ref_add(env, arg->object, th);
824 }
825
826 static int __out_tx_ref_del(const struct lu_env *env,
827                             struct dt_object *dt_obj,
828                             struct thandle_exec_args *ta,
829                             struct object_update_reply *reply,
830                             int index, char *file, int line)
831 {
832         struct tx_arg   *arg;
833
834         LASSERT(ta->ta_handle != NULL);
835         ta->ta_err = dt_declare_ref_del(env, dt_obj, ta->ta_handle);
836         if (ta->ta_err != 0)
837                 return ta->ta_err;
838
839         arg = tx_add_exec(ta, out_tx_ref_del_exec, out_tx_ref_del_undo, file,
840                           line);
841         LASSERT(arg);
842         lu_object_get(&dt_obj->do_lu);
843         arg->object = dt_obj;
844         arg->reply = reply;
845         arg->index = index;
846         return 0;
847 }
848
849 static int out_ref_del(struct tgt_session_info *tsi)
850 {
851         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
852         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
853         int                      rc;
854
855         ENTRY;
856
857         if (!lu_object_exists(&obj->do_lu))
858                 RETURN(-ENOENT);
859
860         rc = out_tx_ref_del(tsi->tsi_env, obj, &tti->tti_tea,
861                             tti->tti_u.update.tti_update_reply,
862                             tti->tti_u.update.tti_update_reply_index);
863         RETURN(rc);
864 }
865
866 static int out_obj_index_insert(const struct lu_env *env,
867                                 struct dt_object *dt_obj,
868                                 const struct dt_rec *rec,
869                                 const struct dt_key *key,
870                                 struct thandle *th)
871 {
872         int rc;
873
874         CDEBUG(D_INFO, "%s: index insert "DFID" name: %s fid "DFID"\n",
875                dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
876                (char *)key, PFID((struct lu_fid *)rec));
877
878         if (dt_try_as_dir(env, dt_obj) == 0)
879                 return -ENOTDIR;
880
881         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
882         rc = dt_insert(env, dt_obj, rec, key, th, NULL, 0);
883         dt_write_unlock(env, dt_obj);
884
885         return rc;
886 }
887
888 static int out_obj_index_delete(const struct lu_env *env,
889                                 struct dt_object *dt_obj,
890                                 const struct dt_key *key,
891                                 struct thandle *th)
892 {
893         int rc;
894
895         CDEBUG(D_INFO, "%s: index delete "DFID" name: %s\n",
896                dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
897                (char *)key);
898
899         if (dt_try_as_dir(env, dt_obj) == 0)
900                 return -ENOTDIR;
901
902         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
903         rc = dt_delete(env, dt_obj, key, th, NULL);
904         dt_write_unlock(env, dt_obj);
905
906         return rc;
907 }
908
909 static int out_tx_index_insert_exec(const struct lu_env *env,
910                                     struct thandle *th, struct tx_arg *arg)
911 {
912         struct dt_object *dt_obj = arg->object;
913         int rc;
914
915         rc = out_obj_index_insert(env, dt_obj, arg->u.insert.rec,
916                                   arg->u.insert.key, th);
917
918         CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
919                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
920
921         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
922
923         return rc;
924 }
925
926 static int out_tx_index_insert_undo(const struct lu_env *env,
927                                     struct thandle *th, struct tx_arg *arg)
928 {
929         return out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
930 }
931
932 static int __out_tx_index_insert(const struct lu_env *env,
933                                  struct dt_object *dt_obj,
934                                  char *name, struct lu_fid *fid,
935                                  struct thandle_exec_args *ta,
936                                  struct object_update_reply *reply,
937                                  int index, char *file, int line)
938 {
939         struct tx_arg *arg;
940
941         LASSERT(ta->ta_handle != NULL);
942
943         if (lu_object_exists(&dt_obj->do_lu)) {
944                 if (dt_try_as_dir(env, dt_obj) == 0) {
945                         ta->ta_err = -ENOTDIR;
946                         return ta->ta_err;
947                 }
948                 ta->ta_err = dt_declare_insert(env, dt_obj,
949                                                (struct dt_rec *)fid,
950                                                (struct dt_key *)name,
951                                                ta->ta_handle);
952         }
953
954         if (ta->ta_err != 0)
955                 return ta->ta_err;
956
957         arg = tx_add_exec(ta, out_tx_index_insert_exec,
958                           out_tx_index_insert_undo, file,
959                           line);
960         LASSERT(arg);
961         lu_object_get(&dt_obj->do_lu);
962         arg->object = dt_obj;
963         arg->reply = reply;
964         arg->index = index;
965         arg->u.insert.rec = (struct dt_rec *)fid;
966         arg->u.insert.key = (struct dt_key *)name;
967
968         return 0;
969 }
970
971 static int out_index_insert(struct tgt_session_info *tsi)
972 {
973         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
974         struct object_update    *update = tti->tti_u.update.tti_update;
975         struct dt_object  *obj = tti->tti_u.update.tti_dt_object;
976         struct lu_fid     *fid;
977         char              *name;
978         int                rc = 0;
979         int                size;
980
981         ENTRY;
982
983         name = object_update_param_get(update, 0, NULL);
984         if (name == NULL) {
985                 CERROR("%s: empty name for index insert: rc = %d\n",
986                        tgt_name(tsi->tsi_tgt), -EPROTO);
987                 RETURN(err_serious(-EPROTO));
988         }
989
990         fid = object_update_param_get(update, 1, &size);
991         if (fid == NULL || size != sizeof(*fid)) {
992                 CERROR("%s: invalid fid: rc = %d\n",
993                        tgt_name(tsi->tsi_tgt), -EPROTO);
994                        RETURN(err_serious(-EPROTO));
995         }
996
997         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
998                 lustre_swab_lu_fid(fid);
999
1000         if (!fid_is_sane(fid)) {
1001                 CERROR("%s: invalid FID "DFID": rc = %d\n",
1002                        tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
1003                 RETURN(err_serious(-EPROTO));
1004         }
1005
1006         rc = out_tx_index_insert(tsi->tsi_env, obj, name, fid,
1007                                  &tti->tti_tea,
1008                                  tti->tti_u.update.tti_update_reply,
1009                                  tti->tti_u.update.tti_update_reply_index);
1010         RETURN(rc);
1011 }
1012
1013 static int out_tx_index_delete_exec(const struct lu_env *env,
1014                                     struct thandle *th,
1015                                     struct tx_arg *arg)
1016 {
1017         int rc;
1018
1019         rc = out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
1020
1021         CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
1022                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
1023
1024         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
1025
1026         return rc;
1027 }
1028
1029 static int out_tx_index_delete_undo(const struct lu_env *env,
1030                                     struct thandle *th,
1031                                     struct tx_arg *arg)
1032 {
1033         CERROR("%s: Oops, can not rollback index_delete yet: rc = %d\n",
1034                dt_obd_name(th->th_dev), -ENOTSUPP);
1035         return -ENOTSUPP;
1036 }
1037
1038 static int __out_tx_index_delete(const struct lu_env *env,
1039                                  struct dt_object *dt_obj, char *name,
1040                                  struct thandle_exec_args *ta,
1041                                  struct object_update_reply *reply,
1042                                  int index, char *file, int line)
1043 {
1044         struct tx_arg *arg;
1045
1046         if (dt_try_as_dir(env, dt_obj) == 0) {
1047                 ta->ta_err = -ENOTDIR;
1048                 return ta->ta_err;
1049         }
1050
1051         LASSERT(ta->ta_handle != NULL);
1052         ta->ta_err = dt_declare_delete(env, dt_obj,
1053                                        (struct dt_key *)name,
1054                                        ta->ta_handle);
1055         if (ta->ta_err != 0)
1056                 return ta->ta_err;
1057
1058         arg = tx_add_exec(ta, out_tx_index_delete_exec,
1059                           out_tx_index_delete_undo, file,
1060                           line);
1061         LASSERT(arg);
1062         lu_object_get(&dt_obj->do_lu);
1063         arg->object = dt_obj;
1064         arg->reply = reply;
1065         arg->index = index;
1066         arg->u.insert.key = (struct dt_key *)name;
1067         return 0;
1068 }
1069
1070 static int out_index_delete(struct tgt_session_info *tsi)
1071 {
1072         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
1073         struct object_update    *update = tti->tti_u.update.tti_update;
1074         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
1075         char                    *name;
1076         int                      rc = 0;
1077
1078         if (!lu_object_exists(&obj->do_lu))
1079                 RETURN(-ENOENT);
1080
1081         name = object_update_param_get(update, 0, NULL);
1082         if (name == NULL) {
1083                 CERROR("%s: empty name for index delete: rc = %d\n",
1084                        tgt_name(tsi->tsi_tgt), -EPROTO);
1085                 RETURN(err_serious(-EPROTO));
1086         }
1087
1088         rc = out_tx_index_delete(tsi->tsi_env, obj, name, &tti->tti_tea,
1089                                  tti->tti_u.update.tti_update_reply,
1090                                  tti->tti_u.update.tti_update_reply_index);
1091         RETURN(rc);
1092 }
1093
1094 static int out_tx_destroy_exec(const struct lu_env *env, struct thandle *th,
1095                                struct tx_arg *arg)
1096 {
1097         struct dt_object *dt_obj = arg->object;
1098         int rc;
1099
1100         rc = out_obj_destroy(env, dt_obj, th);
1101
1102         CDEBUG(D_INFO, "%s: insert destroy reply %p index %d: rc = %d\n",
1103                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
1104
1105         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
1106
1107         RETURN(rc);
1108 }
1109
1110 static int out_tx_destroy_undo(const struct lu_env *env, struct thandle *th,
1111                                struct tx_arg *arg)
1112 {
1113         CERROR("%s: not support destroy undo yet!: rc = %d\n",
1114                dt_obd_name(th->th_dev), -ENOTSUPP);
1115         return -ENOTSUPP;
1116 }
1117
1118 static int __out_tx_destroy(const struct lu_env *env, struct dt_object *dt_obj,
1119                              struct thandle_exec_args *ta,
1120                              struct object_update_reply *reply,
1121                              int index, char *file, int line)
1122 {
1123         struct tx_arg *arg;
1124
1125         LASSERT(ta->ta_handle != NULL);
1126         ta->ta_err = dt_declare_destroy(env, dt_obj, ta->ta_handle);
1127         if (ta->ta_err)
1128                 return ta->ta_err;
1129
1130         arg = tx_add_exec(ta, out_tx_destroy_exec, out_tx_destroy_undo,
1131                           file, line);
1132         LASSERT(arg);
1133         lu_object_get(&dt_obj->do_lu);
1134         arg->object = dt_obj;
1135         arg->reply = reply;
1136         arg->index = index;
1137         return 0;
1138 }
1139
1140 static int out_destroy(struct tgt_session_info *tsi)
1141 {
1142         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
1143         struct object_update    *update = tti->tti_u.update.tti_update;
1144         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
1145         struct lu_fid           *fid;
1146         int                      rc;
1147         ENTRY;
1148
1149         fid = &update->ou_fid;
1150         if (!fid_is_sane(fid)) {
1151                 CERROR("%s: invalid FID "DFID": rc = %d\n",
1152                        tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
1153                 RETURN(err_serious(-EPROTO));
1154         }
1155
1156         if (!lu_object_exists(&obj->do_lu))
1157                 RETURN(-ENOENT);
1158
1159         rc = out_tx_destroy(tsi->tsi_env, obj, &tti->tti_tea,
1160                             tti->tti_u.update.tti_update_reply,
1161                             tti->tti_u.update.tti_update_reply_index);
1162
1163         RETURN(rc);
1164 }
1165
1166 #define DEF_OUT_HNDL(opc, name, flags, fn)     \
1167 [opc - OUT_CREATE] = {                                  \
1168         .th_name    = name,                             \
1169         .th_fail_id = 0,                                \
1170         .th_opc     = opc,                              \
1171         .th_flags   = flags,                            \
1172         .th_act     = fn,                               \
1173         .th_fmt     = NULL,                             \
1174         .th_version = 0,                                \
1175 }
1176
1177 #define out_handler mdt_handler
1178 static struct tgt_handler out_update_ops[] = {
1179         DEF_OUT_HNDL(OUT_CREATE, "out_create", MUTABOR | HABEO_REFERO,
1180                      out_create),
1181         DEF_OUT_HNDL(OUT_DESTROY, "out_create", MUTABOR | HABEO_REFERO,
1182                      out_destroy),
1183         DEF_OUT_HNDL(OUT_REF_ADD, "out_ref_add", MUTABOR | HABEO_REFERO,
1184                      out_ref_add),
1185         DEF_OUT_HNDL(OUT_REF_DEL, "out_ref_del", MUTABOR | HABEO_REFERO,
1186                      out_ref_del),
1187         DEF_OUT_HNDL(OUT_ATTR_SET, "out_attr_set",  MUTABOR | HABEO_REFERO,
1188                      out_attr_set),
1189         DEF_OUT_HNDL(OUT_ATTR_GET, "out_attr_get",  HABEO_REFERO,
1190                      out_attr_get),
1191         DEF_OUT_HNDL(OUT_XATTR_SET, "out_xattr_set", MUTABOR | HABEO_REFERO,
1192                      out_xattr_set),
1193         DEF_OUT_HNDL(OUT_XATTR_GET, "out_xattr_get", HABEO_REFERO,
1194                      out_xattr_get),
1195         DEF_OUT_HNDL(OUT_INDEX_LOOKUP, "out_index_lookup", HABEO_REFERO,
1196                      out_index_lookup),
1197         DEF_OUT_HNDL(OUT_INDEX_INSERT, "out_index_insert",
1198                      MUTABOR | HABEO_REFERO, out_index_insert),
1199         DEF_OUT_HNDL(OUT_INDEX_DELETE, "out_index_delete",
1200                      MUTABOR | HABEO_REFERO, out_index_delete),
1201 };
1202
1203 struct tgt_handler *out_handler_find(__u32 opc)
1204 {
1205         struct tgt_handler *h;
1206
1207         h = NULL;
1208         if (OUT_CREATE <= opc && opc < OUT_LAST) {
1209                 h = &out_update_ops[opc - OUT_CREATE];
1210                 LASSERTF(h->th_opc == opc, "opcode mismatch %d != %d\n",
1211                          h->th_opc, opc);
1212         } else {
1213                 h = NULL; /* unsupported opc */
1214         }
1215         return h;
1216 }
1217
1218 /**
1219  * Object updates between Targets. Because all the updates has been
1220  * dis-assemblied into object updates at sender side, so OUT will
1221  * call OSD API directly to execute these updates.
1222  *
1223  * In DNE phase I all of the updates in the request need to be executed
1224  * in one transaction, and the transaction has to be synchronously.
1225  *
1226  * Please refer to lustre/include/lustre/lustre_idl.h for req/reply
1227  * format.
1228  */
1229 int out_handle(struct tgt_session_info *tsi)
1230 {
1231         const struct lu_env             *env = tsi->tsi_env;
1232         struct tgt_thread_info          *tti = tgt_th_info(env);
1233         struct thandle_exec_args        *ta = &tti->tti_tea;
1234         struct req_capsule              *pill = tsi->tsi_pill;
1235         struct dt_device                *dt = tsi->tsi_tgt->lut_bottom;
1236         struct object_update_request    *ureq;
1237         struct object_update            *update;
1238         struct object_update_reply      *reply;
1239         int                              bufsize;
1240         int                              count;
1241         int                              old_batchid = -1;
1242         int                              i;
1243         int                              rc = 0;
1244         int                              rc1 = 0;
1245
1246         ENTRY;
1247
1248         req_capsule_set(pill, &RQF_OUT_UPDATE);
1249         ureq = req_capsule_client_get(pill, &RMF_OUT_UPDATE);
1250         if (ureq == NULL) {
1251                 CERROR("%s: No buf!: rc = %d\n", tgt_name(tsi->tsi_tgt),
1252                        -EPROTO);
1253                 RETURN(err_serious(-EPROTO));
1254         }
1255
1256         bufsize = req_capsule_get_size(pill, &RMF_OUT_UPDATE, RCL_CLIENT);
1257         if (bufsize != object_update_request_size(ureq)) {
1258                 CERROR("%s: invalid bufsize %d: rc = %d\n",
1259                        tgt_name(tsi->tsi_tgt), bufsize, -EPROTO);
1260                 RETURN(err_serious(-EPROTO));
1261         }
1262
1263         if (ureq->ourq_magic != UPDATE_REQUEST_MAGIC) {
1264                 CERROR("%s: invalid update buffer magic %x expect %x: "
1265                        "rc = %d\n", tgt_name(tsi->tsi_tgt), ureq->ourq_magic,
1266                        UPDATE_REQUEST_MAGIC, -EPROTO);
1267                 RETURN(err_serious(-EPROTO));
1268         }
1269
1270         count = ureq->ourq_count;
1271         if (count <= 0) {
1272                 CERROR("%s: empty update: rc = %d\n", tgt_name(tsi->tsi_tgt),
1273                        -EPROTO);
1274                 RETURN(err_serious(-EPROTO));
1275         }
1276
1277         req_capsule_set_size(pill, &RMF_OUT_UPDATE_REPLY, RCL_SERVER,
1278                              OUT_UPDATE_REPLY_SIZE);
1279         rc = req_capsule_server_pack(pill);
1280         if (rc != 0) {
1281                 CERROR("%s: Can't pack response: rc = %d\n",
1282                        tgt_name(tsi->tsi_tgt), rc);
1283                 RETURN(rc);
1284         }
1285
1286         /* Prepare the update reply buffer */
1287         reply = req_capsule_server_get(pill, &RMF_OUT_UPDATE_REPLY);
1288         if (reply == NULL)
1289                 RETURN(err_serious(-EPROTO));
1290         object_update_reply_init(reply, count);
1291         tti->tti_u.update.tti_update_reply = reply;
1292
1293         rc = out_tx_start(env, dt, ta, tsi->tsi_exp);
1294         if (rc != 0)
1295                 RETURN(rc);
1296
1297         tti->tti_mult_trans = !req_is_replay(tgt_ses_req(tsi));
1298
1299         /* Walk through updates in the request to execute them synchronously */
1300         for (i = 0; i < count; i++) {
1301                 struct tgt_handler      *h;
1302                 struct dt_object        *dt_obj;
1303
1304                 update = object_update_request_get(ureq, i, NULL);
1305                 if (update == NULL)
1306                         GOTO(out, rc = -EPROTO);
1307
1308                 if (ptlrpc_req_need_swab(pill->rc_req))
1309                         lustre_swab_object_update(update);
1310
1311                 if (old_batchid == -1) {
1312                         old_batchid = update->ou_batchid;
1313                 } else if (old_batchid != update->ou_batchid) {
1314                         /* Stop the current update transaction,
1315                          * create a new one */
1316                         rc = out_tx_end(env, ta);
1317                         if (rc != 0)
1318                                 RETURN(rc);
1319
1320                         rc = out_tx_start(env, dt, ta, tsi->tsi_exp);
1321                         if (rc != 0)
1322                                 RETURN(rc);
1323                         old_batchid = update->ou_batchid;
1324                 }
1325
1326                 if (!fid_is_sane(&update->ou_fid)) {
1327                         CERROR("%s: invalid FID "DFID": rc = %d\n",
1328                                tgt_name(tsi->tsi_tgt), PFID(&update->ou_fid),
1329                                -EPROTO);
1330                         GOTO(out, rc = err_serious(-EPROTO));
1331                 }
1332
1333                 dt_obj = dt_locate(env, dt, &update->ou_fid);
1334                 if (IS_ERR(dt_obj))
1335                         GOTO(out, rc = PTR_ERR(dt_obj));
1336
1337                 if (dt->dd_record_fid_accessed) {
1338                         lfsck_pack_rfa(&tti->tti_lr,
1339                                        lu_object_fid(&dt_obj->do_lu));
1340                         tgt_lfsck_in_notify(env, dt, &tti->tti_lr);
1341                 }
1342
1343                 tti->tti_u.update.tti_dt_object = dt_obj;
1344                 tti->tti_u.update.tti_update = update;
1345                 tti->tti_u.update.tti_update_reply_index = i;
1346
1347                 h = out_handler_find(update->ou_type);
1348                 if (likely(h != NULL)) {
1349                         /* For real modification RPC, check if the update
1350                          * has been executed */
1351                         if (h->th_flags & MUTABOR) {
1352                                 struct ptlrpc_request *req = tgt_ses_req(tsi);
1353
1354                                 if (out_check_resent(env, dt, dt_obj, req,
1355                                                      out_reconstruct, reply, i))
1356                                         GOTO(next, rc);
1357                         }
1358
1359                         rc = h->th_act(tsi);
1360                 } else {
1361                         CERROR("%s: The unsupported opc: 0x%x\n",
1362                                tgt_name(tsi->tsi_tgt), update->ou_type);
1363                         lu_object_put(env, &dt_obj->do_lu);
1364                         GOTO(out, rc = -ENOTSUPP);
1365                 }
1366 next:
1367                 lu_object_put(env, &dt_obj->do_lu);
1368                 if (rc < 0)
1369                         GOTO(out, rc);
1370         }
1371 out:
1372         rc1 = out_tx_end(env, ta);
1373         if (rc == 0)
1374                 rc = rc1;
1375         RETURN(rc);
1376 }
1377
1378 struct tgt_handler tgt_out_handlers[] = {
1379 TGT_UPDATE_HDL(MUTABOR, OUT_UPDATE,     out_handle),
1380 };
1381 EXPORT_SYMBOL(tgt_out_handlers);
1382