Whamcloud - gitweb
LU-5099 api: transfer object type via dt_insert API
[fs/lustre-release.git] / lustre / target / out_handler.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  *
25  * lustre/target/out_handler.c
26  *
27  * Object update handler between targets.
28  *
29  * Author: di.wang <di.wang@intel.com>
30  */
31
32 #define DEBUG_SUBSYSTEM S_CLASS
33
34 #include <obd_class.h>
35 #include <md_object.h>
36 #include "tgt_internal.h"
37 #include <lustre_update.h>
38
39 static int tx_extend_args(struct thandle_exec_args *ta, int new_alloc_ta)
40 {
41         struct tx_arg   **new_ta;
42         int             i;
43         int             rc = 0;
44
45         if (ta->ta_alloc_args >= new_alloc_ta)
46                 return 0;
47
48         OBD_ALLOC(new_ta, sizeof(*new_ta) * new_alloc_ta);
49         if (new_ta == NULL)
50                 return -ENOMEM;
51
52         for (i = 0; i < new_alloc_ta; i++) {
53                 if (i < ta->ta_alloc_args) {
54                         /* copy the old args to new one */
55                         new_ta[i] = ta->ta_args[i];
56                 } else {
57                         OBD_ALLOC_PTR(new_ta[i]);
58                         if (new_ta[i] == NULL)
59                                 GOTO(out, rc = -ENOMEM);
60                 }
61         }
62
63         /* free the old args */
64         if (ta->ta_args != NULL)
65                 OBD_FREE(ta->ta_args, sizeof(ta->ta_args[0]) *
66                                       ta->ta_alloc_args);
67
68         ta->ta_args = new_ta;
69         ta->ta_alloc_args = new_alloc_ta;
70 out:
71         if (rc != 0) {
72                 for (i = 0; i < new_alloc_ta; i++) {
73                         if (new_ta[i] != NULL)
74                                 OBD_FREE_PTR(new_ta[i]);
75                 }
76                 OBD_FREE(new_ta, sizeof(*new_ta) * new_alloc_ta);
77         }
78         return rc;
79 }
80
81 #define TX_ALLOC_STEP   8
82 static struct tx_arg *tx_add_exec(struct thandle_exec_args *ta,
83                                   tx_exec_func_t func, tx_exec_func_t undo,
84                                   const char *file, int line)
85 {
86         int rc;
87         int i;
88
89         LASSERT(ta != NULL);
90         LASSERT(func != NULL);
91
92         if (ta->ta_argno + 1 >= ta->ta_alloc_args) {
93                 rc = tx_extend_args(ta, ta->ta_alloc_args + TX_ALLOC_STEP);
94                 if (rc != 0)
95                         return ERR_PTR(rc);
96         }
97
98         i = ta->ta_argno;
99
100         ta->ta_argno++;
101
102         ta->ta_args[i]->exec_fn = func;
103         ta->ta_args[i]->undo_fn = undo;
104         ta->ta_args[i]->file    = file;
105         ta->ta_args[i]->line    = line;
106
107         return ta->ta_args[i];
108 }
109
110 static void out_reconstruct(const struct lu_env *env, struct dt_device *dt,
111                             struct dt_object *obj,
112                             struct object_update_reply *reply,
113                             int index)
114 {
115         CDEBUG(D_INFO, "%s: fork reply reply %p index %d: rc = %d\n",
116                dt_obd_name(dt), reply, index, 0);
117
118         object_update_result_insert(reply, NULL, 0, index, 0);
119         return;
120 }
121
122 typedef void (*out_reconstruct_t)(const struct lu_env *env,
123                                   struct dt_device *dt,
124                                   struct dt_object *obj,
125                                   struct object_update_reply *reply,
126                                   int index);
127
128 static inline int out_check_resent(const struct lu_env *env,
129                                    struct dt_device *dt,
130                                    struct dt_object *obj,
131                                    struct ptlrpc_request *req,
132                                    out_reconstruct_t reconstruct,
133                                    struct object_update_reply *reply,
134                                    int index)
135 {
136         if (likely(!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)))
137                 return 0;
138
139         if (req_xid_is_last(req)) {
140                 reconstruct(env, dt, obj, reply, index);
141                 return 1;
142         }
143         DEBUG_REQ(D_HA, req, "no reply for RESENT req (have "LPD64")",
144                  req->rq_export->exp_target_data.ted_lcd->lcd_last_xid);
145         return 0;
146 }
147
148 static int out_obj_destroy(const struct lu_env *env, struct dt_object *dt_obj,
149                            struct thandle *th)
150 {
151         int rc;
152
153         CDEBUG(D_INFO, "%s: destroy "DFID"\n", dt_obd_name(th->th_dev),
154                PFID(lu_object_fid(&dt_obj->do_lu)));
155
156         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
157         rc = dt_destroy(env, dt_obj, th);
158         dt_write_unlock(env, dt_obj);
159
160         return rc;
161 }
162
163 /**
164  * All of the xxx_undo will be used once execution failed,
165  * But because all of the required resource has been reserved in
166  * declare phase, i.e. if declare succeed, it should make sure
167  * the following executing phase succeed in anyway, so these undo
168  * should be useless for most of the time in Phase I
169  */
170 int out_tx_create_undo(const struct lu_env *env, struct thandle *th,
171                        struct tx_arg *arg)
172 {
173         int rc;
174
175         rc = out_obj_destroy(env, arg->object, th);
176         if (rc != 0)
177                 CERROR("%s: undo failure, we are doomed!: rc = %d\n",
178                        dt_obd_name(th->th_dev), rc);
179         return rc;
180 }
181
182 int out_tx_create_exec(const struct lu_env *env, struct thandle *th,
183                        struct tx_arg *arg)
184 {
185         struct dt_object        *dt_obj = arg->object;
186         int                      rc;
187
188         CDEBUG(D_OTHER, "%s: create "DFID": dof %u, mode %o\n",
189                dt_obd_name(th->th_dev),
190                PFID(lu_object_fid(&arg->object->do_lu)),
191                arg->u.create.dof.dof_type,
192                arg->u.create.attr.la_mode & S_IFMT);
193
194         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
195         rc = dt_create(env, dt_obj, &arg->u.create.attr,
196                        &arg->u.create.hint, &arg->u.create.dof, th);
197
198         dt_write_unlock(env, dt_obj);
199
200         CDEBUG(D_INFO, "%s: insert create reply %p index %d: rc = %d\n",
201                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
202
203         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
204
205         return rc;
206 }
207
208 static int __out_tx_create(const struct lu_env *env, struct dt_object *obj,
209                            struct lu_attr *attr, struct lu_fid *parent_fid,
210                            struct dt_object_format *dof,
211                            struct thandle_exec_args *ta,
212                            struct object_update_reply *reply,
213                            int index, const char *file, int line)
214 {
215         struct tx_arg *arg;
216         int rc;
217
218         LASSERT(ta->ta_handle != NULL);
219         rc = dt_declare_create(env, obj, attr, NULL, dof,
220                                        ta->ta_handle);
221         if (rc != 0)
222                 return rc;
223
224         arg = tx_add_exec(ta, out_tx_create_exec, out_tx_create_undo, file,
225                           line);
226         if (IS_ERR(arg))
227                 return PTR_ERR(arg);
228
229         /* release the object in out_trans_stop */
230         lu_object_get(&obj->do_lu);
231         arg->object = obj;
232         arg->u.create.attr = *attr;
233         if (parent_fid != NULL)
234                 arg->u.create.fid = *parent_fid;
235         memset(&arg->u.create.hint, 0, sizeof(arg->u.create.hint));
236         arg->u.create.dof  = *dof;
237         arg->reply = reply;
238         arg->index = index;
239
240         return 0;
241 }
242
243 static int out_create(struct tgt_session_info *tsi)
244 {
245         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
246         struct object_update    *update = tti->tti_u.update.tti_update;
247         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
248         struct dt_object_format *dof = &tti->tti_u.update.tti_update_dof;
249         struct obdo             *lobdo = &tti->tti_u.update.tti_obdo;
250         struct lu_attr          *attr = &tti->tti_attr;
251         struct lu_fid           *fid = NULL;
252         struct obdo             *wobdo;
253         int                     size;
254         int                     rc;
255
256         ENTRY;
257
258         wobdo = object_update_param_get(update, 0, &size);
259         if (wobdo == NULL || size != sizeof(*wobdo)) {
260                 CERROR("%s: obdo is NULL, invalid RPC: rc = %d\n",
261                        tgt_name(tsi->tsi_tgt), -EPROTO);
262                 RETURN(err_serious(-EPROTO));
263         }
264
265         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
266                 lustre_swab_obdo(wobdo);
267         lustre_get_wire_obdo(NULL, lobdo, wobdo);
268         la_from_obdo(attr, lobdo, lobdo->o_valid);
269
270         dof->dof_type = dt_mode_to_dft(attr->la_mode);
271         if (update->ou_params_count > 1) {
272                 int size;
273
274                 fid = object_update_param_get(update, 1, &size);
275                 if (fid == NULL || size != sizeof(*fid)) {
276                         CERROR("%s: invalid fid: rc = %d\n",
277                                tgt_name(tsi->tsi_tgt), -EPROTO);
278                         RETURN(err_serious(-EPROTO));
279                 }
280                 if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
281                         lustre_swab_lu_fid(fid);
282                 if (!fid_is_sane(fid)) {
283                         CERROR("%s: invalid fid "DFID": rc = %d\n",
284                                tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
285                         RETURN(err_serious(-EPROTO));
286                 }
287         }
288
289         if (lu_object_exists(&obj->do_lu))
290                 RETURN(-EEXIST);
291
292         rc = out_tx_create(tsi->tsi_env, obj, attr, fid, dof,
293                            &tti->tti_tea,
294                            tti->tti_u.update.tti_update_reply,
295                            tti->tti_u.update.tti_update_reply_index);
296
297         RETURN(rc);
298 }
299
300 static int out_tx_attr_set_undo(const struct lu_env *env,
301                                 struct thandle *th, struct tx_arg *arg)
302 {
303         CERROR("%s: attr set undo "DFID" unimplemented yet!: rc = %d\n",
304                dt_obd_name(th->th_dev),
305                PFID(lu_object_fid(&arg->object->do_lu)), -ENOTSUPP);
306
307         return -ENOTSUPP;
308 }
309
310 static int out_tx_attr_set_exec(const struct lu_env *env, struct thandle *th,
311                                 struct tx_arg *arg)
312 {
313         struct dt_object        *dt_obj = arg->object;
314         int                     rc;
315
316         CDEBUG(D_OTHER, "%s: attr set "DFID"\n", dt_obd_name(th->th_dev),
317                PFID(lu_object_fid(&dt_obj->do_lu)));
318
319         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
320         rc = dt_attr_set(env, dt_obj, &arg->u.attr_set.attr, th, NULL);
321         dt_write_unlock(env, dt_obj);
322
323         CDEBUG(D_INFO, "%s: insert attr_set reply %p index %d: rc = %d\n",
324                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
325
326         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
327
328         return rc;
329 }
330
331 static int __out_tx_attr_set(const struct lu_env *env,
332                              struct dt_object *dt_obj,
333                              const struct lu_attr *attr,
334                              struct thandle_exec_args *th,
335                              struct object_update_reply *reply,
336                              int index, const char *file, int line)
337 {
338         struct tx_arg   *arg;
339         int             rc;
340
341         LASSERT(th->ta_handle != NULL);
342         rc = dt_declare_attr_set(env, dt_obj, attr, th->ta_handle);
343         if (rc != 0)
344                 return rc;
345
346         arg = tx_add_exec(th, out_tx_attr_set_exec, out_tx_attr_set_undo,
347                           file, line);
348         if (IS_ERR(arg))
349                 return PTR_ERR(arg);
350
351         lu_object_get(&dt_obj->do_lu);
352         arg->object = dt_obj;
353         arg->u.attr_set.attr = *attr;
354         arg->reply = reply;
355         arg->index = index;
356         return 0;
357 }
358
359 static int out_attr_set(struct tgt_session_info *tsi)
360 {
361         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
362         struct object_update    *update = tti->tti_u.update.tti_update;
363         struct lu_attr          *attr = &tti->tti_attr;
364         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
365         struct obdo             *lobdo = &tti->tti_u.update.tti_obdo;
366         struct obdo             *wobdo;
367         int                      size;
368         int                      rc;
369
370         ENTRY;
371
372         wobdo = object_update_param_get(update, 0, &size);
373         if (wobdo == NULL || size != sizeof(*wobdo)) {
374                 CERROR("%s: empty obdo in the update: rc = %d\n",
375                        tgt_name(tsi->tsi_tgt), -EPROTO);
376                 RETURN(err_serious(-EPROTO));
377         }
378
379         attr->la_valid = 0;
380         attr->la_valid = 0;
381
382         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
383                 lustre_swab_obdo(wobdo);
384         lustre_get_wire_obdo(NULL, lobdo, wobdo);
385         la_from_obdo(attr, lobdo, lobdo->o_valid);
386
387         rc = out_tx_attr_set(tsi->tsi_env, obj, attr, &tti->tti_tea,
388                              tti->tti_u.update.tti_update_reply,
389                              tti->tti_u.update.tti_update_reply_index);
390
391         RETURN(rc);
392 }
393
394 static int out_attr_get(struct tgt_session_info *tsi)
395 {
396         const struct lu_env     *env = tsi->tsi_env;
397         struct tgt_thread_info  *tti = tgt_th_info(env);
398         struct obdo             *obdo = &tti->tti_u.update.tti_obdo;
399         struct lu_attr          *la = &tti->tti_attr;
400         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
401         int                     idx = tti->tti_u.update.tti_update_reply_index;
402         int                     rc;
403
404         ENTRY;
405
406         if (!lu_object_exists(&obj->do_lu)) {
407                 /* Usually, this will be called when the master MDT try
408                  * to init a remote object(see osp_object_init), so if
409                  * the object does not exist on slave, we need set BANSHEE flag,
410                  * so the object can be removed from the cache immediately */
411                 set_bit(LU_OBJECT_HEARD_BANSHEE,
412                         &obj->do_lu.lo_header->loh_flags);
413                 RETURN(-ENOENT);
414         }
415
416         dt_read_lock(env, obj, MOR_TGT_CHILD);
417         rc = dt_attr_get(env, obj, la, NULL);
418         if (rc)
419                 GOTO(out_unlock, rc);
420
421         obdo->o_valid = 0;
422         obdo_from_la(obdo, la, la->la_valid);
423         lustre_set_wire_obdo(NULL, obdo, obdo);
424
425 out_unlock:
426         dt_read_unlock(env, obj);
427
428         CDEBUG(D_INFO, "%s: insert attr get reply %p index %d: rc = %d\n",
429                tgt_name(tsi->tsi_tgt), tti->tti_u.update.tti_update_reply,
430                0, rc);
431
432         object_update_result_insert(tti->tti_u.update.tti_update_reply, obdo,
433                                     sizeof(*obdo), idx, rc);
434
435         RETURN(rc);
436 }
437
438 static int out_xattr_get(struct tgt_session_info *tsi)
439 {
440         const struct lu_env        *env = tsi->tsi_env;
441         struct tgt_thread_info     *tti = tgt_th_info(env);
442         struct object_update       *update = tti->tti_u.update.tti_update;
443         struct lu_buf              *lbuf = &tti->tti_buf;
444         struct object_update_reply *reply = tti->tti_u.update.tti_update_reply;
445         struct dt_object           *obj = tti->tti_u.update.tti_dt_object;
446         char                       *name;
447         struct object_update_result *update_result;
448         int                     idx = tti->tti_u.update.tti_update_reply_index;
449         int                        rc;
450
451         ENTRY;
452
453         if (!lu_object_exists(&obj->do_lu)) {
454                 set_bit(LU_OBJECT_HEARD_BANSHEE,
455                         &obj->do_lu.lo_header->loh_flags);
456                 RETURN(-ENOENT);
457         }
458
459         name = object_update_param_get(update, 0, NULL);
460         if (name == NULL) {
461                 CERROR("%s: empty name for xattr get: rc = %d\n",
462                        tgt_name(tsi->tsi_tgt), -EPROTO);
463                 RETURN(err_serious(-EPROTO));
464         }
465
466         update_result = object_update_result_get(reply, 0, NULL);
467         if (update_result == NULL) {
468                 CERROR("%s: empty name for xattr get: rc = %d\n",
469                        tgt_name(tsi->tsi_tgt), -EPROTO);
470                 RETURN(err_serious(-EPROTO));
471         }
472
473         lbuf->lb_buf = update_result->our_data;
474         lbuf->lb_len = OUT_UPDATE_REPLY_SIZE -
475                        cfs_size_round((unsigned long)update_result->our_data -
476                                       (unsigned long)update_result);
477         dt_read_lock(env, obj, MOR_TGT_CHILD);
478         rc = dt_xattr_get(env, obj, lbuf, name, NULL);
479         dt_read_unlock(env, obj);
480         if (rc < 0) {
481                 lbuf->lb_len = 0;
482                 GOTO(out, rc);
483         }
484         if (rc == 0) {
485                 lbuf->lb_len = 0;
486                 GOTO(out, rc = -ENOENT);
487         }
488         lbuf->lb_len = rc;
489         rc = 0;
490         CDEBUG(D_INFO, "%s: "DFID" get xattr %s len %d\n",
491                tgt_name(tsi->tsi_tgt), PFID(lu_object_fid(&obj->do_lu)),
492                name, (int)lbuf->lb_len);
493
494         GOTO(out, rc);
495
496 out:
497         object_update_result_insert(reply, lbuf->lb_buf, lbuf->lb_len, idx, rc);
498         RETURN(rc);
499 }
500
501 static int out_index_lookup(struct tgt_session_info *tsi)
502 {
503         const struct lu_env     *env = tsi->tsi_env;
504         struct tgt_thread_info  *tti = tgt_th_info(env);
505         struct object_update    *update = tti->tti_u.update.tti_update;
506         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
507         char                    *name;
508         int                      rc;
509
510         ENTRY;
511
512         if (!lu_object_exists(&obj->do_lu))
513                 RETURN(-ENOENT);
514
515         name = object_update_param_get(update, 0, NULL);
516         if (name == NULL) {
517                 CERROR("%s: empty name for lookup: rc = %d\n",
518                        tgt_name(tsi->tsi_tgt), -EPROTO);
519                 RETURN(err_serious(-EPROTO));
520         }
521
522         dt_read_lock(env, obj, MOR_TGT_CHILD);
523         if (!dt_try_as_dir(env, obj))
524                 GOTO(out_unlock, rc = -ENOTDIR);
525
526         rc = dt_lookup(env, obj, (struct dt_rec *)&tti->tti_fid1,
527                 (struct dt_key *)name, NULL);
528
529         if (rc < 0)
530                 GOTO(out_unlock, rc);
531
532         if (rc == 0)
533                 rc += 1;
534
535 out_unlock:
536         dt_read_unlock(env, obj);
537
538         CDEBUG(D_INFO, "lookup "DFID" %s get "DFID" rc %d\n",
539                PFID(lu_object_fid(&obj->do_lu)), name,
540                PFID(&tti->tti_fid1), rc);
541
542         CDEBUG(D_INFO, "%s: insert lookup reply %p index %d: rc = %d\n",
543                tgt_name(tsi->tsi_tgt), tti->tti_u.update.tti_update_reply,
544                0, rc);
545
546         object_update_result_insert(tti->tti_u.update.tti_update_reply,
547                             &tti->tti_fid1, sizeof(tti->tti_fid1),
548                             tti->tti_u.update.tti_update_reply_index, rc);
549         RETURN(rc);
550 }
551
552 static int out_tx_xattr_set_exec(const struct lu_env *env,
553                                  struct thandle *th,
554                                  struct tx_arg *arg)
555 {
556         struct dt_object *dt_obj = arg->object;
557         int rc;
558
559         CDEBUG(D_INFO, "%s: set xattr buf %p name %s flag %d\n",
560                dt_obd_name(th->th_dev), arg->u.xattr_set.buf.lb_buf,
561                arg->u.xattr_set.name, arg->u.xattr_set.flags);
562
563         if (!lu_object_exists(&dt_obj->do_lu))
564                 GOTO(out, rc = -ENOENT);
565
566         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
567         rc = dt_xattr_set(env, dt_obj, &arg->u.xattr_set.buf,
568                           arg->u.xattr_set.name, arg->u.xattr_set.flags,
569                           th, NULL);
570         dt_write_unlock(env, dt_obj);
571         /**
572          * Ignore errors if this is LINK EA
573          **/
574         if (unlikely(rc && !strcmp(arg->u.xattr_set.name, XATTR_NAME_LINK)))
575                 rc = 0;
576 out:
577         CDEBUG(D_INFO, "%s: insert xattr set reply %p index %d: rc = %d\n",
578                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
579
580         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
581
582         return rc;
583 }
584
585 static int __out_tx_xattr_set(const struct lu_env *env,
586                               struct dt_object *dt_obj,
587                               const struct lu_buf *buf,
588                               const char *name, int flags,
589                               struct thandle_exec_args *ta,
590                               struct object_update_reply *reply,
591                               int index, const char *file, int line)
592 {
593         struct tx_arg   *arg;
594         int             rc;
595
596         LASSERT(ta->ta_handle != NULL);
597         rc = dt_declare_xattr_set(env, dt_obj, buf, name, flags, ta->ta_handle);
598         if (rc != 0)
599                 return rc;
600
601         arg = tx_add_exec(ta, out_tx_xattr_set_exec, NULL, file, line);
602         if (IS_ERR(arg))
603                 return PTR_ERR(arg);
604
605         lu_object_get(&dt_obj->do_lu);
606         arg->object = dt_obj;
607         arg->u.xattr_set.name = name;
608         arg->u.xattr_set.flags = flags;
609         arg->u.xattr_set.buf = *buf;
610         arg->reply = reply;
611         arg->index = index;
612         arg->u.xattr_set.csum = 0;
613         return 0;
614 }
615
616 static int out_xattr_set(struct tgt_session_info *tsi)
617 {
618         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
619         struct object_update    *update = tti->tti_u.update.tti_update;
620         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
621         struct lu_buf           *lbuf = &tti->tti_buf;
622         char                    *name;
623         char                    *buf;
624         __u32                   *tmp;
625         int                      buf_len = 0;
626         int                      flag;
627         int                      size = 0;
628         int                      rc;
629         ENTRY;
630
631         name = object_update_param_get(update, 0, NULL);
632         if (name == NULL) {
633                 CERROR("%s: empty name for xattr set: rc = %d\n",
634                        tgt_name(tsi->tsi_tgt), -EPROTO);
635                 RETURN(err_serious(-EPROTO));
636         }
637
638         buf = object_update_param_get(update, 1, &buf_len);
639         if (buf == NULL || buf_len == 0) {
640                 CERROR("%s: empty buf for xattr set: rc = %d\n",
641                        tgt_name(tsi->tsi_tgt), -EPROTO);
642                 RETURN(err_serious(-EPROTO));
643         }
644
645         lbuf->lb_buf = buf;
646         lbuf->lb_len = buf_len;
647
648         tmp = object_update_param_get(update, 2, &size);
649         if (tmp == NULL || size != sizeof(*tmp)) {
650                 CERROR("%s: emptry or wrong size %d flag: rc = %d\n",
651                        tgt_name(tsi->tsi_tgt), size, -EPROTO);
652                 RETURN(err_serious(-EPROTO));
653         }
654
655         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
656                 __swab32s(tmp);
657         flag = *tmp;
658
659         rc = out_tx_xattr_set(tsi->tsi_env, obj, lbuf, name, flag,
660                               &tti->tti_tea,
661                               tti->tti_u.update.tti_update_reply,
662                               tti->tti_u.update.tti_update_reply_index);
663         RETURN(rc);
664 }
665
666 static int out_tx_xattr_del_exec(const struct lu_env *env, struct thandle *th,
667                                  struct tx_arg *arg)
668 {
669         struct dt_object *dt_obj = arg->object;
670         int rc;
671
672         CDEBUG(D_INFO, "%s: del xattr name '%s' on "DFID"\n",
673                dt_obd_name(th->th_dev), arg->u.xattr_set.name,
674                PFID(lu_object_fid(&dt_obj->do_lu)));
675
676         if (!lu_object_exists(&dt_obj->do_lu))
677                 GOTO(out, rc = -ENOENT);
678
679         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
680         rc = dt_xattr_del(env, dt_obj, arg->u.xattr_set.name,
681                           th, NULL);
682         dt_write_unlock(env, dt_obj);
683 out:
684         CDEBUG(D_INFO, "%s: insert xattr del reply %p index %d: rc = %d\n",
685                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
686
687         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
688
689         return rc;
690 }
691
692 static int __out_tx_xattr_del(const struct lu_env *env,
693                               struct dt_object *dt_obj, const char *name,
694                               struct thandle_exec_args *ta,
695                               struct object_update_reply *reply,
696                               int index, const char *file, int line)
697 {
698         struct tx_arg   *arg;
699         int             rc;
700
701         rc = dt_declare_xattr_del(env, dt_obj, name, ta->ta_handle);
702         if (rc != 0)
703                 return rc;
704
705         arg = tx_add_exec(ta, out_tx_xattr_del_exec, NULL, file, line);
706         if (IS_ERR(arg))
707                 return PTR_ERR(arg);
708
709         lu_object_get(&dt_obj->do_lu);
710         arg->object = dt_obj;
711         arg->u.xattr_set.name = name;
712         arg->reply = reply;
713         arg->index = index;
714         return 0;
715 }
716
717 static int out_xattr_del(struct tgt_session_info *tsi)
718 {
719         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
720         struct object_update    *update = tti->tti_u.update.tti_update;
721         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
722         char                    *name;
723         int                      rc;
724         ENTRY;
725
726         name = object_update_param_get(update, 0, NULL);
727         if (name == NULL) {
728                 CERROR("%s: empty name for xattr set: rc = %d\n",
729                        tgt_name(tsi->tsi_tgt), -EPROTO);
730                 RETURN(err_serious(-EPROTO));
731         }
732
733         rc = out_tx_xattr_del(tsi->tsi_env, obj, name, &tti->tti_tea,
734                               tti->tti_u.update.tti_update_reply,
735                               tti->tti_u.update.tti_update_reply_index);
736         RETURN(rc);
737 }
738
739 static int out_obj_ref_add(const struct lu_env *env,
740                            struct dt_object *dt_obj,
741                            struct thandle *th)
742 {
743         int rc;
744
745         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
746         rc = dt_ref_add(env, dt_obj, th);
747         dt_write_unlock(env, dt_obj);
748
749         return rc;
750 }
751
752 static int out_obj_ref_del(const struct lu_env *env,
753                            struct dt_object *dt_obj,
754                            struct thandle *th)
755 {
756         int rc;
757
758         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
759         rc = dt_ref_del(env, dt_obj, th);
760         dt_write_unlock(env, dt_obj);
761
762         return rc;
763 }
764
765 static int out_tx_ref_add_exec(const struct lu_env *env, struct thandle *th,
766                                struct tx_arg *arg)
767 {
768         struct dt_object *dt_obj = arg->object;
769         int rc;
770
771         rc = out_obj_ref_add(env, dt_obj, th);
772
773         CDEBUG(D_INFO, "%s: insert ref_add reply %p index %d: rc = %d\n",
774                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
775
776         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
777         return rc;
778 }
779
780 static int out_tx_ref_add_undo(const struct lu_env *env, struct thandle *th,
781                                struct tx_arg *arg)
782 {
783         return out_obj_ref_del(env, arg->object, th);
784 }
785
786 static int __out_tx_ref_add(const struct lu_env *env,
787                             struct dt_object *dt_obj,
788                             struct thandle_exec_args *ta,
789                             struct object_update_reply *reply,
790                             int index, const char *file, int line)
791 {
792         struct tx_arg   *arg;
793         int             rc;
794
795         LASSERT(ta->ta_handle != NULL);
796         rc = dt_declare_ref_add(env, dt_obj, ta->ta_handle);
797         if (rc != 0)
798                 return rc;
799
800         arg = tx_add_exec(ta, out_tx_ref_add_exec, out_tx_ref_add_undo, file,
801                           line);
802         if (IS_ERR(arg))
803                 return PTR_ERR(arg);
804
805         lu_object_get(&dt_obj->do_lu);
806         arg->object = dt_obj;
807         arg->reply = reply;
808         arg->index = index;
809         return 0;
810 }
811
812 /**
813  * increase ref of the object
814  **/
815 static int out_ref_add(struct tgt_session_info *tsi)
816 {
817         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
818         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
819         int                      rc;
820
821         ENTRY;
822
823         rc = out_tx_ref_add(tsi->tsi_env, obj, &tti->tti_tea,
824                             tti->tti_u.update.tti_update_reply,
825                             tti->tti_u.update.tti_update_reply_index);
826         RETURN(rc);
827 }
828
829 static int out_tx_ref_del_exec(const struct lu_env *env, struct thandle *th,
830                                struct tx_arg *arg)
831 {
832         struct dt_object        *dt_obj = arg->object;
833         int                      rc;
834
835         rc = out_obj_ref_del(env, dt_obj, th);
836
837         CDEBUG(D_INFO, "%s: insert ref_del reply %p index %d: rc = %d\n",
838                dt_obd_name(th->th_dev), arg->reply, arg->index, 0);
839
840         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
841
842         return rc;
843 }
844
845 static int out_tx_ref_del_undo(const struct lu_env *env, struct thandle *th,
846                                struct tx_arg *arg)
847 {
848         return out_obj_ref_add(env, arg->object, th);
849 }
850
851 static int __out_tx_ref_del(const struct lu_env *env,
852                             struct dt_object *dt_obj,
853                             struct thandle_exec_args *ta,
854                             struct object_update_reply *reply,
855                             int index, const char *file, int line)
856 {
857         struct tx_arg   *arg;
858         int             rc;
859
860         LASSERT(ta->ta_handle != NULL);
861         rc = dt_declare_ref_del(env, dt_obj, ta->ta_handle);
862         if (rc != 0)
863                 return rc;
864
865         arg = tx_add_exec(ta, out_tx_ref_del_exec, out_tx_ref_del_undo, file,
866                           line);
867         if (IS_ERR(arg))
868                 return PTR_ERR(arg);
869
870         lu_object_get(&dt_obj->do_lu);
871         arg->object = dt_obj;
872         arg->reply = reply;
873         arg->index = index;
874         return 0;
875 }
876
877 static int out_ref_del(struct tgt_session_info *tsi)
878 {
879         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
880         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
881         int                      rc;
882
883         ENTRY;
884
885         if (!lu_object_exists(&obj->do_lu))
886                 RETURN(-ENOENT);
887
888         rc = out_tx_ref_del(tsi->tsi_env, obj, &tti->tti_tea,
889                             tti->tti_u.update.tti_update_reply,
890                             tti->tti_u.update.tti_update_reply_index);
891         RETURN(rc);
892 }
893
894 static int out_obj_index_insert(const struct lu_env *env,
895                                 struct dt_object *dt_obj,
896                                 const struct dt_rec *rec,
897                                 const struct dt_key *key,
898                                 struct thandle *th)
899 {
900         int rc;
901
902         CDEBUG(D_INFO, "%s: index insert "DFID" name: %s fid "DFID", type %u\n",
903                dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
904                (char *)key, PFID(((struct dt_insert_rec *)rec)->rec_fid),
905                ((struct dt_insert_rec *)rec)->rec_type);
906
907         if (dt_try_as_dir(env, dt_obj) == 0)
908                 return -ENOTDIR;
909
910         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
911         rc = dt_insert(env, dt_obj, rec, key, th, NULL, 0);
912         dt_write_unlock(env, dt_obj);
913
914         return rc;
915 }
916
917 static int out_obj_index_delete(const struct lu_env *env,
918                                 struct dt_object *dt_obj,
919                                 const struct dt_key *key,
920                                 struct thandle *th)
921 {
922         int rc;
923
924         CDEBUG(D_INFO, "%s: index delete "DFID" name: %s\n",
925                dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
926                (char *)key);
927
928         if (dt_try_as_dir(env, dt_obj) == 0)
929                 return -ENOTDIR;
930
931         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
932         rc = dt_delete(env, dt_obj, key, th, NULL);
933         dt_write_unlock(env, dt_obj);
934
935         return rc;
936 }
937
938 static int out_tx_index_insert_exec(const struct lu_env *env,
939                                     struct thandle *th, struct tx_arg *arg)
940 {
941         struct dt_object *dt_obj = arg->object;
942         int rc;
943
944         rc = out_obj_index_insert(env, dt_obj,
945                                   (const struct dt_rec *)&arg->u.insert.rec,
946                                   arg->u.insert.key, th);
947
948         CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
949                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
950
951         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
952
953         return rc;
954 }
955
956 static int out_tx_index_insert_undo(const struct lu_env *env,
957                                     struct thandle *th, struct tx_arg *arg)
958 {
959         return out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
960 }
961
962 static int __out_tx_index_insert(const struct lu_env *env,
963                                  struct dt_object *dt_obj,
964                                  const struct dt_rec *rec,
965                                  const struct dt_key *key,
966                                  struct thandle_exec_args *ta,
967                                  struct object_update_reply *reply,
968                                  int index, const char *file, int line)
969 {
970         struct tx_arg   *arg;
971         int             rc;
972
973         LASSERT(ta->ta_handle != NULL);
974         if (dt_try_as_dir(env, dt_obj) == 0) {
975                 rc = -ENOTDIR;
976                 return rc;
977         }
978
979         rc = dt_declare_insert(env, dt_obj, rec, key, ta->ta_handle);
980         if (rc != 0)
981                 return rc;
982
983         arg = tx_add_exec(ta, out_tx_index_insert_exec,
984                           out_tx_index_insert_undo, file, line);
985         if (IS_ERR(arg))
986                 return PTR_ERR(arg);
987
988         lu_object_get(&dt_obj->do_lu);
989         arg->object = dt_obj;
990         arg->reply = reply;
991         arg->index = index;
992         arg->u.insert.rec = *(const struct dt_insert_rec *)rec;
993         arg->u.insert.key = key;
994
995         return 0;
996 }
997
998 static int out_index_insert(struct tgt_session_info *tsi)
999 {
1000         struct tgt_thread_info  *tti    = tgt_th_info(tsi->tsi_env);
1001         struct object_update    *update = tti->tti_u.update.tti_update;
1002         struct dt_object        *obj    = tti->tti_u.update.tti_dt_object;
1003         struct dt_insert_rec    *rec    = &tti->tti_rec;
1004         struct lu_fid           *fid;
1005         char                    *name;
1006         __u32                   *ptype;
1007         int                      rc     = 0;
1008         int                      size;
1009         ENTRY;
1010
1011         name = object_update_param_get(update, 0, NULL);
1012         if (name == NULL) {
1013                 CERROR("%s: empty name for index insert: rc = %d\n",
1014                        tgt_name(tsi->tsi_tgt), -EPROTO);
1015                 RETURN(err_serious(-EPROTO));
1016         }
1017
1018         fid = object_update_param_get(update, 1, &size);
1019         if (fid == NULL || size != sizeof(*fid)) {
1020                 CERROR("%s: invalid fid: rc = %d\n",
1021                        tgt_name(tsi->tsi_tgt), -EPROTO);
1022                        RETURN(err_serious(-EPROTO));
1023         }
1024
1025         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
1026                 lustre_swab_lu_fid(fid);
1027
1028         if (!fid_is_sane(fid)) {
1029                 CERROR("%s: invalid FID "DFID": rc = %d\n",
1030                        tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
1031                 RETURN(err_serious(-EPROTO));
1032         }
1033
1034         ptype = object_update_param_get(update, 2, &size);
1035         if (ptype == NULL || size != sizeof(*ptype)) {
1036                 CERROR("%s: invalid type for index insert: rc = %d\n",
1037                        tgt_name(tsi->tsi_tgt), -EPROTO);
1038                 RETURN(err_serious(-EPROTO));
1039         }
1040
1041         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
1042                 __swab32s(ptype);
1043
1044         rec->rec_fid = fid;
1045         rec->rec_type = *ptype;
1046
1047         rc = out_tx_index_insert(tsi->tsi_env, obj, (const struct dt_rec *)rec,
1048                                  (const struct dt_key *)name, &tti->tti_tea,
1049                                  tti->tti_u.update.tti_update_reply,
1050                                  tti->tti_u.update.tti_update_reply_index);
1051         RETURN(rc);
1052 }
1053
1054 static int out_tx_index_delete_exec(const struct lu_env *env,
1055                                     struct thandle *th,
1056                                     struct tx_arg *arg)
1057 {
1058         int rc;
1059
1060         rc = out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
1061
1062         CDEBUG(D_INFO, "%s: delete idx insert reply %p index %d: rc = %d\n",
1063                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
1064
1065         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
1066
1067         return rc;
1068 }
1069
1070 static int out_tx_index_delete_undo(const struct lu_env *env,
1071                                     struct thandle *th,
1072                                     struct tx_arg *arg)
1073 {
1074         CERROR("%s: Oops, can not rollback index_delete yet: rc = %d\n",
1075                dt_obd_name(th->th_dev), -ENOTSUPP);
1076         return -ENOTSUPP;
1077 }
1078
1079 static int __out_tx_index_delete(const struct lu_env *env,
1080                                  struct dt_object *dt_obj,
1081                                  const struct dt_key *key,
1082                                  struct thandle_exec_args *ta,
1083                                  struct object_update_reply *reply,
1084                                  int index, const char *file, int line)
1085 {
1086         struct tx_arg   *arg;
1087         int             rc;
1088
1089         if (dt_try_as_dir(env, dt_obj) == 0) {
1090                 rc = -ENOTDIR;
1091                 return rc;
1092         }
1093
1094         LASSERT(ta->ta_handle != NULL);
1095         rc = dt_declare_delete(env, dt_obj, key, ta->ta_handle);
1096         if (rc != 0)
1097                 return rc;
1098
1099         arg = tx_add_exec(ta, out_tx_index_delete_exec,
1100                           out_tx_index_delete_undo, file, line);
1101         if (IS_ERR(arg))
1102                 return PTR_ERR(arg);
1103
1104         lu_object_get(&dt_obj->do_lu);
1105         arg->object = dt_obj;
1106         arg->reply = reply;
1107         arg->index = index;
1108         arg->u.insert.key = key;
1109         return 0;
1110 }
1111
1112 static int out_index_delete(struct tgt_session_info *tsi)
1113 {
1114         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
1115         struct object_update    *update = tti->tti_u.update.tti_update;
1116         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
1117         char                    *name;
1118         int                      rc = 0;
1119
1120         if (!lu_object_exists(&obj->do_lu))
1121                 RETURN(-ENOENT);
1122
1123         name = object_update_param_get(update, 0, NULL);
1124         if (name == NULL) {
1125                 CERROR("%s: empty name for index delete: rc = %d\n",
1126                        tgt_name(tsi->tsi_tgt), -EPROTO);
1127                 RETURN(err_serious(-EPROTO));
1128         }
1129
1130         rc = out_tx_index_delete(tsi->tsi_env, obj, (const struct dt_key *)name,
1131                                  &tti->tti_tea,
1132                                  tti->tti_u.update.tti_update_reply,
1133                                  tti->tti_u.update.tti_update_reply_index);
1134         RETURN(rc);
1135 }
1136
1137 static int out_tx_destroy_exec(const struct lu_env *env, struct thandle *th,
1138                                struct tx_arg *arg)
1139 {
1140         struct dt_object *dt_obj = arg->object;
1141         int rc;
1142
1143         rc = out_obj_destroy(env, dt_obj, th);
1144
1145         CDEBUG(D_INFO, "%s: insert destroy reply %p index %d: rc = %d\n",
1146                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
1147
1148         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
1149
1150         RETURN(rc);
1151 }
1152
1153 static int out_tx_destroy_undo(const struct lu_env *env, struct thandle *th,
1154                                struct tx_arg *arg)
1155 {
1156         CERROR("%s: not support destroy undo yet!: rc = %d\n",
1157                dt_obd_name(th->th_dev), -ENOTSUPP);
1158         return -ENOTSUPP;
1159 }
1160
1161 static int __out_tx_destroy(const struct lu_env *env, struct dt_object *dt_obj,
1162                              struct thandle_exec_args *ta,
1163                              struct object_update_reply *reply,
1164                              int index, const char *file, int line)
1165 {
1166         struct tx_arg   *arg;
1167         int             rc;
1168
1169         LASSERT(ta->ta_handle != NULL);
1170         rc = dt_declare_destroy(env, dt_obj, ta->ta_handle);
1171         if (rc != 0)
1172                 return rc;
1173
1174         arg = tx_add_exec(ta, out_tx_destroy_exec, out_tx_destroy_undo,
1175                           file, line);
1176         if (IS_ERR(arg))
1177                 return PTR_ERR(arg);
1178
1179         lu_object_get(&dt_obj->do_lu);
1180         arg->object = dt_obj;
1181         arg->reply = reply;
1182         arg->index = index;
1183         return 0;
1184 }
1185
1186 static int out_destroy(struct tgt_session_info *tsi)
1187 {
1188         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
1189         struct object_update    *update = tti->tti_u.update.tti_update;
1190         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
1191         struct lu_fid           *fid;
1192         int                      rc;
1193         ENTRY;
1194
1195         fid = &update->ou_fid;
1196         if (!fid_is_sane(fid)) {
1197                 CERROR("%s: invalid FID "DFID": rc = %d\n",
1198                        tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
1199                 RETURN(err_serious(-EPROTO));
1200         }
1201
1202         if (!lu_object_exists(&obj->do_lu))
1203                 RETURN(-ENOENT);
1204
1205         rc = out_tx_destroy(tsi->tsi_env, obj, &tti->tti_tea,
1206                             tti->tti_u.update.tti_update_reply,
1207                             tti->tti_u.update.tti_update_reply_index);
1208
1209         RETURN(rc);
1210 }
1211
1212 static int out_tx_write_exec(const struct lu_env *env, struct thandle *th,
1213                              struct tx_arg *arg)
1214 {
1215         struct dt_object *dt_obj = arg->object;
1216         int rc;
1217
1218         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
1219         rc = dt_record_write(env, dt_obj, &arg->u.write.buf,
1220                              &arg->u.write.pos, th);
1221         dt_write_unlock(env, dt_obj);
1222
1223         if (rc == 0)
1224                 rc = arg->u.write.buf.lb_len;
1225
1226         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
1227
1228         return rc > 0 ? 0 : rc;
1229 }
1230
1231 static int __out_tx_write(const struct lu_env *env,
1232                           struct dt_object *dt_obj,
1233                           const struct lu_buf *buf,
1234                           loff_t pos, struct thandle_exec_args *ta,
1235                           struct object_update_reply *reply,
1236                           int index, const char *file, int line)
1237 {
1238         struct tx_arg   *arg;
1239         int             rc;
1240
1241         LASSERT(ta->ta_handle != NULL);
1242         rc = dt_declare_record_write(env, dt_obj, buf, pos, ta->ta_handle);
1243         if (rc != 0)
1244                 return rc;
1245
1246         arg = tx_add_exec(ta, out_tx_write_exec, NULL, file, line);
1247         if (IS_ERR(arg))
1248                 return PTR_ERR(arg);
1249
1250         lu_object_get(&dt_obj->do_lu);
1251         arg->object = dt_obj;
1252         arg->u.write.buf = *buf;
1253         arg->u.write.pos = pos;
1254         arg->reply = reply;
1255         arg->index = index;
1256         return 0;
1257 }
1258
1259 static int out_write(struct tgt_session_info *tsi)
1260 {
1261         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
1262         struct object_update    *update = tti->tti_u.update.tti_update;
1263         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
1264         struct lu_buf           *lbuf = &tti->tti_buf;
1265         char                    *buf;
1266         __u64                   *tmp;
1267         int                     size = 0;
1268         int                     buf_len = 0;
1269         loff_t                  pos;
1270         int                      rc;
1271         ENTRY;
1272
1273         buf = object_update_param_get(update, 0, &buf_len);
1274         if (buf == NULL || buf_len == 0) {
1275                 CERROR("%s: empty buf for xattr set: rc = %d\n",
1276                        tgt_name(tsi->tsi_tgt), -EPROTO);
1277                 RETURN(err_serious(-EPROTO));
1278         }
1279         lbuf->lb_buf = buf;
1280         lbuf->lb_len = buf_len;
1281
1282         tmp = object_update_param_get(update, 1, &size);
1283         if (tmp == NULL || size != sizeof(*tmp)) {
1284                 CERROR("%s: empty or wrong size %d pos: rc = %d\n",
1285                        tgt_name(tsi->tsi_tgt), size, -EPROTO);
1286                 RETURN(err_serious(-EPROTO));
1287         }
1288
1289         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
1290                 __swab64s(tmp);
1291         pos = *tmp;
1292
1293         rc = out_tx_write(tsi->tsi_env, obj, lbuf, pos,
1294                           &tti->tti_tea,
1295                           tti->tti_u.update.tti_update_reply,
1296                           tti->tti_u.update.tti_update_reply_index);
1297         RETURN(rc);
1298 }
1299
1300 #define DEF_OUT_HNDL(opc, name, flags, fn)     \
1301 [opc - OUT_CREATE] = {                                  \
1302         .th_name    = name,                             \
1303         .th_fail_id = 0,                                \
1304         .th_opc     = opc,                              \
1305         .th_flags   = flags,                            \
1306         .th_act     = fn,                               \
1307         .th_fmt     = NULL,                             \
1308         .th_version = 0,                                \
1309 }
1310
1311 static struct tgt_handler out_update_ops[] = {
1312         DEF_OUT_HNDL(OUT_CREATE, "out_create", MUTABOR | HABEO_REFERO,
1313                      out_create),
1314         DEF_OUT_HNDL(OUT_DESTROY, "out_create", MUTABOR | HABEO_REFERO,
1315                      out_destroy),
1316         DEF_OUT_HNDL(OUT_REF_ADD, "out_ref_add", MUTABOR | HABEO_REFERO,
1317                      out_ref_add),
1318         DEF_OUT_HNDL(OUT_REF_DEL, "out_ref_del", MUTABOR | HABEO_REFERO,
1319                      out_ref_del),
1320         DEF_OUT_HNDL(OUT_ATTR_SET, "out_attr_set",  MUTABOR | HABEO_REFERO,
1321                      out_attr_set),
1322         DEF_OUT_HNDL(OUT_ATTR_GET, "out_attr_get",  HABEO_REFERO,
1323                      out_attr_get),
1324         DEF_OUT_HNDL(OUT_XATTR_SET, "out_xattr_set", MUTABOR | HABEO_REFERO,
1325                      out_xattr_set),
1326         DEF_OUT_HNDL(OUT_XATTR_DEL, "out_xattr_del", MUTABOR | HABEO_REFERO,
1327                      out_xattr_del),
1328         DEF_OUT_HNDL(OUT_XATTR_GET, "out_xattr_get", HABEO_REFERO,
1329                      out_xattr_get),
1330         DEF_OUT_HNDL(OUT_INDEX_LOOKUP, "out_index_lookup", HABEO_REFERO,
1331                      out_index_lookup),
1332         DEF_OUT_HNDL(OUT_INDEX_INSERT, "out_index_insert",
1333                      MUTABOR | HABEO_REFERO, out_index_insert),
1334         DEF_OUT_HNDL(OUT_INDEX_DELETE, "out_index_delete",
1335                      MUTABOR | HABEO_REFERO, out_index_delete),
1336         DEF_OUT_HNDL(OUT_WRITE, "out_write", MUTABOR | HABEO_REFERO, out_write),
1337 };
1338
1339 struct tgt_handler *out_handler_find(__u32 opc)
1340 {
1341         struct tgt_handler *h;
1342
1343         h = NULL;
1344         if (OUT_CREATE <= opc && opc < OUT_LAST) {
1345                 h = &out_update_ops[opc - OUT_CREATE];
1346                 LASSERTF(h->th_opc == opc, "opcode mismatch %d != %d\n",
1347                          h->th_opc, opc);
1348         } else {
1349                 h = NULL; /* unsupported opc */
1350         }
1351         return h;
1352 }
1353
1354 static int out_tx_start(const struct lu_env *env, struct dt_device *dt,
1355                         struct thandle_exec_args *ta, struct obd_export *exp)
1356 {
1357         ta->ta_argno = 0;
1358         ta->ta_handle = dt_trans_create(env, dt);
1359         if (IS_ERR(ta->ta_handle)) {
1360                 int rc;
1361
1362                 rc = PTR_ERR(ta->ta_handle);
1363                 ta->ta_handle = NULL;
1364                 CERROR("%s: start handle error: rc = %d\n", dt_obd_name(dt),
1365                        rc);
1366                 return rc;
1367         }
1368         if (exp->exp_need_sync)
1369                 ta->ta_handle->th_sync = 1;
1370
1371         return 0;
1372 }
1373
1374 static int out_trans_start(const struct lu_env *env,
1375                            struct thandle_exec_args *ta)
1376 {
1377         return dt_trans_start(env, ta->ta_handle->th_dev, ta->ta_handle);
1378 }
1379
1380 static int out_trans_stop(const struct lu_env *env,
1381                           struct thandle_exec_args *ta, int err)
1382 {
1383         int i;
1384         int rc;
1385
1386         ta->ta_handle->th_result = err;
1387         rc = dt_trans_stop(env, ta->ta_handle->th_dev, ta->ta_handle);
1388         for (i = 0; i < ta->ta_argno; i++) {
1389                 if (ta->ta_args[i]->object != NULL) {
1390                         struct dt_object *obj = ta->ta_args[i]->object;
1391
1392                         /* If the object is being created during this
1393                          * transaction, we need to remove them from the
1394                          * cache immediately, because a few layers are
1395                          * missing in OUT handler, i.e. the object might
1396                          * not be initialized in all layers */
1397                         if (ta->ta_args[i]->exec_fn == out_tx_create_exec)
1398                                 set_bit(LU_OBJECT_HEARD_BANSHEE,
1399                                         &obj->do_lu.lo_header->loh_flags);
1400                         lu_object_put(env, &ta->ta_args[i]->object->do_lu);
1401                         ta->ta_args[i]->object = NULL;
1402                 }
1403         }
1404         ta->ta_handle = NULL;
1405         ta->ta_argno = 0;
1406
1407         return rc;
1408 }
1409
1410 int out_tx_end(const struct lu_env *env, struct thandle_exec_args *ta,
1411                int declare_ret)
1412 {
1413         struct tgt_session_info *tsi = tgt_ses_info(env);
1414         int                     i;
1415         int                     rc;
1416         int                     rc1;
1417         ENTRY;
1418
1419         if (ta->ta_handle == NULL)
1420                 RETURN(0);
1421
1422         if (declare_ret != 0 || ta->ta_argno == 0)
1423                 GOTO(stop, rc = declare_ret);
1424
1425         LASSERT(ta->ta_handle->th_dev != NULL);
1426         rc = out_trans_start(env, ta);
1427         if (unlikely(rc != 0))
1428                 GOTO(stop, rc);
1429
1430         for (i = 0; i < ta->ta_argno; i++) {
1431                 rc = ta->ta_args[i]->exec_fn(env, ta->ta_handle,
1432                                              ta->ta_args[i]);
1433                 if (unlikely(rc != 0)) {
1434                         CDEBUG(D_INFO, "error during execution of #%u from"
1435                                " %s:%d: rc = %d\n", i, ta->ta_args[i]->file,
1436                                ta->ta_args[i]->line, rc);
1437                         while (--i >= 0) {
1438                                 if (ta->ta_args[i]->undo_fn != NULL)
1439                                         ta->ta_args[i]->undo_fn(env,
1440                                                                ta->ta_handle,
1441                                                                ta->ta_args[i]);
1442                                 else
1443                                         CERROR("%s: undo for %s:%d: rc = %d\n",
1444                                              dt_obd_name(ta->ta_handle->th_dev),
1445                                                ta->ta_args[i]->file,
1446                                                ta->ta_args[i]->line, -ENOTSUPP);
1447                         }
1448                         break;
1449                 }
1450                 CDEBUG(D_INFO, "%s: executed %u/%u: rc = %d\n",
1451                        dt_obd_name(ta->ta_handle->th_dev), i, ta->ta_argno, rc);
1452         }
1453
1454         /* Only fail for real update */
1455         tsi->tsi_reply_fail_id = OBD_FAIL_OUT_UPDATE_NET_REP;
1456 stop:
1457         rc1 = out_trans_stop(env, ta, rc);
1458         if (rc == 0)
1459                 rc = rc1;
1460
1461         ta->ta_handle = NULL;
1462         ta->ta_argno = 0;
1463
1464         RETURN(rc);
1465 }
1466
1467 /**
1468  * Object updates between Targets. Because all the updates has been
1469  * dis-assemblied into object updates at sender side, so OUT will
1470  * call OSD API directly to execute these updates.
1471  *
1472  * In DNE phase I all of the updates in the request need to be executed
1473  * in one transaction, and the transaction has to be synchronously.
1474  *
1475  * Please refer to lustre/include/lustre/lustre_idl.h for req/reply
1476  * format.
1477  */
1478 int out_handle(struct tgt_session_info *tsi)
1479 {
1480         const struct lu_env             *env = tsi->tsi_env;
1481         struct tgt_thread_info          *tti = tgt_th_info(env);
1482         struct thandle_exec_args        *ta = &tti->tti_tea;
1483         struct req_capsule              *pill = tsi->tsi_pill;
1484         struct dt_device                *dt = tsi->tsi_tgt->lut_bottom;
1485         struct object_update_request    *ureq;
1486         struct object_update            *update;
1487         struct object_update_reply      *reply;
1488         int                              bufsize;
1489         int                              count;
1490         int                              current_batchid = -1;
1491         int                              i;
1492         int                              rc = 0;
1493         int                              rc1 = 0;
1494
1495         ENTRY;
1496
1497         req_capsule_set(pill, &RQF_OUT_UPDATE);
1498         ureq = req_capsule_client_get(pill, &RMF_OUT_UPDATE);
1499         if (ureq == NULL) {
1500                 CERROR("%s: No buf!: rc = %d\n", tgt_name(tsi->tsi_tgt),
1501                        -EPROTO);
1502                 RETURN(err_serious(-EPROTO));
1503         }
1504
1505         bufsize = req_capsule_get_size(pill, &RMF_OUT_UPDATE, RCL_CLIENT);
1506         if (bufsize != object_update_request_size(ureq)) {
1507                 CERROR("%s: invalid bufsize %d: rc = %d\n",
1508                        tgt_name(tsi->tsi_tgt), bufsize, -EPROTO);
1509                 RETURN(err_serious(-EPROTO));
1510         }
1511
1512         if (ureq->ourq_magic != UPDATE_REQUEST_MAGIC) {
1513                 CERROR("%s: invalid update buffer magic %x expect %x: "
1514                        "rc = %d\n", tgt_name(tsi->tsi_tgt), ureq->ourq_magic,
1515                        UPDATE_REQUEST_MAGIC, -EPROTO);
1516                 RETURN(err_serious(-EPROTO));
1517         }
1518
1519         count = ureq->ourq_count;
1520         if (count <= 0) {
1521                 CERROR("%s: empty update: rc = %d\n", tgt_name(tsi->tsi_tgt),
1522                        -EPROTO);
1523                 RETURN(err_serious(-EPROTO));
1524         }
1525
1526         req_capsule_set_size(pill, &RMF_OUT_UPDATE_REPLY, RCL_SERVER,
1527                              OUT_UPDATE_REPLY_SIZE);
1528         rc = req_capsule_server_pack(pill);
1529         if (rc != 0) {
1530                 CERROR("%s: Can't pack response: rc = %d\n",
1531                        tgt_name(tsi->tsi_tgt), rc);
1532                 RETURN(rc);
1533         }
1534
1535         /* Prepare the update reply buffer */
1536         reply = req_capsule_server_get(pill, &RMF_OUT_UPDATE_REPLY);
1537         if (reply == NULL)
1538                 RETURN(err_serious(-EPROTO));
1539         object_update_reply_init(reply, count);
1540         tti->tti_u.update.tti_update_reply = reply;
1541         tti->tti_mult_trans = !req_is_replay(tgt_ses_req(tsi));
1542
1543         /* Walk through updates in the request to execute them synchronously */
1544         for (i = 0; i < count; i++) {
1545                 struct tgt_handler      *h;
1546                 struct dt_object        *dt_obj;
1547
1548                 update = object_update_request_get(ureq, i, NULL);
1549                 if (update == NULL)
1550                         GOTO(out, rc = -EPROTO);
1551
1552                 if (ptlrpc_req_need_swab(pill->rc_req))
1553                         lustre_swab_object_update(update);
1554
1555                 if (!fid_is_sane(&update->ou_fid)) {
1556                         CERROR("%s: invalid FID "DFID": rc = %d\n",
1557                                tgt_name(tsi->tsi_tgt), PFID(&update->ou_fid),
1558                                -EPROTO);
1559                         GOTO(out, rc = err_serious(-EPROTO));
1560                 }
1561
1562                 dt_obj = dt_locate(env, dt, &update->ou_fid);
1563                 if (IS_ERR(dt_obj))
1564                         GOTO(out, rc = PTR_ERR(dt_obj));
1565
1566                 if (dt->dd_record_fid_accessed) {
1567                         lfsck_pack_rfa(&tti->tti_lr,
1568                                        lu_object_fid(&dt_obj->do_lu));
1569                         tgt_lfsck_in_notify(env, dt, &tti->tti_lr);
1570                 }
1571
1572                 tti->tti_u.update.tti_dt_object = dt_obj;
1573                 tti->tti_u.update.tti_update = update;
1574                 tti->tti_u.update.tti_update_reply_index = i;
1575
1576                 h = out_handler_find(update->ou_type);
1577                 if (unlikely(h == NULL)) {
1578                         CERROR("%s: unsupported opc: 0x%x\n",
1579                                tgt_name(tsi->tsi_tgt), update->ou_type);
1580                         GOTO(next, rc = -ENOTSUPP);
1581                 }
1582
1583                 /* Check resend case only for modifying RPC */
1584                 if (h->th_flags & MUTABOR) {
1585                         struct ptlrpc_request *req = tgt_ses_req(tsi);
1586
1587                         if (out_check_resent(env, dt, dt_obj, req,
1588                                              out_reconstruct, reply, i))
1589                                 GOTO(next, rc = 0);
1590                 }
1591
1592                 /* start transaction for modification RPC only */
1593                 if (h->th_flags & MUTABOR && current_batchid == -1) {
1594                         current_batchid = update->ou_batchid;
1595                         rc = out_tx_start(env, dt, ta, tsi->tsi_exp);
1596                         if (rc != 0)
1597                                 GOTO(next, rc);
1598                 }
1599
1600                 /* Stop the current update transaction, if the update has
1601                  * different batchid, or read-only update */
1602                 if (((current_batchid != update->ou_batchid) ||
1603                      !(h->th_flags & MUTABOR)) && ta->ta_handle != NULL) {
1604                         rc = out_tx_end(env, ta, rc);
1605                         current_batchid = -1;
1606                         if (rc != 0)
1607                                 GOTO(next, rc);
1608
1609                         /* start a new transaction if needed */
1610                         if (h->th_flags & MUTABOR) {
1611                                 rc = out_tx_start(env, dt, ta, tsi->tsi_exp);
1612                                 if (rc != 0)
1613                                         GOTO(next, rc);
1614
1615                                 current_batchid = update->ou_batchid;
1616                         }
1617                 }
1618
1619                 rc = h->th_act(tsi);
1620 next:
1621                 lu_object_put(env, &dt_obj->do_lu);
1622                 if (rc < 0)
1623                         GOTO(out, rc);
1624         }
1625 out:
1626         if (current_batchid != -1) {
1627                 rc1 = out_tx_end(env, ta, rc);
1628                 if (rc == 0)
1629                         rc = rc1;
1630         }
1631
1632         RETURN(rc);
1633 }
1634
1635 struct tgt_handler tgt_out_handlers[] = {
1636 TGT_UPDATE_HDL(MUTABOR, OUT_UPDATE,     out_handle),
1637 };
1638 EXPORT_SYMBOL(tgt_out_handlers);
1639