Whamcloud - gitweb
b9c93220ee6438bcdbc607c6143307b861380176
[fs/lustre-release.git] / lustre / target / out_handler.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  *
25  * lustre/target/out_handler.c
26  *
27  * Object update handler between targets.
28  *
29  * Author: di.wang <di.wang@intel.com>
30  */
31
32 #define DEBUG_SUBSYSTEM S_CLASS
33
34 #include <obd_class.h>
35 #include <md_object.h>
36 #include "tgt_internal.h"
37 #include <lustre_update.h>
38
39 static int tx_extend_args(struct thandle_exec_args *ta, int new_alloc_ta)
40 {
41         struct tx_arg   **new_ta;
42         int             i;
43         int             rc = 0;
44
45         if (ta->ta_alloc_args >= new_alloc_ta)
46                 return 0;
47
48         OBD_ALLOC(new_ta, sizeof(*new_ta) * new_alloc_ta);
49         if (new_ta == NULL)
50                 return -ENOMEM;
51
52         for (i = 0; i < new_alloc_ta; i++) {
53                 if (i < ta->ta_alloc_args) {
54                         /* copy the old args to new one */
55                         new_ta[i] = ta->ta_args[i];
56                 } else {
57                         OBD_ALLOC_PTR(new_ta[i]);
58                         if (new_ta[i] == NULL)
59                                 GOTO(out, rc = -ENOMEM);
60                 }
61         }
62
63         /* free the old args */
64         if (ta->ta_args != NULL)
65                 OBD_FREE(ta->ta_args, sizeof(ta->ta_args[0]) *
66                                       ta->ta_alloc_args);
67
68         ta->ta_args = new_ta;
69         ta->ta_alloc_args = new_alloc_ta;
70 out:
71         if (rc != 0) {
72                 for (i = 0; i < new_alloc_ta; i++) {
73                         if (new_ta[i] != NULL)
74                                 OBD_FREE_PTR(new_ta[i]);
75                 }
76                 OBD_FREE(new_ta, sizeof(*new_ta) * new_alloc_ta);
77         }
78         return rc;
79 }
80
81 #define TX_ALLOC_STEP   8
82 static struct tx_arg *tx_add_exec(struct thandle_exec_args *ta,
83                                   tx_exec_func_t func, tx_exec_func_t undo,
84                                   const char *file, int line)
85 {
86         int rc;
87         int i;
88
89         LASSERT(ta != NULL);
90         LASSERT(func != NULL);
91
92         if (ta->ta_argno + 1 >= ta->ta_alloc_args) {
93                 rc = tx_extend_args(ta, ta->ta_alloc_args + TX_ALLOC_STEP);
94                 if (rc != 0)
95                         return ERR_PTR(rc);
96         }
97
98         i = ta->ta_argno;
99
100         ta->ta_argno++;
101
102         ta->ta_args[i]->exec_fn = func;
103         ta->ta_args[i]->undo_fn = undo;
104         ta->ta_args[i]->file    = file;
105         ta->ta_args[i]->line    = line;
106
107         return ta->ta_args[i];
108 }
109
110 static void out_reconstruct(const struct lu_env *env, struct dt_device *dt,
111                             struct dt_object *obj,
112                             struct object_update_reply *reply,
113                             int index)
114 {
115         CDEBUG(D_INFO, "%s: fork reply reply %p index %d: rc = %d\n",
116                dt_obd_name(dt), reply, index, 0);
117
118         object_update_result_insert(reply, NULL, 0, index, 0);
119         return;
120 }
121
122 typedef void (*out_reconstruct_t)(const struct lu_env *env,
123                                   struct dt_device *dt,
124                                   struct dt_object *obj,
125                                   struct object_update_reply *reply,
126                                   int index);
127
128 static inline int out_check_resent(const struct lu_env *env,
129                                    struct dt_device *dt,
130                                    struct dt_object *obj,
131                                    struct ptlrpc_request *req,
132                                    out_reconstruct_t reconstruct,
133                                    struct object_update_reply *reply,
134                                    int index)
135 {
136         if (likely(!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)))
137                 return 0;
138
139         if (req_xid_is_last(req)) {
140                 reconstruct(env, dt, obj, reply, index);
141                 return 1;
142         }
143         DEBUG_REQ(D_HA, req, "no reply for RESENT req (have "LPD64")",
144                  req->rq_export->exp_target_data.ted_lcd->lcd_last_xid);
145         return 0;
146 }
147
148 static int out_obj_destroy(const struct lu_env *env, struct dt_object *dt_obj,
149                            struct thandle *th)
150 {
151         int rc;
152
153         CDEBUG(D_INFO, "%s: destroy "DFID"\n", dt_obd_name(th->th_dev),
154                PFID(lu_object_fid(&dt_obj->do_lu)));
155
156         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
157         rc = dt_destroy(env, dt_obj, th);
158         dt_write_unlock(env, dt_obj);
159
160         return rc;
161 }
162
163 /**
164  * All of the xxx_undo will be used once execution failed,
165  * But because all of the required resource has been reserved in
166  * declare phase, i.e. if declare succeed, it should make sure
167  * the following executing phase succeed in anyway, so these undo
168  * should be useless for most of the time in Phase I
169  */
170 int out_tx_create_undo(const struct lu_env *env, struct thandle *th,
171                        struct tx_arg *arg)
172 {
173         int rc;
174
175         rc = out_obj_destroy(env, arg->object, th);
176         if (rc != 0)
177                 CERROR("%s: undo failure, we are doomed!: rc = %d\n",
178                        dt_obd_name(th->th_dev), rc);
179         return rc;
180 }
181
182 int out_tx_create_exec(const struct lu_env *env, struct thandle *th,
183                        struct tx_arg *arg)
184 {
185         struct dt_object        *dt_obj = arg->object;
186         int                      rc;
187
188         CDEBUG(D_OTHER, "%s: create "DFID": dof %u, mode %o\n",
189                dt_obd_name(th->th_dev),
190                PFID(lu_object_fid(&arg->object->do_lu)),
191                arg->u.create.dof.dof_type,
192                arg->u.create.attr.la_mode & S_IFMT);
193
194         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
195         rc = dt_create(env, dt_obj, &arg->u.create.attr,
196                        &arg->u.create.hint, &arg->u.create.dof, th);
197
198         dt_write_unlock(env, dt_obj);
199
200         CDEBUG(D_INFO, "%s: insert create reply %p index %d: rc = %d\n",
201                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
202
203         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
204
205         return rc;
206 }
207
208 static int __out_tx_create(const struct lu_env *env, struct dt_object *obj,
209                            struct lu_attr *attr, struct lu_fid *parent_fid,
210                            struct dt_object_format *dof,
211                            struct thandle_exec_args *ta,
212                            struct object_update_reply *reply,
213                            int index, const char *file, int line)
214 {
215         struct tx_arg *arg;
216         int rc;
217
218         LASSERT(ta->ta_handle != NULL);
219         rc = dt_declare_create(env, obj, attr, NULL, dof,
220                                        ta->ta_handle);
221         if (rc != 0)
222                 return rc;
223
224         arg = tx_add_exec(ta, out_tx_create_exec, out_tx_create_undo, file,
225                           line);
226         if (IS_ERR(arg))
227                 return PTR_ERR(arg);
228
229         /* release the object in out_trans_stop */
230         lu_object_get(&obj->do_lu);
231         arg->object = obj;
232         arg->u.create.attr = *attr;
233         if (parent_fid != NULL)
234                 arg->u.create.fid = *parent_fid;
235         memset(&arg->u.create.hint, 0, sizeof(arg->u.create.hint));
236         arg->u.create.dof  = *dof;
237         arg->reply = reply;
238         arg->index = index;
239
240         return 0;
241 }
242
243 static int out_create(struct tgt_session_info *tsi)
244 {
245         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
246         struct object_update    *update = tti->tti_u.update.tti_update;
247         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
248         struct dt_object_format *dof = &tti->tti_u.update.tti_update_dof;
249         struct obdo             *lobdo = &tti->tti_u.update.tti_obdo;
250         struct lu_attr          *attr = &tti->tti_attr;
251         struct lu_fid           *fid = NULL;
252         struct obdo             *wobdo;
253         int                     size;
254         int                     rc;
255
256         ENTRY;
257
258         wobdo = object_update_param_get(update, 0, &size);
259         if (wobdo == NULL || size != sizeof(*wobdo)) {
260                 CERROR("%s: obdo is NULL, invalid RPC: rc = %d\n",
261                        tgt_name(tsi->tsi_tgt), -EPROTO);
262                 RETURN(err_serious(-EPROTO));
263         }
264
265         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
266                 lustre_swab_obdo(wobdo);
267         lustre_get_wire_obdo(NULL, lobdo, wobdo);
268         la_from_obdo(attr, lobdo, lobdo->o_valid);
269
270         dof->dof_type = dt_mode_to_dft(attr->la_mode);
271         if (update->ou_params_count > 1) {
272                 int size;
273
274                 fid = object_update_param_get(update, 1, &size);
275                 if (fid == NULL || size != sizeof(*fid)) {
276                         CERROR("%s: invalid fid: rc = %d\n",
277                                tgt_name(tsi->tsi_tgt), -EPROTO);
278                         RETURN(err_serious(-EPROTO));
279                 }
280                 if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
281                         lustre_swab_lu_fid(fid);
282                 if (!fid_is_sane(fid)) {
283                         CERROR("%s: invalid fid "DFID": rc = %d\n",
284                                tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
285                         RETURN(err_serious(-EPROTO));
286                 }
287         }
288
289         if (lu_object_exists(&obj->do_lu))
290                 RETURN(-EEXIST);
291
292         rc = out_tx_create(tsi->tsi_env, obj, attr, fid, dof,
293                            &tti->tti_tea,
294                            tti->tti_u.update.tti_update_reply,
295                            tti->tti_u.update.tti_update_reply_index);
296
297         RETURN(rc);
298 }
299
300 static int out_tx_attr_set_undo(const struct lu_env *env,
301                                 struct thandle *th, struct tx_arg *arg)
302 {
303         CERROR("%s: attr set undo "DFID" unimplemented yet!: rc = %d\n",
304                dt_obd_name(th->th_dev),
305                PFID(lu_object_fid(&arg->object->do_lu)), -ENOTSUPP);
306
307         return -ENOTSUPP;
308 }
309
310 static int out_tx_attr_set_exec(const struct lu_env *env, struct thandle *th,
311                                 struct tx_arg *arg)
312 {
313         struct dt_object        *dt_obj = arg->object;
314         int                     rc;
315
316         CDEBUG(D_OTHER, "%s: attr set "DFID"\n", dt_obd_name(th->th_dev),
317                PFID(lu_object_fid(&dt_obj->do_lu)));
318
319         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
320         rc = dt_attr_set(env, dt_obj, &arg->u.attr_set.attr, th, NULL);
321         dt_write_unlock(env, dt_obj);
322
323         CDEBUG(D_INFO, "%s: insert attr_set reply %p index %d: rc = %d\n",
324                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
325
326         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
327
328         return rc;
329 }
330
331 static int __out_tx_attr_set(const struct lu_env *env,
332                              struct dt_object *dt_obj,
333                              const struct lu_attr *attr,
334                              struct thandle_exec_args *th,
335                              struct object_update_reply *reply,
336                              int index, const char *file, int line)
337 {
338         struct tx_arg   *arg;
339         int             rc;
340
341         LASSERT(th->ta_handle != NULL);
342         rc = dt_declare_attr_set(env, dt_obj, attr, th->ta_handle);
343         if (rc != 0)
344                 return rc;
345
346         arg = tx_add_exec(th, out_tx_attr_set_exec, out_tx_attr_set_undo,
347                           file, line);
348         if (IS_ERR(arg))
349                 return PTR_ERR(arg);
350
351         lu_object_get(&dt_obj->do_lu);
352         arg->object = dt_obj;
353         arg->u.attr_set.attr = *attr;
354         arg->reply = reply;
355         arg->index = index;
356         return 0;
357 }
358
359 static int out_attr_set(struct tgt_session_info *tsi)
360 {
361         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
362         struct object_update    *update = tti->tti_u.update.tti_update;
363         struct lu_attr          *attr = &tti->tti_attr;
364         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
365         struct obdo             *lobdo = &tti->tti_u.update.tti_obdo;
366         struct obdo             *wobdo;
367         int                      size;
368         int                      rc;
369
370         ENTRY;
371
372         wobdo = object_update_param_get(update, 0, &size);
373         if (wobdo == NULL || size != sizeof(*wobdo)) {
374                 CERROR("%s: empty obdo in the update: rc = %d\n",
375                        tgt_name(tsi->tsi_tgt), -EPROTO);
376                 RETURN(err_serious(-EPROTO));
377         }
378
379         attr->la_valid = 0;
380         attr->la_valid = 0;
381
382         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
383                 lustre_swab_obdo(wobdo);
384         lustre_get_wire_obdo(NULL, lobdo, wobdo);
385         la_from_obdo(attr, lobdo, lobdo->o_valid);
386
387         rc = out_tx_attr_set(tsi->tsi_env, obj, attr, &tti->tti_tea,
388                              tti->tti_u.update.tti_update_reply,
389                              tti->tti_u.update.tti_update_reply_index);
390
391         RETURN(rc);
392 }
393
394 static int out_attr_get(struct tgt_session_info *tsi)
395 {
396         const struct lu_env     *env = tsi->tsi_env;
397         struct tgt_thread_info  *tti = tgt_th_info(env);
398         struct obdo             *obdo = &tti->tti_u.update.tti_obdo;
399         struct lu_attr          *la = &tti->tti_attr;
400         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
401         int                     idx = tti->tti_u.update.tti_update_reply_index;
402         int                     rc;
403
404         ENTRY;
405
406         if (!lu_object_exists(&obj->do_lu)) {
407                 /* Usually, this will be called when the master MDT try
408                  * to init a remote object(see osp_object_init), so if
409                  * the object does not exist on slave, we need set BANSHEE flag,
410                  * so the object can be removed from the cache immediately */
411                 set_bit(LU_OBJECT_HEARD_BANSHEE,
412                         &obj->do_lu.lo_header->loh_flags);
413                 RETURN(-ENOENT);
414         }
415
416         dt_read_lock(env, obj, MOR_TGT_CHILD);
417         rc = dt_attr_get(env, obj, la, NULL);
418         if (rc)
419                 GOTO(out_unlock, rc);
420
421         obdo->o_valid = 0;
422         obdo_from_la(obdo, la, la->la_valid);
423         lustre_set_wire_obdo(NULL, obdo, obdo);
424
425 out_unlock:
426         dt_read_unlock(env, obj);
427
428         CDEBUG(D_INFO, "%s: insert attr get reply %p index %d: rc = %d\n",
429                tgt_name(tsi->tsi_tgt), tti->tti_u.update.tti_update_reply,
430                0, rc);
431
432         object_update_result_insert(tti->tti_u.update.tti_update_reply, obdo,
433                                     sizeof(*obdo), idx, rc);
434
435         RETURN(rc);
436 }
437
438 static int out_xattr_get(struct tgt_session_info *tsi)
439 {
440         const struct lu_env        *env = tsi->tsi_env;
441         struct tgt_thread_info     *tti = tgt_th_info(env);
442         struct object_update       *update = tti->tti_u.update.tti_update;
443         struct lu_buf              *lbuf = &tti->tti_buf;
444         struct object_update_reply *reply = tti->tti_u.update.tti_update_reply;
445         struct dt_object           *obj = tti->tti_u.update.tti_dt_object;
446         char                       *name;
447         struct object_update_result *update_result;
448         int                     idx = tti->tti_u.update.tti_update_reply_index;
449         int                        rc;
450
451         ENTRY;
452
453         if (!lu_object_exists(&obj->do_lu)) {
454                 set_bit(LU_OBJECT_HEARD_BANSHEE,
455                         &obj->do_lu.lo_header->loh_flags);
456                 RETURN(-ENOENT);
457         }
458
459         name = object_update_param_get(update, 0, NULL);
460         if (name == NULL) {
461                 CERROR("%s: empty name for xattr get: rc = %d\n",
462                        tgt_name(tsi->tsi_tgt), -EPROTO);
463                 RETURN(err_serious(-EPROTO));
464         }
465
466         update_result = object_update_result_get(reply, 0, NULL);
467         if (update_result == NULL) {
468                 CERROR("%s: empty name for xattr get: rc = %d\n",
469                        tgt_name(tsi->tsi_tgt), -EPROTO);
470                 RETURN(err_serious(-EPROTO));
471         }
472
473         lbuf->lb_buf = update_result->our_data;
474         lbuf->lb_len = OUT_UPDATE_REPLY_SIZE -
475                        cfs_size_round((unsigned long)update_result->our_data -
476                                       (unsigned long)update_result);
477         dt_read_lock(env, obj, MOR_TGT_CHILD);
478         rc = dt_xattr_get(env, obj, lbuf, name, NULL);
479         dt_read_unlock(env, obj);
480         if (rc < 0) {
481                 lbuf->lb_len = 0;
482                 GOTO(out, rc);
483         }
484         if (rc == 0) {
485                 lbuf->lb_len = 0;
486                 GOTO(out, rc = -ENOENT);
487         }
488         lbuf->lb_len = rc;
489         rc = 0;
490         CDEBUG(D_INFO, "%s: "DFID" get xattr %s len %d\n",
491                tgt_name(tsi->tsi_tgt), PFID(lu_object_fid(&obj->do_lu)),
492                name, (int)lbuf->lb_len);
493
494         GOTO(out, rc);
495
496 out:
497         object_update_result_insert(reply, lbuf->lb_buf, lbuf->lb_len, idx, rc);
498         RETURN(rc);
499 }
500
501 static int out_index_lookup(struct tgt_session_info *tsi)
502 {
503         const struct lu_env     *env = tsi->tsi_env;
504         struct tgt_thread_info  *tti = tgt_th_info(env);
505         struct object_update    *update = tti->tti_u.update.tti_update;
506         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
507         char                    *name;
508         int                      rc;
509
510         ENTRY;
511
512         if (!lu_object_exists(&obj->do_lu))
513                 RETURN(-ENOENT);
514
515         name = object_update_param_get(update, 0, NULL);
516         if (name == NULL) {
517                 CERROR("%s: empty name for lookup: rc = %d\n",
518                        tgt_name(tsi->tsi_tgt), -EPROTO);
519                 RETURN(err_serious(-EPROTO));
520         }
521
522         dt_read_lock(env, obj, MOR_TGT_CHILD);
523         if (!dt_try_as_dir(env, obj))
524                 GOTO(out_unlock, rc = -ENOTDIR);
525
526         rc = dt_lookup(env, obj, (struct dt_rec *)&tti->tti_fid1,
527                 (struct dt_key *)name, NULL);
528
529         if (rc < 0)
530                 GOTO(out_unlock, rc);
531
532         if (rc == 0)
533                 rc += 1;
534
535 out_unlock:
536         dt_read_unlock(env, obj);
537
538         CDEBUG(D_INFO, "lookup "DFID" %s get "DFID" rc %d\n",
539                PFID(lu_object_fid(&obj->do_lu)), name,
540                PFID(&tti->tti_fid1), rc);
541
542         CDEBUG(D_INFO, "%s: insert lookup reply %p index %d: rc = %d\n",
543                tgt_name(tsi->tsi_tgt), tti->tti_u.update.tti_update_reply,
544                0, rc);
545
546         object_update_result_insert(tti->tti_u.update.tti_update_reply,
547                             &tti->tti_fid1, sizeof(tti->tti_fid1),
548                             tti->tti_u.update.tti_update_reply_index, rc);
549         RETURN(rc);
550 }
551
552 static int out_tx_xattr_set_exec(const struct lu_env *env,
553                                  struct thandle *th,
554                                  struct tx_arg *arg)
555 {
556         struct dt_object *dt_obj = arg->object;
557         int rc;
558
559         CDEBUG(D_INFO, "%s: set xattr buf %p name %s flag %d\n",
560                dt_obd_name(th->th_dev), arg->u.xattr_set.buf.lb_buf,
561                arg->u.xattr_set.name, arg->u.xattr_set.flags);
562
563         if (!lu_object_exists(&dt_obj->do_lu))
564                 GOTO(out, rc = -ENOENT);
565
566         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
567         rc = dt_xattr_set(env, dt_obj, &arg->u.xattr_set.buf,
568                           arg->u.xattr_set.name, arg->u.xattr_set.flags,
569                           th, NULL);
570         dt_write_unlock(env, dt_obj);
571         /**
572          * Ignore errors if this is LINK EA
573          **/
574         if (unlikely(rc && !strcmp(arg->u.xattr_set.name, XATTR_NAME_LINK)))
575                 rc = 0;
576 out:
577         CDEBUG(D_INFO, "%s: insert xattr set reply %p index %d: rc = %d\n",
578                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
579
580         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
581
582         return rc;
583 }
584
585 static int __out_tx_xattr_set(const struct lu_env *env,
586                               struct dt_object *dt_obj,
587                               const struct lu_buf *buf,
588                               const char *name, int flags,
589                               struct thandle_exec_args *ta,
590                               struct object_update_reply *reply,
591                               int index, const char *file, int line)
592 {
593         struct tx_arg   *arg;
594         int             rc;
595
596         LASSERT(ta->ta_handle != NULL);
597         rc = dt_declare_xattr_set(env, dt_obj, buf, name, flags, ta->ta_handle);
598         if (rc != 0)
599                 return rc;
600
601         arg = tx_add_exec(ta, out_tx_xattr_set_exec, NULL, file, line);
602         if (IS_ERR(arg))
603                 return PTR_ERR(arg);
604
605         lu_object_get(&dt_obj->do_lu);
606         arg->object = dt_obj;
607         arg->u.xattr_set.name = name;
608         arg->u.xattr_set.flags = flags;
609         arg->u.xattr_set.buf = *buf;
610         arg->reply = reply;
611         arg->index = index;
612         arg->u.xattr_set.csum = 0;
613         return 0;
614 }
615
616 static int out_xattr_set(struct tgt_session_info *tsi)
617 {
618         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
619         struct object_update    *update = tti->tti_u.update.tti_update;
620         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
621         struct lu_buf           *lbuf = &tti->tti_buf;
622         char                    *name;
623         char                    *buf;
624         __u32                   *tmp;
625         int                      buf_len = 0;
626         int                      flag;
627         int                      size = 0;
628         int                      rc;
629         ENTRY;
630
631         name = object_update_param_get(update, 0, NULL);
632         if (name == NULL) {
633                 CERROR("%s: empty name for xattr set: rc = %d\n",
634                        tgt_name(tsi->tsi_tgt), -EPROTO);
635                 RETURN(err_serious(-EPROTO));
636         }
637
638         buf = object_update_param_get(update, 1, &buf_len);
639         if (buf == NULL || buf_len == 0) {
640                 CERROR("%s: empty buf for xattr set: rc = %d\n",
641                        tgt_name(tsi->tsi_tgt), -EPROTO);
642                 RETURN(err_serious(-EPROTO));
643         }
644
645         lbuf->lb_buf = buf;
646         lbuf->lb_len = buf_len;
647
648         tmp = object_update_param_get(update, 2, &size);
649         if (tmp == NULL || size != sizeof(*tmp)) {
650                 CERROR("%s: emptry or wrong size %d flag: rc = %d\n",
651                        tgt_name(tsi->tsi_tgt), size, -EPROTO);
652                 RETURN(err_serious(-EPROTO));
653         }
654
655         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
656                 __swab32s(tmp);
657         flag = *tmp;
658
659         rc = out_tx_xattr_set(tsi->tsi_env, obj, lbuf, name, flag,
660                               &tti->tti_tea,
661                               tti->tti_u.update.tti_update_reply,
662                               tti->tti_u.update.tti_update_reply_index);
663         RETURN(rc);
664 }
665
666 static int out_tx_xattr_del_exec(const struct lu_env *env, struct thandle *th,
667                                  struct tx_arg *arg)
668 {
669         struct dt_object *dt_obj = arg->object;
670         int rc;
671
672         CDEBUG(D_INFO, "%s: del xattr name '%s' on "DFID"\n",
673                dt_obd_name(th->th_dev), arg->u.xattr_set.name,
674                PFID(lu_object_fid(&dt_obj->do_lu)));
675
676         if (!lu_object_exists(&dt_obj->do_lu))
677                 GOTO(out, rc = -ENOENT);
678
679         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
680         rc = dt_xattr_del(env, dt_obj, arg->u.xattr_set.name,
681                           th, NULL);
682         dt_write_unlock(env, dt_obj);
683 out:
684         CDEBUG(D_INFO, "%s: insert xattr del reply %p index %d: rc = %d\n",
685                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
686
687         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
688
689         return rc;
690 }
691
692 static int __out_tx_xattr_del(const struct lu_env *env,
693                               struct dt_object *dt_obj, const char *name,
694                               struct thandle_exec_args *ta,
695                               struct object_update_reply *reply,
696                               int index, const char *file, int line)
697 {
698         struct tx_arg   *arg;
699         int             rc;
700
701         rc = dt_declare_xattr_del(env, dt_obj, name, ta->ta_handle);
702         if (rc != 0)
703                 return rc;
704
705         arg = tx_add_exec(ta, out_tx_xattr_del_exec, NULL, file, line);
706         if (IS_ERR(arg))
707                 return PTR_ERR(arg);
708
709         lu_object_get(&dt_obj->do_lu);
710         arg->object = dt_obj;
711         arg->u.xattr_set.name = name;
712         arg->reply = reply;
713         arg->index = index;
714         return 0;
715 }
716
717 static int out_xattr_del(struct tgt_session_info *tsi)
718 {
719         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
720         struct object_update    *update = tti->tti_u.update.tti_update;
721         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
722         char                    *name;
723         int                      rc;
724         ENTRY;
725
726         name = object_update_param_get(update, 0, NULL);
727         if (name == NULL) {
728                 CERROR("%s: empty name for xattr set: rc = %d\n",
729                        tgt_name(tsi->tsi_tgt), -EPROTO);
730                 RETURN(err_serious(-EPROTO));
731         }
732
733         rc = out_tx_xattr_del(tsi->tsi_env, obj, name, &tti->tti_tea,
734                               tti->tti_u.update.tti_update_reply,
735                               tti->tti_u.update.tti_update_reply_index);
736         RETURN(rc);
737 }
738
739 static int out_obj_ref_add(const struct lu_env *env,
740                            struct dt_object *dt_obj,
741                            struct thandle *th)
742 {
743         int rc;
744
745         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
746         rc = dt_ref_add(env, dt_obj, th);
747         dt_write_unlock(env, dt_obj);
748
749         return rc;
750 }
751
752 static int out_obj_ref_del(const struct lu_env *env,
753                            struct dt_object *dt_obj,
754                            struct thandle *th)
755 {
756         int rc;
757
758         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
759         rc = dt_ref_del(env, dt_obj, th);
760         dt_write_unlock(env, dt_obj);
761
762         return rc;
763 }
764
765 static int out_tx_ref_add_exec(const struct lu_env *env, struct thandle *th,
766                                struct tx_arg *arg)
767 {
768         struct dt_object *dt_obj = arg->object;
769         int rc;
770
771         rc = out_obj_ref_add(env, dt_obj, th);
772
773         CDEBUG(D_INFO, "%s: insert ref_add reply %p index %d: rc = %d\n",
774                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
775
776         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
777         return rc;
778 }
779
780 static int out_tx_ref_add_undo(const struct lu_env *env, struct thandle *th,
781                                struct tx_arg *arg)
782 {
783         return out_obj_ref_del(env, arg->object, th);
784 }
785
786 static int __out_tx_ref_add(const struct lu_env *env,
787                             struct dt_object *dt_obj,
788                             struct thandle_exec_args *ta,
789                             struct object_update_reply *reply,
790                             int index, const char *file, int line)
791 {
792         struct tx_arg   *arg;
793         int             rc;
794
795         LASSERT(ta->ta_handle != NULL);
796         rc = dt_declare_ref_add(env, dt_obj, ta->ta_handle);
797         if (rc != 0)
798                 return rc;
799
800         arg = tx_add_exec(ta, out_tx_ref_add_exec, out_tx_ref_add_undo, file,
801                           line);
802         if (IS_ERR(arg))
803                 return PTR_ERR(arg);
804
805         lu_object_get(&dt_obj->do_lu);
806         arg->object = dt_obj;
807         arg->reply = reply;
808         arg->index = index;
809         return 0;
810 }
811
812 /**
813  * increase ref of the object
814  **/
815 static int out_ref_add(struct tgt_session_info *tsi)
816 {
817         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
818         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
819         int                      rc;
820
821         ENTRY;
822
823         rc = out_tx_ref_add(tsi->tsi_env, obj, &tti->tti_tea,
824                             tti->tti_u.update.tti_update_reply,
825                             tti->tti_u.update.tti_update_reply_index);
826         RETURN(rc);
827 }
828
829 static int out_tx_ref_del_exec(const struct lu_env *env, struct thandle *th,
830                                struct tx_arg *arg)
831 {
832         struct dt_object        *dt_obj = arg->object;
833         int                      rc;
834
835         rc = out_obj_ref_del(env, dt_obj, th);
836
837         CDEBUG(D_INFO, "%s: insert ref_del reply %p index %d: rc = %d\n",
838                dt_obd_name(th->th_dev), arg->reply, arg->index, 0);
839
840         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
841
842         return rc;
843 }
844
845 static int out_tx_ref_del_undo(const struct lu_env *env, struct thandle *th,
846                                struct tx_arg *arg)
847 {
848         return out_obj_ref_add(env, arg->object, th);
849 }
850
851 static int __out_tx_ref_del(const struct lu_env *env,
852                             struct dt_object *dt_obj,
853                             struct thandle_exec_args *ta,
854                             struct object_update_reply *reply,
855                             int index, const char *file, int line)
856 {
857         struct tx_arg   *arg;
858         int             rc;
859
860         LASSERT(ta->ta_handle != NULL);
861         rc = dt_declare_ref_del(env, dt_obj, ta->ta_handle);
862         if (rc != 0)
863                 return rc;
864
865         arg = tx_add_exec(ta, out_tx_ref_del_exec, out_tx_ref_del_undo, file,
866                           line);
867         if (IS_ERR(arg))
868                 return PTR_ERR(arg);
869
870         lu_object_get(&dt_obj->do_lu);
871         arg->object = dt_obj;
872         arg->reply = reply;
873         arg->index = index;
874         return 0;
875 }
876
877 static int out_ref_del(struct tgt_session_info *tsi)
878 {
879         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
880         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
881         int                      rc;
882
883         ENTRY;
884
885         if (!lu_object_exists(&obj->do_lu))
886                 RETURN(-ENOENT);
887
888         rc = out_tx_ref_del(tsi->tsi_env, obj, &tti->tti_tea,
889                             tti->tti_u.update.tti_update_reply,
890                             tti->tti_u.update.tti_update_reply_index);
891         RETURN(rc);
892 }
893
894 static int out_obj_index_insert(const struct lu_env *env,
895                                 struct dt_object *dt_obj,
896                                 const struct dt_rec *rec,
897                                 const struct dt_key *key,
898                                 struct thandle *th)
899 {
900         int rc;
901
902         CDEBUG(D_INFO, "%s: index insert "DFID" name: %s fid "DFID"\n",
903                dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
904                (char *)key, PFID((struct lu_fid *)rec));
905
906         if (dt_try_as_dir(env, dt_obj) == 0)
907                 return -ENOTDIR;
908
909         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
910         rc = dt_insert(env, dt_obj, rec, key, th, NULL, 0);
911         dt_write_unlock(env, dt_obj);
912
913         return rc;
914 }
915
916 static int out_obj_index_delete(const struct lu_env *env,
917                                 struct dt_object *dt_obj,
918                                 const struct dt_key *key,
919                                 struct thandle *th)
920 {
921         int rc;
922
923         CDEBUG(D_INFO, "%s: index delete "DFID" name: %s\n",
924                dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
925                (char *)key);
926
927         if (dt_try_as_dir(env, dt_obj) == 0)
928                 return -ENOTDIR;
929
930         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
931         rc = dt_delete(env, dt_obj, key, th, NULL);
932         dt_write_unlock(env, dt_obj);
933
934         return rc;
935 }
936
937 static int out_tx_index_insert_exec(const struct lu_env *env,
938                                     struct thandle *th, struct tx_arg *arg)
939 {
940         struct dt_object *dt_obj = arg->object;
941         int rc;
942
943         rc = out_obj_index_insert(env, dt_obj, arg->u.insert.rec,
944                                   arg->u.insert.key, th);
945
946         CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
947                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
948
949         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
950
951         return rc;
952 }
953
954 static int out_tx_index_insert_undo(const struct lu_env *env,
955                                     struct thandle *th, struct tx_arg *arg)
956 {
957         return out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
958 }
959
960 static int __out_tx_index_insert(const struct lu_env *env,
961                                  struct dt_object *dt_obj,
962                                  char *name, struct lu_fid *fid,
963                                  struct thandle_exec_args *ta,
964                                  struct object_update_reply *reply,
965                                  int index, const char *file, int line)
966 {
967         struct tx_arg   *arg;
968         int             rc;
969
970         LASSERT(ta->ta_handle != NULL);
971         if (dt_try_as_dir(env, dt_obj) == 0) {
972                 rc = -ENOTDIR;
973                 return rc;
974         }
975
976         rc = dt_declare_insert(env, dt_obj, (struct dt_rec *)fid,
977                                (struct dt_key *)name, ta->ta_handle);
978         if (rc != 0)
979                 return rc;
980
981         arg = tx_add_exec(ta, out_tx_index_insert_exec,
982                           out_tx_index_insert_undo, file, line);
983         if (IS_ERR(arg))
984                 return PTR_ERR(arg);
985
986         lu_object_get(&dt_obj->do_lu);
987         arg->object = dt_obj;
988         arg->reply = reply;
989         arg->index = index;
990         arg->u.insert.rec = (struct dt_rec *)fid;
991         arg->u.insert.key = (struct dt_key *)name;
992
993         return 0;
994 }
995
996 static int out_index_insert(struct tgt_session_info *tsi)
997 {
998         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
999         struct object_update    *update = tti->tti_u.update.tti_update;
1000         struct dt_object  *obj = tti->tti_u.update.tti_dt_object;
1001         struct lu_fid     *fid;
1002         char              *name;
1003         int                rc = 0;
1004         int                size;
1005
1006         ENTRY;
1007
1008         name = object_update_param_get(update, 0, NULL);
1009         if (name == NULL) {
1010                 CERROR("%s: empty name for index insert: rc = %d\n",
1011                        tgt_name(tsi->tsi_tgt), -EPROTO);
1012                 RETURN(err_serious(-EPROTO));
1013         }
1014
1015         fid = object_update_param_get(update, 1, &size);
1016         if (fid == NULL || size != sizeof(*fid)) {
1017                 CERROR("%s: invalid fid: rc = %d\n",
1018                        tgt_name(tsi->tsi_tgt), -EPROTO);
1019                        RETURN(err_serious(-EPROTO));
1020         }
1021
1022         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
1023                 lustre_swab_lu_fid(fid);
1024
1025         if (!fid_is_sane(fid)) {
1026                 CERROR("%s: invalid FID "DFID": rc = %d\n",
1027                        tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
1028                 RETURN(err_serious(-EPROTO));
1029         }
1030
1031         rc = out_tx_index_insert(tsi->tsi_env, obj, name, fid,
1032                                  &tti->tti_tea,
1033                                  tti->tti_u.update.tti_update_reply,
1034                                  tti->tti_u.update.tti_update_reply_index);
1035         RETURN(rc);
1036 }
1037
1038 static int out_tx_index_delete_exec(const struct lu_env *env,
1039                                     struct thandle *th,
1040                                     struct tx_arg *arg)
1041 {
1042         int rc;
1043
1044         rc = out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
1045
1046         CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
1047                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
1048
1049         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
1050
1051         return rc;
1052 }
1053
1054 static int out_tx_index_delete_undo(const struct lu_env *env,
1055                                     struct thandle *th,
1056                                     struct tx_arg *arg)
1057 {
1058         CERROR("%s: Oops, can not rollback index_delete yet: rc = %d\n",
1059                dt_obd_name(th->th_dev), -ENOTSUPP);
1060         return -ENOTSUPP;
1061 }
1062
1063 static int __out_tx_index_delete(const struct lu_env *env,
1064                                  struct dt_object *dt_obj, char *name,
1065                                  struct thandle_exec_args *ta,
1066                                  struct object_update_reply *reply,
1067                                  int index, const char *file, int line)
1068 {
1069         struct tx_arg   *arg;
1070         int             rc;
1071
1072         if (dt_try_as_dir(env, dt_obj) == 0) {
1073                 rc = -ENOTDIR;
1074                 return rc;
1075         }
1076
1077         LASSERT(ta->ta_handle != NULL);
1078         rc = dt_declare_delete(env, dt_obj, (struct dt_key *)name,
1079                                ta->ta_handle);
1080         if (rc != 0)
1081                 return rc;
1082
1083         arg = tx_add_exec(ta, out_tx_index_delete_exec,
1084                           out_tx_index_delete_undo, file, line);
1085         if (IS_ERR(arg))
1086                 return PTR_ERR(arg);
1087
1088         lu_object_get(&dt_obj->do_lu);
1089         arg->object = dt_obj;
1090         arg->reply = reply;
1091         arg->index = index;
1092         arg->u.insert.key = (struct dt_key *)name;
1093         return 0;
1094 }
1095
1096 static int out_index_delete(struct tgt_session_info *tsi)
1097 {
1098         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
1099         struct object_update    *update = tti->tti_u.update.tti_update;
1100         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
1101         char                    *name;
1102         int                      rc = 0;
1103
1104         if (!lu_object_exists(&obj->do_lu))
1105                 RETURN(-ENOENT);
1106
1107         name = object_update_param_get(update, 0, NULL);
1108         if (name == NULL) {
1109                 CERROR("%s: empty name for index delete: rc = %d\n",
1110                        tgt_name(tsi->tsi_tgt), -EPROTO);
1111                 RETURN(err_serious(-EPROTO));
1112         }
1113
1114         rc = out_tx_index_delete(tsi->tsi_env, obj, name, &tti->tti_tea,
1115                                  tti->tti_u.update.tti_update_reply,
1116                                  tti->tti_u.update.tti_update_reply_index);
1117         RETURN(rc);
1118 }
1119
1120 static int out_tx_destroy_exec(const struct lu_env *env, struct thandle *th,
1121                                struct tx_arg *arg)
1122 {
1123         struct dt_object *dt_obj = arg->object;
1124         int rc;
1125
1126         rc = out_obj_destroy(env, dt_obj, th);
1127
1128         CDEBUG(D_INFO, "%s: insert destroy reply %p index %d: rc = %d\n",
1129                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
1130
1131         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
1132
1133         RETURN(rc);
1134 }
1135
1136 static int out_tx_destroy_undo(const struct lu_env *env, struct thandle *th,
1137                                struct tx_arg *arg)
1138 {
1139         CERROR("%s: not support destroy undo yet!: rc = %d\n",
1140                dt_obd_name(th->th_dev), -ENOTSUPP);
1141         return -ENOTSUPP;
1142 }
1143
1144 static int __out_tx_destroy(const struct lu_env *env, struct dt_object *dt_obj,
1145                              struct thandle_exec_args *ta,
1146                              struct object_update_reply *reply,
1147                              int index, const char *file, int line)
1148 {
1149         struct tx_arg   *arg;
1150         int             rc;
1151
1152         LASSERT(ta->ta_handle != NULL);
1153         rc = dt_declare_destroy(env, dt_obj, ta->ta_handle);
1154         if (rc != 0)
1155                 return rc;
1156
1157         arg = tx_add_exec(ta, out_tx_destroy_exec, out_tx_destroy_undo,
1158                           file, line);
1159         if (IS_ERR(arg))
1160                 return PTR_ERR(arg);
1161
1162         lu_object_get(&dt_obj->do_lu);
1163         arg->object = dt_obj;
1164         arg->reply = reply;
1165         arg->index = index;
1166         return 0;
1167 }
1168
1169 static int out_destroy(struct tgt_session_info *tsi)
1170 {
1171         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
1172         struct object_update    *update = tti->tti_u.update.tti_update;
1173         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
1174         struct lu_fid           *fid;
1175         int                      rc;
1176         ENTRY;
1177
1178         fid = &update->ou_fid;
1179         if (!fid_is_sane(fid)) {
1180                 CERROR("%s: invalid FID "DFID": rc = %d\n",
1181                        tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
1182                 RETURN(err_serious(-EPROTO));
1183         }
1184
1185         if (!lu_object_exists(&obj->do_lu))
1186                 RETURN(-ENOENT);
1187
1188         rc = out_tx_destroy(tsi->tsi_env, obj, &tti->tti_tea,
1189                             tti->tti_u.update.tti_update_reply,
1190                             tti->tti_u.update.tti_update_reply_index);
1191
1192         RETURN(rc);
1193 }
1194
1195 static int out_tx_write_exec(const struct lu_env *env, struct thandle *th,
1196                              struct tx_arg *arg)
1197 {
1198         struct dt_object *dt_obj = arg->object;
1199         int rc;
1200
1201         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
1202         rc = dt_record_write(env, dt_obj, &arg->u.write.buf,
1203                              &arg->u.write.pos, th);
1204         dt_write_unlock(env, dt_obj);
1205
1206         if (rc == 0)
1207                 rc = arg->u.write.buf.lb_len;
1208
1209         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
1210
1211         return rc > 0 ? 0 : rc;
1212 }
1213
1214 static int __out_tx_write(const struct lu_env *env,
1215                           struct dt_object *dt_obj,
1216                           const struct lu_buf *buf,
1217                           loff_t pos, struct thandle_exec_args *ta,
1218                           struct object_update_reply *reply,
1219                           int index, const char *file, int line)
1220 {
1221         struct tx_arg   *arg;
1222         int             rc;
1223
1224         LASSERT(ta->ta_handle != NULL);
1225         rc = dt_declare_record_write(env, dt_obj, buf, pos, ta->ta_handle);
1226         if (rc != 0)
1227                 return rc;
1228
1229         arg = tx_add_exec(ta, out_tx_write_exec, NULL, file, line);
1230         if (IS_ERR(arg))
1231                 return PTR_ERR(arg);
1232
1233         lu_object_get(&dt_obj->do_lu);
1234         arg->object = dt_obj;
1235         arg->u.write.buf = *buf;
1236         arg->u.write.pos = pos;
1237         arg->reply = reply;
1238         arg->index = index;
1239         return 0;
1240 }
1241
1242 static int out_write(struct tgt_session_info *tsi)
1243 {
1244         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
1245         struct object_update    *update = tti->tti_u.update.tti_update;
1246         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
1247         struct lu_buf           *lbuf = &tti->tti_buf;
1248         char                    *buf;
1249         __u64                   *tmp;
1250         int                     size = 0;
1251         int                     buf_len = 0;
1252         loff_t                  pos;
1253         int                      rc;
1254         ENTRY;
1255
1256         buf = object_update_param_get(update, 0, &buf_len);
1257         if (buf == NULL || buf_len == 0) {
1258                 CERROR("%s: empty buf for xattr set: rc = %d\n",
1259                        tgt_name(tsi->tsi_tgt), -EPROTO);
1260                 RETURN(err_serious(-EPROTO));
1261         }
1262         lbuf->lb_buf = buf;
1263         lbuf->lb_len = buf_len;
1264
1265         tmp = object_update_param_get(update, 1, &size);
1266         if (tmp == NULL || size != sizeof(*tmp)) {
1267                 CERROR("%s: empty or wrong size %d pos: rc = %d\n",
1268                        tgt_name(tsi->tsi_tgt), size, -EPROTO);
1269                 RETURN(err_serious(-EPROTO));
1270         }
1271
1272         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
1273                 __swab64s(tmp);
1274         pos = *tmp;
1275
1276         rc = out_tx_write(tsi->tsi_env, obj, lbuf, pos,
1277                           &tti->tti_tea,
1278                           tti->tti_u.update.tti_update_reply,
1279                           tti->tti_u.update.tti_update_reply_index);
1280         RETURN(rc);
1281 }
1282
1283 #define DEF_OUT_HNDL(opc, name, flags, fn)     \
1284 [opc - OUT_CREATE] = {                                  \
1285         .th_name    = name,                             \
1286         .th_fail_id = 0,                                \
1287         .th_opc     = opc,                              \
1288         .th_flags   = flags,                            \
1289         .th_act     = fn,                               \
1290         .th_fmt     = NULL,                             \
1291         .th_version = 0,                                \
1292 }
1293
1294 static struct tgt_handler out_update_ops[] = {
1295         DEF_OUT_HNDL(OUT_CREATE, "out_create", MUTABOR | HABEO_REFERO,
1296                      out_create),
1297         DEF_OUT_HNDL(OUT_DESTROY, "out_create", MUTABOR | HABEO_REFERO,
1298                      out_destroy),
1299         DEF_OUT_HNDL(OUT_REF_ADD, "out_ref_add", MUTABOR | HABEO_REFERO,
1300                      out_ref_add),
1301         DEF_OUT_HNDL(OUT_REF_DEL, "out_ref_del", MUTABOR | HABEO_REFERO,
1302                      out_ref_del),
1303         DEF_OUT_HNDL(OUT_ATTR_SET, "out_attr_set",  MUTABOR | HABEO_REFERO,
1304                      out_attr_set),
1305         DEF_OUT_HNDL(OUT_ATTR_GET, "out_attr_get",  HABEO_REFERO,
1306                      out_attr_get),
1307         DEF_OUT_HNDL(OUT_XATTR_SET, "out_xattr_set", MUTABOR | HABEO_REFERO,
1308                      out_xattr_set),
1309         DEF_OUT_HNDL(OUT_XATTR_DEL, "out_xattr_del", MUTABOR | HABEO_REFERO,
1310                      out_xattr_del),
1311         DEF_OUT_HNDL(OUT_XATTR_GET, "out_xattr_get", HABEO_REFERO,
1312                      out_xattr_get),
1313         DEF_OUT_HNDL(OUT_INDEX_LOOKUP, "out_index_lookup", HABEO_REFERO,
1314                      out_index_lookup),
1315         DEF_OUT_HNDL(OUT_INDEX_INSERT, "out_index_insert",
1316                      MUTABOR | HABEO_REFERO, out_index_insert),
1317         DEF_OUT_HNDL(OUT_INDEX_DELETE, "out_index_delete",
1318                      MUTABOR | HABEO_REFERO, out_index_delete),
1319         DEF_OUT_HNDL(OUT_WRITE, "out_write", MUTABOR | HABEO_REFERO, out_write),
1320 };
1321
1322 struct tgt_handler *out_handler_find(__u32 opc)
1323 {
1324         struct tgt_handler *h;
1325
1326         h = NULL;
1327         if (OUT_CREATE <= opc && opc < OUT_LAST) {
1328                 h = &out_update_ops[opc - OUT_CREATE];
1329                 LASSERTF(h->th_opc == opc, "opcode mismatch %d != %d\n",
1330                          h->th_opc, opc);
1331         } else {
1332                 h = NULL; /* unsupported opc */
1333         }
1334         return h;
1335 }
1336
1337 static int out_tx_start(const struct lu_env *env, struct dt_device *dt,
1338                         struct thandle_exec_args *ta, struct obd_export *exp)
1339 {
1340         ta->ta_argno = 0;
1341         ta->ta_handle = dt_trans_create(env, dt);
1342         if (IS_ERR(ta->ta_handle)) {
1343                 int rc;
1344
1345                 rc = PTR_ERR(ta->ta_handle);
1346                 ta->ta_handle = NULL;
1347                 CERROR("%s: start handle error: rc = %d\n", dt_obd_name(dt),
1348                        rc);
1349                 return rc;
1350         }
1351         if (exp->exp_need_sync)
1352                 ta->ta_handle->th_sync = 1;
1353
1354         return 0;
1355 }
1356
1357 static int out_trans_start(const struct lu_env *env,
1358                            struct thandle_exec_args *ta)
1359 {
1360         return dt_trans_start(env, ta->ta_handle->th_dev, ta->ta_handle);
1361 }
1362
1363 static int out_trans_stop(const struct lu_env *env,
1364                           struct thandle_exec_args *ta, int err)
1365 {
1366         int i;
1367         int rc;
1368
1369         ta->ta_handle->th_result = err;
1370         rc = dt_trans_stop(env, ta->ta_handle->th_dev, ta->ta_handle);
1371         for (i = 0; i < ta->ta_argno; i++) {
1372                 if (ta->ta_args[i]->object != NULL) {
1373                         struct dt_object *obj = ta->ta_args[i]->object;
1374
1375                         /* If the object is being created during this
1376                          * transaction, we need to remove them from the
1377                          * cache immediately, because a few layers are
1378                          * missing in OUT handler, i.e. the object might
1379                          * not be initialized in all layers */
1380                         if (ta->ta_args[i]->exec_fn == out_tx_create_exec)
1381                                 set_bit(LU_OBJECT_HEARD_BANSHEE,
1382                                         &obj->do_lu.lo_header->loh_flags);
1383                         lu_object_put(env, &ta->ta_args[i]->object->do_lu);
1384                         ta->ta_args[i]->object = NULL;
1385                 }
1386         }
1387         ta->ta_handle = NULL;
1388         ta->ta_argno = 0;
1389
1390         return rc;
1391 }
1392
1393 int out_tx_end(const struct lu_env *env, struct thandle_exec_args *ta,
1394                int declare_ret)
1395 {
1396         struct tgt_session_info *tsi = tgt_ses_info(env);
1397         int                     i;
1398         int                     rc;
1399         int                     rc1;
1400         ENTRY;
1401
1402         if (ta->ta_handle == NULL)
1403                 RETURN(0);
1404
1405         if (declare_ret != 0 || ta->ta_argno == 0)
1406                 GOTO(stop, rc = declare_ret);
1407
1408         LASSERT(ta->ta_handle->th_dev != NULL);
1409         rc = out_trans_start(env, ta);
1410         if (unlikely(rc != 0))
1411                 GOTO(stop, rc);
1412
1413         for (i = 0; i < ta->ta_argno; i++) {
1414                 rc = ta->ta_args[i]->exec_fn(env, ta->ta_handle,
1415                                              ta->ta_args[i]);
1416                 if (unlikely(rc != 0)) {
1417                         CDEBUG(D_INFO, "error during execution of #%u from"
1418                                " %s:%d: rc = %d\n", i, ta->ta_args[i]->file,
1419                                ta->ta_args[i]->line, rc);
1420                         while (--i >= 0) {
1421                                 if (ta->ta_args[i]->undo_fn != NULL)
1422                                         ta->ta_args[i]->undo_fn(env,
1423                                                                ta->ta_handle,
1424                                                                ta->ta_args[i]);
1425                                 else
1426                                         CERROR("%s: undo for %s:%d: rc = %d\n",
1427                                              dt_obd_name(ta->ta_handle->th_dev),
1428                                                ta->ta_args[i]->file,
1429                                                ta->ta_args[i]->line, -ENOTSUPP);
1430                         }
1431                         break;
1432                 }
1433                 CDEBUG(D_INFO, "%s: executed %u/%u: rc = %d\n",
1434                        dt_obd_name(ta->ta_handle->th_dev), i, ta->ta_argno, rc);
1435         }
1436
1437         /* Only fail for real update */
1438         tsi->tsi_reply_fail_id = OBD_FAIL_OUT_UPDATE_NET_REP;
1439 stop:
1440         rc1 = out_trans_stop(env, ta, rc);
1441         if (rc == 0)
1442                 rc = rc1;
1443
1444         ta->ta_handle = NULL;
1445         ta->ta_argno = 0;
1446
1447         RETURN(rc);
1448 }
1449
1450 /**
1451  * Object updates between Targets. Because all the updates has been
1452  * dis-assemblied into object updates at sender side, so OUT will
1453  * call OSD API directly to execute these updates.
1454  *
1455  * In DNE phase I all of the updates in the request need to be executed
1456  * in one transaction, and the transaction has to be synchronously.
1457  *
1458  * Please refer to lustre/include/lustre/lustre_idl.h for req/reply
1459  * format.
1460  */
1461 int out_handle(struct tgt_session_info *tsi)
1462 {
1463         const struct lu_env             *env = tsi->tsi_env;
1464         struct tgt_thread_info          *tti = tgt_th_info(env);
1465         struct thandle_exec_args        *ta = &tti->tti_tea;
1466         struct req_capsule              *pill = tsi->tsi_pill;
1467         struct dt_device                *dt = tsi->tsi_tgt->lut_bottom;
1468         struct object_update_request    *ureq;
1469         struct object_update            *update;
1470         struct object_update_reply      *reply;
1471         int                              bufsize;
1472         int                              count;
1473         int                              current_batchid = -1;
1474         int                              i;
1475         int                              rc = 0;
1476         int                              rc1 = 0;
1477
1478         ENTRY;
1479
1480         req_capsule_set(pill, &RQF_OUT_UPDATE);
1481         ureq = req_capsule_client_get(pill, &RMF_OUT_UPDATE);
1482         if (ureq == NULL) {
1483                 CERROR("%s: No buf!: rc = %d\n", tgt_name(tsi->tsi_tgt),
1484                        -EPROTO);
1485                 RETURN(err_serious(-EPROTO));
1486         }
1487
1488         bufsize = req_capsule_get_size(pill, &RMF_OUT_UPDATE, RCL_CLIENT);
1489         if (bufsize != object_update_request_size(ureq)) {
1490                 CERROR("%s: invalid bufsize %d: rc = %d\n",
1491                        tgt_name(tsi->tsi_tgt), bufsize, -EPROTO);
1492                 RETURN(err_serious(-EPROTO));
1493         }
1494
1495         if (ureq->ourq_magic != UPDATE_REQUEST_MAGIC) {
1496                 CERROR("%s: invalid update buffer magic %x expect %x: "
1497                        "rc = %d\n", tgt_name(tsi->tsi_tgt), ureq->ourq_magic,
1498                        UPDATE_REQUEST_MAGIC, -EPROTO);
1499                 RETURN(err_serious(-EPROTO));
1500         }
1501
1502         count = ureq->ourq_count;
1503         if (count <= 0) {
1504                 CERROR("%s: empty update: rc = %d\n", tgt_name(tsi->tsi_tgt),
1505                        -EPROTO);
1506                 RETURN(err_serious(-EPROTO));
1507         }
1508
1509         req_capsule_set_size(pill, &RMF_OUT_UPDATE_REPLY, RCL_SERVER,
1510                              OUT_UPDATE_REPLY_SIZE);
1511         rc = req_capsule_server_pack(pill);
1512         if (rc != 0) {
1513                 CERROR("%s: Can't pack response: rc = %d\n",
1514                        tgt_name(tsi->tsi_tgt), rc);
1515                 RETURN(rc);
1516         }
1517
1518         /* Prepare the update reply buffer */
1519         reply = req_capsule_server_get(pill, &RMF_OUT_UPDATE_REPLY);
1520         if (reply == NULL)
1521                 RETURN(err_serious(-EPROTO));
1522         object_update_reply_init(reply, count);
1523         tti->tti_u.update.tti_update_reply = reply;
1524         tti->tti_mult_trans = !req_is_replay(tgt_ses_req(tsi));
1525
1526         /* Walk through updates in the request to execute them synchronously */
1527         for (i = 0; i < count; i++) {
1528                 struct tgt_handler      *h;
1529                 struct dt_object        *dt_obj;
1530
1531                 update = object_update_request_get(ureq, i, NULL);
1532                 if (update == NULL)
1533                         GOTO(out, rc = -EPROTO);
1534
1535                 if (ptlrpc_req_need_swab(pill->rc_req))
1536                         lustre_swab_object_update(update);
1537
1538                 if (!fid_is_sane(&update->ou_fid)) {
1539                         CERROR("%s: invalid FID "DFID": rc = %d\n",
1540                                tgt_name(tsi->tsi_tgt), PFID(&update->ou_fid),
1541                                -EPROTO);
1542                         GOTO(out, rc = err_serious(-EPROTO));
1543                 }
1544
1545                 dt_obj = dt_locate(env, dt, &update->ou_fid);
1546                 if (IS_ERR(dt_obj))
1547                         GOTO(out, rc = PTR_ERR(dt_obj));
1548
1549                 if (dt->dd_record_fid_accessed) {
1550                         lfsck_pack_rfa(&tti->tti_lr,
1551                                        lu_object_fid(&dt_obj->do_lu));
1552                         tgt_lfsck_in_notify(env, dt, &tti->tti_lr);
1553                 }
1554
1555                 tti->tti_u.update.tti_dt_object = dt_obj;
1556                 tti->tti_u.update.tti_update = update;
1557                 tti->tti_u.update.tti_update_reply_index = i;
1558
1559                 h = out_handler_find(update->ou_type);
1560                 if (unlikely(h == NULL)) {
1561                         CERROR("%s: unsupported opc: 0x%x\n",
1562                                tgt_name(tsi->tsi_tgt), update->ou_type);
1563                         GOTO(next, rc = -ENOTSUPP);
1564                 }
1565
1566                 /* Check resend case only for modifying RPC */
1567                 if (h->th_flags & MUTABOR) {
1568                         struct ptlrpc_request *req = tgt_ses_req(tsi);
1569
1570                         if (out_check_resent(env, dt, dt_obj, req,
1571                                              out_reconstruct, reply, i))
1572                                 GOTO(next, rc = 0);
1573                 }
1574
1575                 /* start transaction for modification RPC only */
1576                 if (h->th_flags & MUTABOR && current_batchid == -1) {
1577                         current_batchid = update->ou_batchid;
1578                         rc = out_tx_start(env, dt, ta, tsi->tsi_exp);
1579                         if (rc != 0)
1580                                 GOTO(next, rc);
1581                 }
1582
1583                 /* Stop the current update transaction, if the update has
1584                  * different batchid, or read-only update */
1585                 if (((current_batchid != update->ou_batchid) ||
1586                      !(h->th_flags & MUTABOR)) && ta->ta_handle != NULL) {
1587                         rc = out_tx_end(env, ta, rc);
1588                         current_batchid = -1;
1589                         if (rc != 0)
1590                                 GOTO(next, rc);
1591
1592                         /* start a new transaction if needed */
1593                         if (h->th_flags & MUTABOR) {
1594                                 rc = out_tx_start(env, dt, ta, tsi->tsi_exp);
1595                                 if (rc != 0)
1596                                         GOTO(next, rc);
1597
1598                                 current_batchid = update->ou_batchid;
1599                         }
1600                 }
1601
1602                 rc = h->th_act(tsi);
1603 next:
1604                 lu_object_put(env, &dt_obj->do_lu);
1605                 if (rc < 0)
1606                         GOTO(out, rc);
1607         }
1608 out:
1609         if (current_batchid != -1) {
1610                 rc1 = out_tx_end(env, ta, rc);
1611                 if (rc == 0)
1612                         rc = rc1;
1613         }
1614
1615         RETURN(rc);
1616 }
1617
1618 struct tgt_handler tgt_out_handlers[] = {
1619 TGT_UPDATE_HDL(MUTABOR, OUT_UPDATE,     out_handle),
1620 };
1621 EXPORT_SYMBOL(tgt_out_handlers);
1622