Whamcloud - gitweb
5c650219dfa9a3b6328b8323e2abad6c9fe059d9
[fs/lustre-release.git] / lustre / target / out_handler.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  *
25  * lustre/mdt/out_handler.c
26  *
27  * Object update handler between targets.
28  *
29  * Author: di.wang <di.wang@intel.com>
30  */
31
32 #define DEBUG_SUBSYSTEM S_CLASS
33
34 #include <obd_class.h>
35 #include <md_object.h>
36 #include "tgt_internal.h"
37 #include <lustre_update.h>
38
39 struct tx_arg *tx_add_exec(struct thandle_exec_args *ta, tx_exec_func_t func,
40                            tx_exec_func_t undo, char *file, int line)
41 {
42         int i;
43
44         LASSERT(ta);
45         LASSERT(func);
46
47         i = ta->ta_argno;
48         LASSERT(i < UPDATE_MAX_OPS);
49
50         ta->ta_argno++;
51
52         ta->ta_args[i].exec_fn = func;
53         ta->ta_args[i].undo_fn = undo;
54         ta->ta_args[i].file    = file;
55         ta->ta_args[i].line    = line;
56
57         return &ta->ta_args[i];
58 }
59
60 static int out_tx_start(const struct lu_env *env, struct dt_device *dt,
61                         struct thandle_exec_args *ta, struct obd_export *exp)
62 {
63         memset(ta, 0, sizeof(*ta));
64         ta->ta_handle = dt_trans_create(env, dt);
65         if (IS_ERR(ta->ta_handle)) {
66                 CERROR("%s: start handle error: rc = %ld\n",
67                        dt_obd_name(dt), PTR_ERR(ta->ta_handle));
68                 return PTR_ERR(ta->ta_handle);
69         }
70         ta->ta_dev = dt;
71         if (exp->exp_need_sync)
72                 ta->ta_handle->th_sync = 1;
73
74         return 0;
75 }
76
77 static int out_trans_start(const struct lu_env *env,
78                            struct thandle_exec_args *ta)
79 {
80         return dt_trans_start(env, ta->ta_dev, ta->ta_handle);
81 }
82
83 static int out_trans_stop(const struct lu_env *env,
84                           struct thandle_exec_args *ta, int err)
85 {
86         int i;
87         int rc;
88
89         ta->ta_handle->th_result = err;
90         rc = dt_trans_stop(env, ta->ta_dev, ta->ta_handle);
91         for (i = 0; i < ta->ta_argno; i++) {
92                 if (ta->ta_args[i].object != NULL) {
93                         lu_object_put(env, &ta->ta_args[i].object->do_lu);
94                         ta->ta_args[i].object = NULL;
95                 }
96         }
97
98         return rc;
99 }
100
101 int out_tx_end(const struct lu_env *env, struct thandle_exec_args *ta)
102 {
103         struct tgt_session_info *tsi = tgt_ses_info(env);
104         int i = 0, rc;
105
106         LASSERT(ta->ta_dev);
107         LASSERT(ta->ta_handle);
108
109         if (ta->ta_err != 0 || ta->ta_argno == 0)
110                 GOTO(stop, rc = ta->ta_err);
111
112         rc = out_trans_start(env, ta);
113         if (unlikely(rc))
114                 GOTO(stop, rc);
115
116         for (i = 0; i < ta->ta_argno; i++) {
117                 rc = ta->ta_args[i].exec_fn(env, ta->ta_handle,
118                                             &ta->ta_args[i]);
119                 if (unlikely(rc)) {
120                         CDEBUG(D_INFO, "error during execution of #%u from"
121                                " %s:%d: rc = %d\n", i, ta->ta_args[i].file,
122                                ta->ta_args[i].line, rc);
123                         while (--i >= 0) {
124                                 LASSERTF(ta->ta_args[i].undo_fn != NULL,
125                                     "can't undo changes, hope for failover!\n");
126                                 ta->ta_args[i].undo_fn(env, ta->ta_handle,
127                                                        &ta->ta_args[i]);
128                         }
129                         break;
130                 }
131         }
132
133         /* Only fail for real update */
134         tsi->tsi_reply_fail_id = OBD_FAIL_UPDATE_OBJ_NET_REP;
135 stop:
136         CDEBUG(D_INFO, "%s: executed %u/%u: rc = %d\n",
137                dt_obd_name(ta->ta_dev), i, ta->ta_argno, rc);
138         out_trans_stop(env, ta, rc);
139         ta->ta_handle = NULL;
140         ta->ta_argno = 0;
141         ta->ta_err = 0;
142
143         RETURN(rc);
144 }
145
146 static void out_reconstruct(const struct lu_env *env, struct dt_device *dt,
147                             struct dt_object *obj, struct update_reply *reply,
148                             int index)
149 {
150         CDEBUG(D_INFO, "%s: fork reply reply %p index %d: rc = %d\n",
151                dt_obd_name(dt), reply, index, 0);
152
153         update_insert_reply(reply, NULL, 0, index, 0);
154         return;
155 }
156
157 typedef void (*out_reconstruct_t)(const struct lu_env *env,
158                                   struct dt_device *dt,
159                                   struct dt_object *obj,
160                                   struct update_reply *reply,
161                                   int index);
162
163 static inline int out_check_resent(const struct lu_env *env,
164                                    struct dt_device *dt,
165                                    struct dt_object *obj,
166                                    struct ptlrpc_request *req,
167                                    out_reconstruct_t reconstruct,
168                                    struct update_reply *reply,
169                                    int index)
170 {
171         if (likely(!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)))
172                 return 0;
173
174         if (req_xid_is_last(req)) {
175                 reconstruct(env, dt, obj, reply, index);
176                 return 1;
177         }
178         DEBUG_REQ(D_HA, req, "no reply for RESENT req (have "LPD64")",
179                  req->rq_export->exp_target_data.ted_lcd->lcd_last_xid);
180         return 0;
181 }
182
183 static int out_obj_destroy(const struct lu_env *env, struct dt_object *dt_obj,
184                            struct thandle *th)
185 {
186         int rc;
187
188         CDEBUG(D_INFO, "%s: destroy "DFID"\n", dt_obd_name(th->th_dev),
189                PFID(lu_object_fid(&dt_obj->do_lu)));
190
191         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
192         rc = dt_destroy(env, dt_obj, th);
193         dt_write_unlock(env, dt_obj);
194
195         return rc;
196 }
197
198 /**
199  * All of the xxx_undo will be used once execution failed,
200  * But because all of the required resource has been reserved in
201  * declare phase, i.e. if declare succeed, it should make sure
202  * the following executing phase succeed in anyway, so these undo
203  * should be useless for most of the time in Phase I
204  */
205 int out_tx_create_undo(const struct lu_env *env, struct thandle *th,
206                        struct tx_arg *arg)
207 {
208         int rc;
209
210         rc = out_obj_destroy(env, arg->object, th);
211         if (rc != 0)
212                 CERROR("%s: undo failure, we are doomed!: rc = %d\n",
213                        dt_obd_name(th->th_dev), rc);
214         return rc;
215 }
216
217 int out_tx_create_exec(const struct lu_env *env, struct thandle *th,
218                        struct tx_arg *arg)
219 {
220         struct dt_object        *dt_obj = arg->object;
221         int                      rc;
222
223         CDEBUG(D_OTHER, "%s: create "DFID": dof %u, mode %o\n",
224                dt_obd_name(th->th_dev),
225                PFID(lu_object_fid(&arg->object->do_lu)),
226                arg->u.create.dof.dof_type,
227                arg->u.create.attr.la_mode & S_IFMT);
228
229         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
230         rc = dt_create(env, dt_obj, &arg->u.create.attr,
231                        &arg->u.create.hint, &arg->u.create.dof, th);
232
233         dt_write_unlock(env, dt_obj);
234
235         CDEBUG(D_INFO, "%s: insert create reply %p index %d: rc = %d\n",
236                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
237
238         update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
239
240         return rc;
241 }
242
243 static int __out_tx_create(const struct lu_env *env, struct dt_object *obj,
244                            struct lu_attr *attr, struct lu_fid *parent_fid,
245                            struct dt_object_format *dof,
246                            struct thandle_exec_args *ta,
247                            struct update_reply *reply,
248                            int index, char *file, int line)
249 {
250         struct tx_arg *arg;
251
252         LASSERT(ta->ta_handle != NULL);
253         ta->ta_err = dt_declare_create(env, obj, attr, NULL, dof,
254                                        ta->ta_handle);
255         if (ta->ta_err != 0)
256                 return ta->ta_err;
257
258         arg = tx_add_exec(ta, out_tx_create_exec, out_tx_create_undo, file,
259                           line);
260         LASSERT(arg);
261
262         /* release the object in out_trans_stop */
263         lu_object_get(&obj->do_lu);
264         arg->object = obj;
265         arg->u.create.attr = *attr;
266         if (parent_fid)
267                 arg->u.create.fid = *parent_fid;
268         memset(&arg->u.create.hint, 0, sizeof(arg->u.create.hint));
269         arg->u.create.dof  = *dof;
270         arg->reply = reply;
271         arg->index = index;
272
273         return 0;
274 }
275
276 static int out_create(struct tgt_session_info *tsi)
277 {
278         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
279         struct update           *update = tti->tti_u.update.tti_update;
280         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
281         struct dt_object_format *dof = &tti->tti_u.update.tti_update_dof;
282         struct obdo             *lobdo = &tti->tti_u.update.tti_obdo;
283         struct lu_attr          *attr = &tti->tti_attr;
284         struct lu_fid           *fid = NULL;
285         struct obdo             *wobdo;
286         int                     size;
287         int                     rc;
288
289         ENTRY;
290
291         wobdo = update_param_buf(update, 0, &size);
292         if (wobdo == NULL || size != sizeof(*wobdo)) {
293                 CERROR("%s: obdo is NULL, invalid RPC: rc = %d\n",
294                        tgt_name(tsi->tsi_tgt), -EPROTO);
295                 RETURN(err_serious(-EPROTO));
296         }
297
298         obdo_le_to_cpu(wobdo, wobdo);
299         lustre_get_wire_obdo(NULL, lobdo, wobdo);
300         la_from_obdo(attr, lobdo, lobdo->o_valid);
301
302         dof->dof_type = dt_mode_to_dft(attr->la_mode);
303         if (S_ISDIR(attr->la_mode)) {
304                 int size;
305
306                 fid = update_param_buf(update, 1, &size);
307                 if (fid == NULL || size != sizeof(*fid)) {
308                         CERROR("%s: invalid fid: rc = %d\n",
309                                tgt_name(tsi->tsi_tgt), -EPROTO);
310                         RETURN(err_serious(-EPROTO));
311                 }
312                 fid_le_to_cpu(fid, fid);
313                 if (!fid_is_sane(fid)) {
314                         CERROR("%s: invalid fid "DFID": rc = %d\n",
315                                tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
316                         RETURN(err_serious(-EPROTO));
317                 }
318         }
319
320         if (lu_object_exists(&obj->do_lu))
321                 RETURN(-EEXIST);
322
323         rc = out_tx_create(tsi->tsi_env, obj, attr, fid, dof,
324                            &tti->tti_tea,
325                            tti->tti_u.update.tti_update_reply,
326                            tti->tti_u.update.tti_update_reply_index);
327
328         RETURN(rc);
329 }
330
331 static int out_tx_attr_set_undo(const struct lu_env *env,
332                                 struct thandle *th, struct tx_arg *arg)
333 {
334         CERROR("%s: attr set undo "DFID" unimplemented yet!: rc = %d\n",
335                dt_obd_name(th->th_dev),
336                PFID(lu_object_fid(&arg->object->do_lu)), -ENOTSUPP);
337
338         return -ENOTSUPP;
339 }
340
341 static int out_tx_attr_set_exec(const struct lu_env *env, struct thandle *th,
342                                 struct tx_arg *arg)
343 {
344         struct dt_object        *dt_obj = arg->object;
345         int                     rc;
346
347         CDEBUG(D_OTHER, "%s: attr set "DFID"\n", dt_obd_name(th->th_dev),
348                PFID(lu_object_fid(&dt_obj->do_lu)));
349
350         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
351         rc = dt_attr_set(env, dt_obj, &arg->u.attr_set.attr, th, NULL);
352         dt_write_unlock(env, dt_obj);
353
354         CDEBUG(D_INFO, "%s: insert attr_set reply %p index %d: rc = %d\n",
355                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
356
357         update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
358
359         return rc;
360 }
361
362 static int __out_tx_attr_set(const struct lu_env *env,
363                              struct dt_object *dt_obj,
364                              const struct lu_attr *attr,
365                              struct thandle_exec_args *th,
366                              struct update_reply *reply, int index,
367                              char *file, int line)
368 {
369         struct tx_arg           *arg;
370
371         LASSERT(th->ta_handle != NULL);
372         th->ta_err = dt_declare_attr_set(env, dt_obj, attr, th->ta_handle);
373         if (th->ta_err != 0)
374                 return th->ta_err;
375
376         arg = tx_add_exec(th, out_tx_attr_set_exec, out_tx_attr_set_undo,
377                           file, line);
378         LASSERT(arg);
379         lu_object_get(&dt_obj->do_lu);
380         arg->object = dt_obj;
381         arg->u.attr_set.attr = *attr;
382         arg->reply = reply;
383         arg->index = index;
384         return 0;
385 }
386
387 static int out_attr_set(struct tgt_session_info *tsi)
388 {
389         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
390         struct update           *update = tti->tti_u.update.tti_update;
391         struct lu_attr          *attr = &tti->tti_attr;
392         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
393         struct obdo             *lobdo = &tti->tti_u.update.tti_obdo;
394         struct obdo             *wobdo;
395         int                      size;
396         int                      rc;
397
398         ENTRY;
399
400         wobdo = update_param_buf(update, 0, &size);
401         if (wobdo == NULL || size != sizeof(*wobdo)) {
402                 CERROR("%s: empty obdo in the update: rc = %d\n",
403                        tgt_name(tsi->tsi_tgt), -EPROTO);
404                 RETURN(err_serious(-EPROTO));
405         }
406
407         attr->la_valid = 0;
408         attr->la_valid = 0;
409         obdo_le_to_cpu(wobdo, wobdo);
410         lustre_get_wire_obdo(NULL, lobdo, wobdo);
411         la_from_obdo(attr, lobdo, lobdo->o_valid);
412
413         rc = out_tx_attr_set(tsi->tsi_env, obj, attr, &tti->tti_tea,
414                              tti->tti_u.update.tti_update_reply,
415                              tti->tti_u.update.tti_update_reply_index);
416
417         RETURN(rc);
418 }
419
420 static int out_attr_get(struct tgt_session_info *tsi)
421 {
422         const struct lu_env     *env = tsi->tsi_env;
423         struct tgt_thread_info  *tti = tgt_th_info(env);
424         struct obdo             *obdo = &tti->tti_u.update.tti_obdo;
425         struct lu_attr          *la = &tti->tti_attr;
426         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
427         int                     rc;
428
429         ENTRY;
430
431         if (!lu_object_exists(&obj->do_lu))
432                 RETURN(-ENOENT);
433
434         dt_read_lock(env, obj, MOR_TGT_CHILD);
435         rc = dt_attr_get(env, obj, la, NULL);
436         if (rc)
437                 GOTO(out_unlock, rc);
438         /*
439          * If it is a directory, we will also check whether the
440          * directory is empty.
441          * la_flags = 0 : Empty.
442          *          = 1 : Not empty.
443          */
444         la->la_flags = 0;
445         if (S_ISDIR(la->la_mode)) {
446                 struct dt_it            *it;
447                 const struct dt_it_ops  *iops;
448
449                 if (!dt_try_as_dir(env, obj))
450                         GOTO(out_unlock, rc = -ENOTDIR);
451
452                 iops = &obj->do_index_ops->dio_it;
453                 it = iops->init(env, obj, LUDA_64BITHASH, BYPASS_CAPA);
454                 if (!IS_ERR(it)) {
455                         int  result;
456                         result = iops->get(env, it, (const void *)"");
457                         if (result > 0) {
458                                 int i;
459                                 for (result = 0, i = 0; result == 0 && i < 3;
460                                      ++i)
461                                         result = iops->next(env, it);
462                                 if (result == 0)
463                                         la->la_flags = 1;
464                         } else if (result == 0)
465                                 /*
466                                  * Huh? Index contains no zero key?
467                                  */
468                                 rc = -EIO;
469
470                         iops->put(env, it);
471                         iops->fini(env, it);
472                 }
473         }
474
475         obdo->o_valid = 0;
476         obdo_from_la(obdo, la, la->la_valid);
477         obdo_cpu_to_le(obdo, obdo);
478         lustre_set_wire_obdo(NULL, obdo, obdo);
479
480 out_unlock:
481         dt_read_unlock(env, obj);
482
483         CDEBUG(D_INFO, "%s: insert attr get reply %p index %d: rc = %d\n",
484                tgt_name(tsi->tsi_tgt), tti->tti_u.update.tti_update_reply,
485                0, rc);
486
487         update_insert_reply(tti->tti_u.update.tti_update_reply, obdo,
488                             sizeof(*obdo),
489                             tti->tti_u.update.tti_update_reply_index, rc);
490         RETURN(rc);
491 }
492
493 static int out_xattr_get(struct tgt_session_info *tsi)
494 {
495         const struct lu_env     *env = tsi->tsi_env;
496         struct tgt_thread_info  *tti = tgt_th_info(env);
497         struct update           *update = tti->tti_u.update.tti_update;
498         struct lu_buf           *lbuf = &tti->tti_buf;
499         struct update_reply     *reply = tti->tti_u.update.tti_update_reply;
500         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
501         char                    *name;
502         void                    *ptr;
503         int                      idx = tti->tti_u.update.tti_update_reply_index;
504         int                      rc;
505
506         ENTRY;
507
508         name = (char *)update_param_buf(update, 0, NULL);
509         if (name == NULL) {
510                 CERROR("%s: empty name for xattr get: rc = %d\n",
511                        tgt_name(tsi->tsi_tgt), -EPROTO);
512                 RETURN(err_serious(-EPROTO));
513         }
514
515         ptr = update_get_buf_internal(reply, idx, NULL);
516         LASSERT(ptr != NULL);
517
518         /* The first 4 bytes(int) are used to store the result */
519         lbuf->lb_buf = (char *)ptr + sizeof(int);
520         lbuf->lb_len = UPDATE_BUFFER_SIZE - sizeof(struct update_reply);
521         dt_read_lock(env, obj, MOR_TGT_CHILD);
522         rc = dt_xattr_get(env, obj, lbuf, name, NULL);
523         dt_read_unlock(env, obj);
524         if (rc < 0) {
525                 lbuf->lb_len = 0;
526                 GOTO(out, rc);
527         }
528         if (rc == 0) {
529                 lbuf->lb_len = 0;
530                 GOTO(out, rc = -ENOENT);
531         }
532         lbuf->lb_len = rc;
533         rc = 0;
534         CDEBUG(D_INFO, "%s: "DFID" get xattr %s len %d\n",
535                tgt_name(tsi->tsi_tgt), PFID(lu_object_fid(&obj->do_lu)),
536                name, (int)lbuf->lb_len);
537
538         GOTO(out, rc);
539
540 out:
541         *(int *)ptr = rc;
542         reply->ur_lens[idx] = lbuf->lb_len + sizeof(int);
543
544         return rc;
545 }
546
547 static int out_index_lookup(struct tgt_session_info *tsi)
548 {
549         const struct lu_env     *env = tsi->tsi_env;
550         struct tgt_thread_info  *tti = tgt_th_info(env);
551         struct update           *update = tti->tti_u.update.tti_update;
552         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
553         char                    *name;
554         int                      rc;
555
556         ENTRY;
557
558         if (!lu_object_exists(&obj->do_lu))
559                 RETURN(-ENOENT);
560
561         name = (char *)update_param_buf(update, 0, NULL);
562         if (name == NULL) {
563                 CERROR("%s: empty name for lookup: rc = %d\n",
564                        tgt_name(tsi->tsi_tgt), -EPROTO);
565                 RETURN(err_serious(-EPROTO));
566         }
567
568         dt_read_lock(env, obj, MOR_TGT_CHILD);
569         if (!dt_try_as_dir(env, obj))
570                 GOTO(out_unlock, rc = -ENOTDIR);
571
572         rc = dt_lookup(env, obj, (struct dt_rec *)&tti->tti_fid1,
573                 (struct dt_key *)name, NULL);
574
575         if (rc < 0)
576                 GOTO(out_unlock, rc);
577
578         if (rc == 0)
579                 rc += 1;
580
581         CDEBUG(D_INFO, "lookup "DFID" %s get "DFID" rc %d\n",
582                PFID(lu_object_fid(&obj->do_lu)), name,
583                PFID(&tti->tti_fid1), rc);
584         fid_cpu_to_le(&tti->tti_fid1, &tti->tti_fid1);
585
586 out_unlock:
587         dt_read_unlock(env, obj);
588
589         CDEBUG(D_INFO, "%s: insert lookup reply %p index %d: rc = %d\n",
590                tgt_name(tsi->tsi_tgt), tti->tti_u.update.tti_update_reply,
591                0, rc);
592
593         update_insert_reply(tti->tti_u.update.tti_update_reply,
594                             &tti->tti_fid1, sizeof(tti->tti_fid1),
595                             tti->tti_u.update.tti_update_reply_index, rc);
596         RETURN(rc);
597 }
598
599 static int out_tx_xattr_set_exec(const struct lu_env *env,
600                                  struct thandle *th,
601                                  struct tx_arg *arg)
602 {
603         struct dt_object *dt_obj = arg->object;
604         int rc;
605
606         CDEBUG(D_INFO, "%s: set xattr buf %p name %s flag %d\n",
607                dt_obd_name(th->th_dev), arg->u.xattr_set.buf.lb_buf,
608                arg->u.xattr_set.name, arg->u.xattr_set.flags);
609
610         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
611         rc = dt_xattr_set(env, dt_obj, &arg->u.xattr_set.buf,
612                           arg->u.xattr_set.name, arg->u.xattr_set.flags,
613                           th, NULL);
614         dt_write_unlock(env, dt_obj);
615         /**
616          * Ignore errors if this is LINK EA
617          **/
618         if (unlikely(rc && !strcmp(arg->u.xattr_set.name, XATTR_NAME_LINK)))
619                 rc = 0;
620
621         CDEBUG(D_INFO, "%s: insert xattr set reply %p index %d: rc = %d\n",
622                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
623
624         update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
625
626         return rc;
627 }
628
629 static int __out_tx_xattr_set(const struct lu_env *env,
630                               struct dt_object *dt_obj,
631                               const struct lu_buf *buf,
632                               const char *name, int flags,
633                               struct thandle_exec_args *ta,
634                               struct update_reply *reply, int index,
635                               char *file, int line)
636 {
637         struct tx_arg           *arg;
638
639         LASSERT(ta->ta_handle != NULL);
640         ta->ta_err = dt_declare_xattr_set(env, dt_obj, buf, name,
641                                           flags, ta->ta_handle);
642         if (ta->ta_err != 0)
643                 return ta->ta_err;
644
645         arg = tx_add_exec(ta, out_tx_xattr_set_exec, NULL, file, line);
646         LASSERT(arg);
647         lu_object_get(&dt_obj->do_lu);
648         arg->object = dt_obj;
649         arg->u.xattr_set.name = name;
650         arg->u.xattr_set.flags = flags;
651         arg->u.xattr_set.buf = *buf;
652         arg->reply = reply;
653         arg->index = index;
654         arg->u.xattr_set.csum = 0;
655         return 0;
656 }
657
658 static int out_xattr_set(struct tgt_session_info *tsi)
659 {
660         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
661         struct update           *update = tti->tti_u.update.tti_update;
662         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
663         struct lu_buf           *lbuf = &tti->tti_buf;
664         char                    *name;
665         char                    *buf;
666         char                    *tmp;
667         int                      buf_len = 0;
668         int                      flag;
669         int                      rc;
670         ENTRY;
671
672         name = update_param_buf(update, 0, NULL);
673         if (name == NULL) {
674                 CERROR("%s: empty name for xattr set: rc = %d\n",
675                        tgt_name(tsi->tsi_tgt), -EPROTO);
676                 RETURN(err_serious(-EPROTO));
677         }
678
679         buf = (char *)update_param_buf(update, 1, &buf_len);
680         if (buf == NULL || buf_len == 0) {
681                 CERROR("%s: empty buf for xattr set: rc = %d\n",
682                        tgt_name(tsi->tsi_tgt), -EPROTO);
683                 RETURN(err_serious(-EPROTO));
684         }
685
686         lbuf->lb_buf = buf;
687         lbuf->lb_len = buf_len;
688
689         tmp = (char *)update_param_buf(update, 2, NULL);
690         if (tmp == NULL) {
691                 CERROR("%s: empty flag for xattr set: rc = %d\n",
692                        tgt_name(tsi->tsi_tgt), -EPROTO);
693                 RETURN(err_serious(-EPROTO));
694         }
695
696         flag = le32_to_cpu(*(int *)tmp);
697
698         rc = out_tx_xattr_set(tsi->tsi_env, obj, lbuf, name, flag,
699                               &tti->tti_tea,
700                               tti->tti_u.update.tti_update_reply,
701                               tti->tti_u.update.tti_update_reply_index);
702         RETURN(rc);
703 }
704
705 static int out_obj_ref_add(const struct lu_env *env,
706                            struct dt_object *dt_obj,
707                            struct thandle *th)
708 {
709         int rc;
710
711         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
712         rc = dt_ref_add(env, dt_obj, th);
713         dt_write_unlock(env, dt_obj);
714
715         return rc;
716 }
717
718 static int out_obj_ref_del(const struct lu_env *env,
719                            struct dt_object *dt_obj,
720                            struct thandle *th)
721 {
722         int rc;
723
724         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
725         rc = dt_ref_del(env, dt_obj, th);
726         dt_write_unlock(env, dt_obj);
727
728         return rc;
729 }
730
731 static int out_tx_ref_add_exec(const struct lu_env *env, struct thandle *th,
732                                struct tx_arg *arg)
733 {
734         struct dt_object *dt_obj = arg->object;
735         int rc;
736
737         rc = out_obj_ref_add(env, dt_obj, th);
738
739         CDEBUG(D_INFO, "%s: insert ref_add reply %p index %d: rc = %d\n",
740                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
741
742         update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
743         return rc;
744 }
745
746 static int out_tx_ref_add_undo(const struct lu_env *env, struct thandle *th,
747                                struct tx_arg *arg)
748 {
749         return out_obj_ref_del(env, arg->object, th);
750 }
751
752 static int __out_tx_ref_add(const struct lu_env *env,
753                             struct dt_object *dt_obj,
754                             struct thandle_exec_args *ta,
755                             struct update_reply *reply,
756                             int index, char *file, int line)
757 {
758         struct tx_arg   *arg;
759
760         LASSERT(ta->ta_handle != NULL);
761         ta->ta_err = dt_declare_ref_add(env, dt_obj, ta->ta_handle);
762         if (ta->ta_err != 0)
763                 return ta->ta_err;
764
765         arg = tx_add_exec(ta, out_tx_ref_add_exec, out_tx_ref_add_undo, file,
766                           line);
767         LASSERT(arg);
768         lu_object_get(&dt_obj->do_lu);
769         arg->object = dt_obj;
770         arg->reply = reply;
771         arg->index = index;
772         return 0;
773 }
774
775 /**
776  * increase ref of the object
777  **/
778 static int out_ref_add(struct tgt_session_info *tsi)
779 {
780         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
781         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
782         int                      rc;
783
784         ENTRY;
785
786         rc = out_tx_ref_add(tsi->tsi_env, obj, &tti->tti_tea,
787                             tti->tti_u.update.tti_update_reply,
788                             tti->tti_u.update.tti_update_reply_index);
789         RETURN(rc);
790 }
791
792 static int out_tx_ref_del_exec(const struct lu_env *env, struct thandle *th,
793                                struct tx_arg *arg)
794 {
795         struct dt_object        *dt_obj = arg->object;
796         int                      rc;
797
798         rc = out_obj_ref_del(env, dt_obj, th);
799
800         CDEBUG(D_INFO, "%s: insert ref_del reply %p index %d: rc = %d\n",
801                dt_obd_name(th->th_dev), arg->reply, arg->index, 0);
802
803         update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
804
805         return rc;
806 }
807
808 static int out_tx_ref_del_undo(const struct lu_env *env, struct thandle *th,
809                                struct tx_arg *arg)
810 {
811         return out_obj_ref_add(env, arg->object, th);
812 }
813
814 static int __out_tx_ref_del(const struct lu_env *env,
815                             struct dt_object *dt_obj,
816                             struct thandle_exec_args *ta,
817                             struct update_reply *reply,
818                             int index, char *file, int line)
819 {
820         struct tx_arg   *arg;
821
822         LASSERT(ta->ta_handle != NULL);
823         ta->ta_err = dt_declare_ref_del(env, dt_obj, ta->ta_handle);
824         if (ta->ta_err != 0)
825                 return ta->ta_err;
826
827         arg = tx_add_exec(ta, out_tx_ref_del_exec, out_tx_ref_del_undo, file,
828                           line);
829         LASSERT(arg);
830         lu_object_get(&dt_obj->do_lu);
831         arg->object = dt_obj;
832         arg->reply = reply;
833         arg->index = index;
834         return 0;
835 }
836
837 static int out_ref_del(struct tgt_session_info *tsi)
838 {
839         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
840         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
841         int                      rc;
842
843         ENTRY;
844
845         if (!lu_object_exists(&obj->do_lu))
846                 RETURN(-ENOENT);
847
848         rc = out_tx_ref_del(tsi->tsi_env, obj, &tti->tti_tea,
849                             tti->tti_u.update.tti_update_reply,
850                             tti->tti_u.update.tti_update_reply_index);
851         RETURN(rc);
852 }
853
854 static int out_obj_index_insert(const struct lu_env *env,
855                                 struct dt_object *dt_obj,
856                                 const struct dt_rec *rec,
857                                 const struct dt_key *key,
858                                 struct thandle *th)
859 {
860         int rc;
861
862         CDEBUG(D_INFO, "%s: index insert "DFID" name: %s fid "DFID"\n",
863                dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
864                (char *)key, PFID((struct lu_fid *)rec));
865
866         if (dt_try_as_dir(env, dt_obj) == 0)
867                 return -ENOTDIR;
868
869         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
870         rc = dt_insert(env, dt_obj, rec, key, th, NULL, 0);
871         dt_write_unlock(env, dt_obj);
872
873         return rc;
874 }
875
876 static int out_obj_index_delete(const struct lu_env *env,
877                                 struct dt_object *dt_obj,
878                                 const struct dt_key *key,
879                                 struct thandle *th)
880 {
881         int rc;
882
883         CDEBUG(D_INFO, "%s: index delete "DFID" name: %s\n",
884                dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
885                (char *)key);
886
887         if (dt_try_as_dir(env, dt_obj) == 0)
888                 return -ENOTDIR;
889
890         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
891         rc = dt_delete(env, dt_obj, key, th, NULL);
892         dt_write_unlock(env, dt_obj);
893
894         return rc;
895 }
896
897 static int out_tx_index_insert_exec(const struct lu_env *env,
898                                     struct thandle *th, struct tx_arg *arg)
899 {
900         struct dt_object *dt_obj = arg->object;
901         int rc;
902
903         rc = out_obj_index_insert(env, dt_obj, arg->u.insert.rec,
904                                   arg->u.insert.key, th);
905
906         CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
907                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
908
909         update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
910
911         return rc;
912 }
913
914 static int out_tx_index_insert_undo(const struct lu_env *env,
915                                     struct thandle *th, struct tx_arg *arg)
916 {
917         return out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
918 }
919
920 static int __out_tx_index_insert(const struct lu_env *env,
921                                  struct dt_object *dt_obj,
922                                  char *name, struct lu_fid *fid,
923                                  struct thandle_exec_args *ta,
924                                  struct update_reply *reply,
925                                  int index, char *file, int line)
926 {
927         struct tx_arg *arg;
928
929         LASSERT(ta->ta_handle != NULL);
930
931         if (lu_object_exists(&dt_obj->do_lu)) {
932                 if (dt_try_as_dir(env, dt_obj) == 0) {
933                         ta->ta_err = -ENOTDIR;
934                         return ta->ta_err;
935                 }
936                 ta->ta_err = dt_declare_insert(env, dt_obj,
937                                                (struct dt_rec *)fid,
938                                                (struct dt_key *)name,
939                                                ta->ta_handle);
940         }
941
942         if (ta->ta_err != 0)
943                 return ta->ta_err;
944
945         arg = tx_add_exec(ta, out_tx_index_insert_exec,
946                           out_tx_index_insert_undo, file,
947                           line);
948         LASSERT(arg);
949         lu_object_get(&dt_obj->do_lu);
950         arg->object = dt_obj;
951         arg->reply = reply;
952         arg->index = index;
953         arg->u.insert.rec = (struct dt_rec *)fid;
954         arg->u.insert.key = (struct dt_key *)name;
955
956         return 0;
957 }
958
959 static int out_index_insert(struct tgt_session_info *tsi)
960 {
961         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
962         struct update     *update = tti->tti_u.update.tti_update;
963         struct dt_object  *obj = tti->tti_u.update.tti_dt_object;
964         struct lu_fid     *fid;
965         char              *name;
966         int                rc = 0;
967         int                size;
968
969         ENTRY;
970
971         name = (char *)update_param_buf(update, 0, NULL);
972         if (name == NULL) {
973                 CERROR("%s: empty name for index insert: rc = %d\n",
974                        tgt_name(tsi->tsi_tgt), -EPROTO);
975                 RETURN(err_serious(-EPROTO));
976         }
977
978         fid = (struct lu_fid *)update_param_buf(update, 1, &size);
979         if (fid == NULL || size != sizeof(*fid)) {
980                 CERROR("%s: invalid fid: rc = %d\n",
981                        tgt_name(tsi->tsi_tgt), -EPROTO);
982                        RETURN(err_serious(-EPROTO));
983         }
984
985         fid_le_to_cpu(fid, fid);
986         if (!fid_is_sane(fid)) {
987                 CERROR("%s: invalid FID "DFID": rc = %d\n",
988                        tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
989                 RETURN(err_serious(-EPROTO));
990         }
991
992         rc = out_tx_index_insert(tsi->tsi_env, obj, name, fid,
993                                  &tti->tti_tea,
994                                  tti->tti_u.update.tti_update_reply,
995                                  tti->tti_u.update.tti_update_reply_index);
996         RETURN(rc);
997 }
998
999 static int out_tx_index_delete_exec(const struct lu_env *env,
1000                                     struct thandle *th,
1001                                     struct tx_arg *arg)
1002 {
1003         int rc;
1004
1005         rc = out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
1006
1007         CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
1008                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
1009
1010         update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
1011
1012         return rc;
1013 }
1014
1015 static int out_tx_index_delete_undo(const struct lu_env *env,
1016                                     struct thandle *th,
1017                                     struct tx_arg *arg)
1018 {
1019         CERROR("%s: Oops, can not rollback index_delete yet: rc = %d\n",
1020                dt_obd_name(th->th_dev), -ENOTSUPP);
1021         return -ENOTSUPP;
1022 }
1023
1024 static int __out_tx_index_delete(const struct lu_env *env,
1025                                  struct dt_object *dt_obj, char *name,
1026                                  struct thandle_exec_args *ta,
1027                                  struct update_reply *reply,
1028                                  int index, char *file, int line)
1029 {
1030         struct tx_arg *arg;
1031
1032         if (dt_try_as_dir(env, dt_obj) == 0) {
1033                 ta->ta_err = -ENOTDIR;
1034                 return ta->ta_err;
1035         }
1036
1037         LASSERT(ta->ta_handle != NULL);
1038         ta->ta_err = dt_declare_delete(env, dt_obj,
1039                                        (struct dt_key *)name,
1040                                        ta->ta_handle);
1041         if (ta->ta_err != 0)
1042                 return ta->ta_err;
1043
1044         arg = tx_add_exec(ta, out_tx_index_delete_exec,
1045                           out_tx_index_delete_undo, file,
1046                           line);
1047         LASSERT(arg);
1048         lu_object_get(&dt_obj->do_lu);
1049         arg->object = dt_obj;
1050         arg->reply = reply;
1051         arg->index = index;
1052         arg->u.insert.key = (struct dt_key *)name;
1053         return 0;
1054 }
1055
1056 static int out_index_delete(struct tgt_session_info *tsi)
1057 {
1058         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
1059         struct update           *update = tti->tti_u.update.tti_update;
1060         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
1061         char                    *name;
1062         int                      rc = 0;
1063
1064         if (!lu_object_exists(&obj->do_lu))
1065                 RETURN(-ENOENT);
1066
1067         name = (char *)update_param_buf(update, 0, NULL);
1068         if (name == NULL) {
1069                 CERROR("%s: empty name for index delete: rc = %d\n",
1070                        tgt_name(tsi->tsi_tgt), -EPROTO);
1071                 RETURN(err_serious(-EPROTO));
1072         }
1073
1074         rc = out_tx_index_delete(tsi->tsi_env, obj, name, &tti->tti_tea,
1075                                  tti->tti_u.update.tti_update_reply,
1076                                  tti->tti_u.update.tti_update_reply_index);
1077         RETURN(rc);
1078 }
1079
1080 static int out_tx_destroy_exec(const struct lu_env *env, struct thandle *th,
1081                                struct tx_arg *arg)
1082 {
1083         struct dt_object *dt_obj = arg->object;
1084         int rc;
1085
1086         rc = out_obj_destroy(env, dt_obj, th);
1087
1088         CDEBUG(D_INFO, "%s: insert destroy reply %p index %d: rc = %d\n",
1089                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
1090
1091         update_insert_reply(arg->reply, NULL, 0, arg->index, rc);
1092
1093         RETURN(rc);
1094 }
1095
1096 static int out_tx_destroy_undo(const struct lu_env *env, struct thandle *th,
1097                                struct tx_arg *arg)
1098 {
1099         CERROR("%s: not support destroy undo yet!: rc = %d\n",
1100                dt_obd_name(th->th_dev), -ENOTSUPP);
1101         return -ENOTSUPP;
1102 }
1103
1104 static int __out_tx_destroy(const struct lu_env *env, struct dt_object *dt_obj,
1105                              struct thandle_exec_args *ta,
1106                              struct update_reply *reply,
1107                              int index, char *file, int line)
1108 {
1109         struct tx_arg *arg;
1110
1111         LASSERT(ta->ta_handle != NULL);
1112         ta->ta_err = dt_declare_destroy(env, dt_obj, ta->ta_handle);
1113         if (ta->ta_err)
1114                 return ta->ta_err;
1115
1116         arg = tx_add_exec(ta, out_tx_destroy_exec, out_tx_destroy_undo,
1117                           file, line);
1118         LASSERT(arg);
1119         lu_object_get(&dt_obj->do_lu);
1120         arg->object = dt_obj;
1121         arg->reply = reply;
1122         arg->index = index;
1123         return 0;
1124 }
1125
1126 static int out_destroy(struct tgt_session_info *tsi)
1127 {
1128         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
1129         struct update           *update = tti->tti_u.update.tti_update;
1130         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
1131         struct lu_fid           *fid;
1132         int                      rc;
1133         ENTRY;
1134
1135         fid = &update->u_fid;
1136         fid_le_to_cpu(fid, fid);
1137         if (!fid_is_sane(fid)) {
1138                 CERROR("%s: invalid FID "DFID": rc = %d\n",
1139                        tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
1140                 RETURN(err_serious(-EPROTO));
1141         }
1142
1143         if (!lu_object_exists(&obj->do_lu))
1144                 RETURN(-ENOENT);
1145
1146         rc = out_tx_destroy(tsi->tsi_env, obj, &tti->tti_tea,
1147                             tti->tti_u.update.tti_update_reply,
1148                             tti->tti_u.update.tti_update_reply_index);
1149
1150         RETURN(rc);
1151 }
1152
1153 #define DEF_OUT_HNDL(opc, name, flags, fn)     \
1154 [opc - OBJ_CREATE] = {                                  \
1155         .th_name    = name,                             \
1156         .th_fail_id = 0,                                \
1157         .th_opc     = opc,                              \
1158         .th_flags   = flags,                            \
1159         .th_act     = fn,                               \
1160         .th_fmt     = NULL,                             \
1161         .th_version = 0,                                \
1162 }
1163
1164 #define out_handler mdt_handler
1165 static struct tgt_handler out_update_ops[] = {
1166         DEF_OUT_HNDL(OBJ_CREATE, "obj_create", MUTABOR | HABEO_REFERO,
1167                      out_create),
1168         DEF_OUT_HNDL(OBJ_DESTROY, "obj_create", MUTABOR | HABEO_REFERO,
1169                      out_destroy),
1170         DEF_OUT_HNDL(OBJ_REF_ADD, "obj_ref_add", MUTABOR | HABEO_REFERO,
1171                      out_ref_add),
1172         DEF_OUT_HNDL(OBJ_REF_DEL, "obj_ref_del", MUTABOR | HABEO_REFERO,
1173                      out_ref_del),
1174         DEF_OUT_HNDL(OBJ_ATTR_SET, "obj_attr_set",  MUTABOR | HABEO_REFERO,
1175                      out_attr_set),
1176         DEF_OUT_HNDL(OBJ_ATTR_GET, "obj_attr_get",  HABEO_REFERO,
1177                      out_attr_get),
1178         DEF_OUT_HNDL(OBJ_XATTR_SET, "obj_xattr_set", MUTABOR | HABEO_REFERO,
1179                      out_xattr_set),
1180         DEF_OUT_HNDL(OBJ_XATTR_GET, "obj_xattr_get", HABEO_REFERO,
1181                      out_xattr_get),
1182         DEF_OUT_HNDL(OBJ_INDEX_LOOKUP, "obj_index_lookup", HABEO_REFERO,
1183                      out_index_lookup),
1184         DEF_OUT_HNDL(OBJ_INDEX_INSERT, "obj_index_insert",
1185                      MUTABOR | HABEO_REFERO, out_index_insert),
1186         DEF_OUT_HNDL(OBJ_INDEX_DELETE, "obj_index_delete",
1187                      MUTABOR | HABEO_REFERO, out_index_delete),
1188 };
1189
1190 struct tgt_handler *out_handler_find(__u32 opc)
1191 {
1192         struct tgt_handler *h;
1193
1194         h = NULL;
1195         if (OBJ_CREATE <= opc && opc < OBJ_LAST) {
1196                 h = &out_update_ops[opc - OBJ_CREATE];
1197                 LASSERTF(h->th_opc == opc, "opcode mismatch %d != %d\n",
1198                          h->th_opc, opc);
1199         } else {
1200                 h = NULL; /* unsupported opc */
1201         }
1202         return h;
1203 }
1204
1205 /**
1206  * Object updates between Targets. Because all the updates has been
1207  * dis-assemblied into object updates at sender side, so OUT will
1208  * call OSD API directly to execute these updates.
1209  *
1210  * In DNE phase I all of the updates in the request need to be executed
1211  * in one transaction, and the transaction has to be synchronously.
1212  *
1213  * Please refer to lustre/include/lustre/lustre_idl.h for req/reply
1214  * format.
1215  */
1216 int out_handle(struct tgt_session_info *tsi)
1217 {
1218         const struct lu_env             *env = tsi->tsi_env;
1219         struct tgt_thread_info          *tti = tgt_th_info(env);
1220         struct thandle_exec_args        *ta = &tti->tti_tea;
1221         struct req_capsule              *pill = tsi->tsi_pill;
1222         struct dt_device                *dt = tsi->tsi_tgt->lut_bottom;
1223         struct update_buf               *ubuf;
1224         struct update                   *update;
1225         struct update_reply             *update_reply;
1226         int                              bufsize;
1227         int                              count;
1228         int                              old_batchid = -1;
1229         unsigned                         off;
1230         int                              i;
1231         int                              rc = 0;
1232         int                              rc1 = 0;
1233
1234         ENTRY;
1235
1236         req_capsule_set(pill, &RQF_UPDATE_OBJ);
1237         bufsize = req_capsule_get_size(pill, &RMF_UPDATE, RCL_CLIENT);
1238         if (bufsize != UPDATE_BUFFER_SIZE) {
1239                 CERROR("%s: invalid bufsize %d: rc = %d\n",
1240                        tgt_name(tsi->tsi_tgt), bufsize, -EPROTO);
1241                 RETURN(err_serious(-EPROTO));
1242         }
1243
1244         ubuf = req_capsule_client_get(pill, &RMF_UPDATE);
1245         if (ubuf == NULL) {
1246                 CERROR("%s: No buf!: rc = %d\n", tgt_name(tsi->tsi_tgt),
1247                        -EPROTO);
1248                 RETURN(err_serious(-EPROTO));
1249         }
1250
1251         if (ubuf->ub_magic != UPDATE_BUFFER_MAGIC) {
1252                 CERROR("%s: invalid magic %x expect %x: rc = %d\n",
1253                        tgt_name(tsi->tsi_tgt), ubuf->ub_magic,
1254                        UPDATE_BUFFER_MAGIC, -EPROTO);
1255                 RETURN(err_serious(-EPROTO));
1256         }
1257
1258         count = ubuf->ub_count;
1259         if (count <= 0) {
1260                 CERROR("%s: No update!: rc = %d\n",
1261                        tgt_name(tsi->tsi_tgt), -EPROTO);
1262                 RETURN(err_serious(-EPROTO));
1263         }
1264
1265         req_capsule_set_size(pill, &RMF_UPDATE_REPLY, RCL_SERVER,
1266                              UPDATE_BUFFER_SIZE);
1267         rc = req_capsule_server_pack(pill);
1268         if (rc != 0) {
1269                 CERROR("%s: Can't pack response: rc = %d\n",
1270                        tgt_name(tsi->tsi_tgt), rc);
1271                 RETURN(rc);
1272         }
1273
1274         /* Prepare the update reply buffer */
1275         update_reply = req_capsule_server_get(pill, &RMF_UPDATE_REPLY);
1276         if (update_reply == NULL)
1277                 RETURN(err_serious(-EPROTO));
1278         update_init_reply_buf(update_reply, count);
1279         tti->tti_u.update.tti_update_reply = update_reply;
1280
1281         rc = out_tx_start(env, dt, ta, tsi->tsi_exp);
1282         if (rc != 0)
1283                 RETURN(rc);
1284
1285         tti->tti_mult_trans = !req_is_replay(tgt_ses_req(tsi));
1286
1287         /* Walk through updates in the request to execute them synchronously */
1288         off = cfs_size_round(offsetof(struct update_buf, ub_bufs[0]));
1289         for (i = 0; i < count; i++) {
1290                 struct tgt_handler      *h;
1291                 struct dt_object        *dt_obj;
1292
1293                 update = (struct update *)((char *)ubuf + off);
1294                 if (old_batchid == -1) {
1295                         old_batchid = update->u_batchid;
1296                 } else if (old_batchid != update->u_batchid) {
1297                         /* Stop the current update transaction,
1298                          * create a new one */
1299                         rc = out_tx_end(env, ta);
1300                         if (rc != 0)
1301                                 RETURN(rc);
1302
1303                         rc = out_tx_start(env, dt, ta, tsi->tsi_exp);
1304                         if (rc != 0)
1305                                 RETURN(rc);
1306                         old_batchid = update->u_batchid;
1307                 }
1308
1309                 fid_le_to_cpu(&update->u_fid, &update->u_fid);
1310                 if (!fid_is_sane(&update->u_fid)) {
1311                         CERROR("%s: invalid FID "DFID": rc = %d\n",
1312                                tgt_name(tsi->tsi_tgt), PFID(&update->u_fid),
1313                                -EPROTO);
1314                         GOTO(out, rc = err_serious(-EPROTO));
1315                 }
1316
1317                 dt_obj = dt_locate(env, dt, &update->u_fid);
1318                 if (IS_ERR(dt_obj))
1319                         GOTO(out, rc = PTR_ERR(dt_obj));
1320
1321                 tti->tti_u.update.tti_dt_object = dt_obj;
1322                 tti->tti_u.update.tti_update = update;
1323                 tti->tti_u.update.tti_update_reply_index = i;
1324
1325                 h = out_handler_find(update->u_type);
1326                 if (likely(h != NULL)) {
1327                         /* For real modification RPC, check if the update
1328                          * has been executed */
1329                         if (h->th_flags & MUTABOR) {
1330                                 struct ptlrpc_request *req = tgt_ses_req(tsi);
1331
1332                                 if (out_check_resent(env, dt, dt_obj, req,
1333                                                      out_reconstruct,
1334                                                      update_reply, i))
1335                                         GOTO(next, rc);
1336                         }
1337
1338                         rc = h->th_act(tsi);
1339                 } else {
1340                         CERROR("%s: The unsupported opc: 0x%x\n",
1341                                tgt_name(tsi->tsi_tgt), update->u_type);
1342                         lu_object_put(env, &dt_obj->do_lu);
1343                         GOTO(out, rc = -ENOTSUPP);
1344                 }
1345 next:
1346                 lu_object_put(env, &dt_obj->do_lu);
1347                 if (rc < 0)
1348                         GOTO(out, rc);
1349                 off += cfs_size_round(update_size(update));
1350         }
1351 out:
1352         rc1 = out_tx_end(env, ta);
1353         if (rc == 0)
1354                 rc = rc1;
1355         RETURN(rc);
1356 }
1357
1358 struct tgt_handler tgt_out_handlers[] = {
1359 TGT_UPDATE_HDL(MUTABOR, UPDATE_OBJ,     out_handle),
1360 };
1361 EXPORT_SYMBOL(tgt_out_handlers);
1362