Whamcloud - gitweb
efc8b4177c246643e16fc5d7d970eb963713e89e
[fs/lustre-release.git] / lustre / target / out_handler.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  *
25  * lustre/mdt/out_handler.c
26  *
27  * Object update handler between targets.
28  *
29  * Author: di.wang <di.wang@intel.com>
30  */
31
32 #define DEBUG_SUBSYSTEM S_CLASS
33
34 #include <obd_class.h>
35 #include <md_object.h>
36 #include "tgt_internal.h"
37 #include <lustre_update.h>
38
39 struct tx_arg *tx_add_exec(struct thandle_exec_args *ta, tx_exec_func_t func,
40                            tx_exec_func_t undo, char *file, int line)
41 {
42         int i;
43
44         LASSERT(ta);
45         LASSERT(func);
46
47         LASSERTF(ta->ta_argno + 1 <= TX_MAX_OPS,
48                  "Too many updates(%d) in one trans\n", ta->ta_argno);
49         i = ta->ta_argno;
50
51         ta->ta_argno++;
52
53         ta->ta_args[i].exec_fn = func;
54         ta->ta_args[i].undo_fn = undo;
55         ta->ta_args[i].file    = file;
56         ta->ta_args[i].line    = line;
57
58         return &ta->ta_args[i];
59 }
60
61 static void out_reconstruct(const struct lu_env *env, struct dt_device *dt,
62                             struct dt_object *obj,
63                             struct object_update_reply *reply,
64                             int index)
65 {
66         CDEBUG(D_INFO, "%s: fork reply reply %p index %d: rc = %d\n",
67                dt_obd_name(dt), reply, index, 0);
68
69         object_update_result_insert(reply, NULL, 0, index, 0);
70         return;
71 }
72
73 typedef void (*out_reconstruct_t)(const struct lu_env *env,
74                                   struct dt_device *dt,
75                                   struct dt_object *obj,
76                                   struct object_update_reply *reply,
77                                   int index);
78
79 static inline int out_check_resent(const struct lu_env *env,
80                                    struct dt_device *dt,
81                                    struct dt_object *obj,
82                                    struct ptlrpc_request *req,
83                                    out_reconstruct_t reconstruct,
84                                    struct object_update_reply *reply,
85                                    int index)
86 {
87         if (likely(!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)))
88                 return 0;
89
90         if (req_xid_is_last(req)) {
91                 reconstruct(env, dt, obj, reply, index);
92                 return 1;
93         }
94         DEBUG_REQ(D_HA, req, "no reply for RESENT req (have "LPD64")",
95                  req->rq_export->exp_target_data.ted_lcd->lcd_last_xid);
96         return 0;
97 }
98
99 static int out_obj_destroy(const struct lu_env *env, struct dt_object *dt_obj,
100                            struct thandle *th)
101 {
102         int rc;
103
104         CDEBUG(D_INFO, "%s: destroy "DFID"\n", dt_obd_name(th->th_dev),
105                PFID(lu_object_fid(&dt_obj->do_lu)));
106
107         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
108         rc = dt_destroy(env, dt_obj, th);
109         dt_write_unlock(env, dt_obj);
110
111         return rc;
112 }
113
114 /**
115  * All of the xxx_undo will be used once execution failed,
116  * But because all of the required resource has been reserved in
117  * declare phase, i.e. if declare succeed, it should make sure
118  * the following executing phase succeed in anyway, so these undo
119  * should be useless for most of the time in Phase I
120  */
121 int out_tx_create_undo(const struct lu_env *env, struct thandle *th,
122                        struct tx_arg *arg)
123 {
124         int rc;
125
126         rc = out_obj_destroy(env, arg->object, th);
127         if (rc != 0)
128                 CERROR("%s: undo failure, we are doomed!: rc = %d\n",
129                        dt_obd_name(th->th_dev), rc);
130         return rc;
131 }
132
133 int out_tx_create_exec(const struct lu_env *env, struct thandle *th,
134                        struct tx_arg *arg)
135 {
136         struct dt_object        *dt_obj = arg->object;
137         int                      rc;
138
139         CDEBUG(D_OTHER, "%s: create "DFID": dof %u, mode %o\n",
140                dt_obd_name(th->th_dev),
141                PFID(lu_object_fid(&arg->object->do_lu)),
142                arg->u.create.dof.dof_type,
143                arg->u.create.attr.la_mode & S_IFMT);
144
145         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
146         rc = dt_create(env, dt_obj, &arg->u.create.attr,
147                        &arg->u.create.hint, &arg->u.create.dof, th);
148
149         dt_write_unlock(env, dt_obj);
150
151         CDEBUG(D_INFO, "%s: insert create reply %p index %d: rc = %d\n",
152                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
153
154         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
155
156         return rc;
157 }
158
159 static int __out_tx_create(const struct lu_env *env, struct dt_object *obj,
160                            struct lu_attr *attr, struct lu_fid *parent_fid,
161                            struct dt_object_format *dof,
162                            struct thandle_exec_args *ta,
163                            struct object_update_reply *reply,
164                            int index, char *file, int line)
165 {
166         struct tx_arg *arg;
167
168         LASSERT(ta->ta_handle != NULL);
169         ta->ta_err = dt_declare_create(env, obj, attr, NULL, dof,
170                                        ta->ta_handle);
171         if (ta->ta_err != 0)
172                 return ta->ta_err;
173
174         arg = tx_add_exec(ta, out_tx_create_exec, out_tx_create_undo, file,
175                           line);
176         LASSERT(arg);
177
178         /* release the object in out_trans_stop */
179         lu_object_get(&obj->do_lu);
180         arg->object = obj;
181         arg->u.create.attr = *attr;
182         if (parent_fid != NULL)
183                 arg->u.create.fid = *parent_fid;
184         memset(&arg->u.create.hint, 0, sizeof(arg->u.create.hint));
185         arg->u.create.dof  = *dof;
186         arg->reply = reply;
187         arg->index = index;
188
189         return 0;
190 }
191
192 static int out_create(struct tgt_session_info *tsi)
193 {
194         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
195         struct object_update    *update = tti->tti_u.update.tti_update;
196         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
197         struct dt_object_format *dof = &tti->tti_u.update.tti_update_dof;
198         struct obdo             *lobdo = &tti->tti_u.update.tti_obdo;
199         struct lu_attr          *attr = &tti->tti_attr;
200         struct lu_fid           *fid = NULL;
201         struct obdo             *wobdo;
202         int                     size;
203         int                     rc;
204
205         ENTRY;
206
207         wobdo = object_update_param_get(update, 0, &size);
208         if (wobdo == NULL || size != sizeof(*wobdo)) {
209                 CERROR("%s: obdo is NULL, invalid RPC: rc = %d\n",
210                        tgt_name(tsi->tsi_tgt), -EPROTO);
211                 RETURN(err_serious(-EPROTO));
212         }
213
214         obdo_le_to_cpu(wobdo, wobdo);
215         lustre_get_wire_obdo(NULL, lobdo, wobdo);
216         la_from_obdo(attr, lobdo, lobdo->o_valid);
217
218         dof->dof_type = dt_mode_to_dft(attr->la_mode);
219         if (update->ou_params_count > 1) {
220                 int size;
221
222                 fid = object_update_param_get(update, 1, &size);
223                 if (fid == NULL || size != sizeof(*fid)) {
224                         CERROR("%s: invalid fid: rc = %d\n",
225                                tgt_name(tsi->tsi_tgt), -EPROTO);
226                         RETURN(err_serious(-EPROTO));
227                 }
228                 if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
229                         lustre_swab_lu_fid(fid);
230                 if (!fid_is_sane(fid)) {
231                         CERROR("%s: invalid fid "DFID": rc = %d\n",
232                                tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
233                         RETURN(err_serious(-EPROTO));
234                 }
235         }
236
237         if (lu_object_exists(&obj->do_lu))
238                 RETURN(-EEXIST);
239
240         rc = out_tx_create(tsi->tsi_env, obj, attr, fid, dof,
241                            &tti->tti_tea,
242                            tti->tti_u.update.tti_update_reply,
243                            tti->tti_u.update.tti_update_reply_index);
244
245         RETURN(rc);
246 }
247
248 static int out_tx_attr_set_undo(const struct lu_env *env,
249                                 struct thandle *th, struct tx_arg *arg)
250 {
251         CERROR("%s: attr set undo "DFID" unimplemented yet!: rc = %d\n",
252                dt_obd_name(th->th_dev),
253                PFID(lu_object_fid(&arg->object->do_lu)), -ENOTSUPP);
254
255         return -ENOTSUPP;
256 }
257
258 static int out_tx_attr_set_exec(const struct lu_env *env, struct thandle *th,
259                                 struct tx_arg *arg)
260 {
261         struct dt_object        *dt_obj = arg->object;
262         int                     rc;
263
264         CDEBUG(D_OTHER, "%s: attr set "DFID"\n", dt_obd_name(th->th_dev),
265                PFID(lu_object_fid(&dt_obj->do_lu)));
266
267         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
268         rc = dt_attr_set(env, dt_obj, &arg->u.attr_set.attr, th, NULL);
269         dt_write_unlock(env, dt_obj);
270
271         CDEBUG(D_INFO, "%s: insert attr_set reply %p index %d: rc = %d\n",
272                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
273
274         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
275
276         return rc;
277 }
278
279 static int __out_tx_attr_set(const struct lu_env *env,
280                              struct dt_object *dt_obj,
281                              const struct lu_attr *attr,
282                              struct thandle_exec_args *th,
283                              struct object_update_reply *reply,
284                              int index, char *file, int line)
285 {
286         struct tx_arg           *arg;
287
288         LASSERT(th->ta_handle != NULL);
289         th->ta_err = dt_declare_attr_set(env, dt_obj, attr, th->ta_handle);
290         if (th->ta_err != 0)
291                 return th->ta_err;
292
293         arg = tx_add_exec(th, out_tx_attr_set_exec, out_tx_attr_set_undo,
294                           file, line);
295         LASSERT(arg);
296         lu_object_get(&dt_obj->do_lu);
297         arg->object = dt_obj;
298         arg->u.attr_set.attr = *attr;
299         arg->reply = reply;
300         arg->index = index;
301         return 0;
302 }
303
304 static int out_attr_set(struct tgt_session_info *tsi)
305 {
306         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
307         struct object_update    *update = tti->tti_u.update.tti_update;
308         struct lu_attr          *attr = &tti->tti_attr;
309         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
310         struct obdo             *lobdo = &tti->tti_u.update.tti_obdo;
311         struct obdo             *wobdo;
312         int                      size;
313         int                      rc;
314
315         ENTRY;
316
317         wobdo = object_update_param_get(update, 0, &size);
318         if (wobdo == NULL || size != sizeof(*wobdo)) {
319                 CERROR("%s: empty obdo in the update: rc = %d\n",
320                        tgt_name(tsi->tsi_tgt), -EPROTO);
321                 RETURN(err_serious(-EPROTO));
322         }
323
324         attr->la_valid = 0;
325         attr->la_valid = 0;
326         obdo_le_to_cpu(wobdo, wobdo);
327         lustre_get_wire_obdo(NULL, lobdo, wobdo);
328         la_from_obdo(attr, lobdo, lobdo->o_valid);
329
330         rc = out_tx_attr_set(tsi->tsi_env, obj, attr, &tti->tti_tea,
331                              tti->tti_u.update.tti_update_reply,
332                              tti->tti_u.update.tti_update_reply_index);
333
334         RETURN(rc);
335 }
336
337 static int out_attr_get(struct tgt_session_info *tsi)
338 {
339         const struct lu_env     *env = tsi->tsi_env;
340         struct tgt_thread_info  *tti = tgt_th_info(env);
341         struct obdo             *obdo = &tti->tti_u.update.tti_obdo;
342         struct lu_attr          *la = &tti->tti_attr;
343         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
344         int                     idx = tti->tti_u.update.tti_update_reply_index;
345         int                     rc;
346
347         ENTRY;
348
349         if (!lu_object_exists(&obj->do_lu)) {
350                 /* Usually, this will be called when the master MDT try
351                  * to init a remote object(see osp_object_init), so if
352                  * the object does not exist on slave, we need set BANSHEE flag,
353                  * so the object can be removed from the cache immediately */
354                 set_bit(LU_OBJECT_HEARD_BANSHEE,
355                         &obj->do_lu.lo_header->loh_flags);
356                 RETURN(-ENOENT);
357         }
358
359         dt_read_lock(env, obj, MOR_TGT_CHILD);
360         rc = dt_attr_get(env, obj, la, NULL);
361         if (rc)
362                 GOTO(out_unlock, rc);
363         /*
364          * If it is a directory, we will also check whether the
365          * directory is empty.
366          * la_flags = 0 : Empty.
367          *          = 1 : Not empty.
368          */
369         la->la_flags = 0;
370         if (S_ISDIR(la->la_mode)) {
371                 struct dt_it            *it;
372                 const struct dt_it_ops  *iops;
373
374                 if (!dt_try_as_dir(env, obj))
375                         GOTO(out_unlock, rc = -ENOTDIR);
376
377                 iops = &obj->do_index_ops->dio_it;
378                 it = iops->init(env, obj, LUDA_64BITHASH, BYPASS_CAPA);
379                 if (!IS_ERR(it)) {
380                         int  result;
381                         result = iops->get(env, it, (const void *)"");
382                         if (result > 0) {
383                                 int i;
384                                 for (result = 0, i = 0; result == 0 && i < 3;
385                                      ++i)
386                                         result = iops->next(env, it);
387                                 if (result == 0)
388                                         la->la_flags = 1;
389                         } else if (result == 0)
390                                 /*
391                                  * Huh? Index contains no zero key?
392                                  */
393                                 rc = -EIO;
394
395                         iops->put(env, it);
396                         iops->fini(env, it);
397                 }
398         }
399
400         obdo->o_valid = 0;
401         obdo_from_la(obdo, la, la->la_valid);
402         obdo_cpu_to_le(obdo, obdo);
403         lustre_set_wire_obdo(NULL, obdo, obdo);
404
405 out_unlock:
406         dt_read_unlock(env, obj);
407
408         CDEBUG(D_INFO, "%s: insert attr get reply %p index %d: rc = %d\n",
409                tgt_name(tsi->tsi_tgt), tti->tti_u.update.tti_update_reply,
410                0, rc);
411
412         object_update_result_insert(tti->tti_u.update.tti_update_reply, obdo,
413                                     sizeof(*obdo), idx, rc);
414
415         RETURN(rc);
416 }
417
418 static int out_xattr_get(struct tgt_session_info *tsi)
419 {
420         const struct lu_env        *env = tsi->tsi_env;
421         struct tgt_thread_info     *tti = tgt_th_info(env);
422         struct object_update       *update = tti->tti_u.update.tti_update;
423         struct lu_buf              *lbuf = &tti->tti_buf;
424         struct object_update_reply *reply = tti->tti_u.update.tti_update_reply;
425         struct dt_object           *obj = tti->tti_u.update.tti_dt_object;
426         char                       *name;
427         struct object_update_result *update_result;
428         int                     idx = tti->tti_u.update.tti_update_reply_index;
429         int                        rc;
430
431         ENTRY;
432
433         if (!lu_object_exists(&obj->do_lu)) {
434                 set_bit(LU_OBJECT_HEARD_BANSHEE,
435                         &obj->do_lu.lo_header->loh_flags);
436                 RETURN(-ENOENT);
437         }
438
439         name = object_update_param_get(update, 0, NULL);
440         if (name == NULL) {
441                 CERROR("%s: empty name for xattr get: rc = %d\n",
442                        tgt_name(tsi->tsi_tgt), -EPROTO);
443                 RETURN(err_serious(-EPROTO));
444         }
445
446         update_result = object_update_result_get(reply, 0, NULL);
447         if (update_result == NULL) {
448                 CERROR("%s: empty name for xattr get: rc = %d\n",
449                        tgt_name(tsi->tsi_tgt), -EPROTO);
450                 RETURN(err_serious(-EPROTO));
451         }
452
453         lbuf->lb_buf = update_result->our_data;
454         lbuf->lb_len = OUT_UPDATE_REPLY_SIZE -
455                        cfs_size_round((unsigned long)update_result->our_data -
456                                       (unsigned long)update_result);
457         dt_read_lock(env, obj, MOR_TGT_CHILD);
458         rc = dt_xattr_get(env, obj, lbuf, name, NULL);
459         dt_read_unlock(env, obj);
460         if (rc < 0) {
461                 lbuf->lb_len = 0;
462                 GOTO(out, rc);
463         }
464         if (rc == 0) {
465                 lbuf->lb_len = 0;
466                 GOTO(out, rc = -ENOENT);
467         }
468         lbuf->lb_len = rc;
469         rc = 0;
470         CDEBUG(D_INFO, "%s: "DFID" get xattr %s len %d\n",
471                tgt_name(tsi->tsi_tgt), PFID(lu_object_fid(&obj->do_lu)),
472                name, (int)lbuf->lb_len);
473
474         GOTO(out, rc);
475
476 out:
477         object_update_result_insert(reply, lbuf->lb_buf, lbuf->lb_len, idx, rc);
478         RETURN(rc);
479 }
480
481 static int out_index_lookup(struct tgt_session_info *tsi)
482 {
483         const struct lu_env     *env = tsi->tsi_env;
484         struct tgt_thread_info  *tti = tgt_th_info(env);
485         struct object_update    *update = tti->tti_u.update.tti_update;
486         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
487         char                    *name;
488         int                      rc;
489
490         ENTRY;
491
492         if (!lu_object_exists(&obj->do_lu))
493                 RETURN(-ENOENT);
494
495         name = object_update_param_get(update, 0, NULL);
496         if (name == NULL) {
497                 CERROR("%s: empty name for lookup: rc = %d\n",
498                        tgt_name(tsi->tsi_tgt), -EPROTO);
499                 RETURN(err_serious(-EPROTO));
500         }
501
502         dt_read_lock(env, obj, MOR_TGT_CHILD);
503         if (!dt_try_as_dir(env, obj))
504                 GOTO(out_unlock, rc = -ENOTDIR);
505
506         rc = dt_lookup(env, obj, (struct dt_rec *)&tti->tti_fid1,
507                 (struct dt_key *)name, NULL);
508
509         if (rc < 0)
510                 GOTO(out_unlock, rc);
511
512         if (rc == 0)
513                 rc += 1;
514
515         CDEBUG(D_INFO, "lookup "DFID" %s get "DFID" rc %d\n",
516                PFID(lu_object_fid(&obj->do_lu)), name,
517                PFID(&tti->tti_fid1), rc);
518
519 out_unlock:
520         dt_read_unlock(env, obj);
521
522         CDEBUG(D_INFO, "%s: insert lookup reply %p index %d: rc = %d\n",
523                tgt_name(tsi->tsi_tgt), tti->tti_u.update.tti_update_reply,
524                0, rc);
525
526         object_update_result_insert(tti->tti_u.update.tti_update_reply,
527                             &tti->tti_fid1, sizeof(tti->tti_fid1),
528                             tti->tti_u.update.tti_update_reply_index, rc);
529         RETURN(rc);
530 }
531
532 static int out_tx_xattr_set_exec(const struct lu_env *env,
533                                  struct thandle *th,
534                                  struct tx_arg *arg)
535 {
536         struct dt_object *dt_obj = arg->object;
537         int rc;
538
539         CDEBUG(D_INFO, "%s: set xattr buf %p name %s flag %d\n",
540                dt_obd_name(th->th_dev), arg->u.xattr_set.buf.lb_buf,
541                arg->u.xattr_set.name, arg->u.xattr_set.flags);
542
543         if (!lu_object_exists(&dt_obj->do_lu))
544                 GOTO(out, rc = -ENOENT);
545
546         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
547         rc = dt_xattr_set(env, dt_obj, &arg->u.xattr_set.buf,
548                           arg->u.xattr_set.name, arg->u.xattr_set.flags,
549                           th, NULL);
550         dt_write_unlock(env, dt_obj);
551         /**
552          * Ignore errors if this is LINK EA
553          **/
554         if (unlikely(rc && !strcmp(arg->u.xattr_set.name, XATTR_NAME_LINK)))
555                 rc = 0;
556 out:
557         CDEBUG(D_INFO, "%s: insert xattr set reply %p index %d: rc = %d\n",
558                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
559
560         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
561
562         return rc;
563 }
564
565 static int __out_tx_xattr_set(const struct lu_env *env,
566                               struct dt_object *dt_obj,
567                               const struct lu_buf *buf,
568                               const char *name, int flags,
569                               struct thandle_exec_args *ta,
570                               struct object_update_reply *reply,
571                               int index, char *file, int line)
572 {
573         struct tx_arg           *arg;
574
575         LASSERT(ta->ta_handle != NULL);
576         ta->ta_err = dt_declare_xattr_set(env, dt_obj, buf, name,
577                                           flags, ta->ta_handle);
578         if (ta->ta_err != 0)
579                 return ta->ta_err;
580
581         arg = tx_add_exec(ta, out_tx_xattr_set_exec, NULL, file, line);
582         LASSERT(arg);
583         lu_object_get(&dt_obj->do_lu);
584         arg->object = dt_obj;
585         arg->u.xattr_set.name = name;
586         arg->u.xattr_set.flags = flags;
587         arg->u.xattr_set.buf = *buf;
588         arg->reply = reply;
589         arg->index = index;
590         arg->u.xattr_set.csum = 0;
591         return 0;
592 }
593
594 static int out_xattr_set(struct tgt_session_info *tsi)
595 {
596         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
597         struct object_update    *update = tti->tti_u.update.tti_update;
598         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
599         struct lu_buf           *lbuf = &tti->tti_buf;
600         char                    *name;
601         char                    *buf;
602         char                    *tmp;
603         int                      buf_len = 0;
604         int                      flag;
605         int                      rc;
606         ENTRY;
607
608         name = object_update_param_get(update, 0, NULL);
609         if (name == NULL) {
610                 CERROR("%s: empty name for xattr set: rc = %d\n",
611                        tgt_name(tsi->tsi_tgt), -EPROTO);
612                 RETURN(err_serious(-EPROTO));
613         }
614
615         buf = object_update_param_get(update, 1, &buf_len);
616         if (buf == NULL || buf_len == 0) {
617                 CERROR("%s: empty buf for xattr set: rc = %d\n",
618                        tgt_name(tsi->tsi_tgt), -EPROTO);
619                 RETURN(err_serious(-EPROTO));
620         }
621
622         lbuf->lb_buf = buf;
623         lbuf->lb_len = buf_len;
624
625         tmp = (char *)object_update_param_get(update, 2, NULL);
626         if (tmp == NULL) {
627                 CERROR("%s: empty flag for xattr set: rc = %d\n",
628                        tgt_name(tsi->tsi_tgt), -EPROTO);
629                 RETURN(err_serious(-EPROTO));
630         }
631
632         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
633                 __swab32s((__u32 *)tmp);
634         flag = *(int *)tmp;
635
636         rc = out_tx_xattr_set(tsi->tsi_env, obj, lbuf, name, flag,
637                               &tti->tti_tea,
638                               tti->tti_u.update.tti_update_reply,
639                               tti->tti_u.update.tti_update_reply_index);
640         RETURN(rc);
641 }
642
643 static int out_obj_ref_add(const struct lu_env *env,
644                            struct dt_object *dt_obj,
645                            struct thandle *th)
646 {
647         int rc;
648
649         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
650         rc = dt_ref_add(env, dt_obj, th);
651         dt_write_unlock(env, dt_obj);
652
653         return rc;
654 }
655
656 static int out_obj_ref_del(const struct lu_env *env,
657                            struct dt_object *dt_obj,
658                            struct thandle *th)
659 {
660         int rc;
661
662         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
663         rc = dt_ref_del(env, dt_obj, th);
664         dt_write_unlock(env, dt_obj);
665
666         return rc;
667 }
668
669 static int out_tx_ref_add_exec(const struct lu_env *env, struct thandle *th,
670                                struct tx_arg *arg)
671 {
672         struct dt_object *dt_obj = arg->object;
673         int rc;
674
675         rc = out_obj_ref_add(env, dt_obj, th);
676
677         CDEBUG(D_INFO, "%s: insert ref_add reply %p index %d: rc = %d\n",
678                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
679
680         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
681         return rc;
682 }
683
684 static int out_tx_ref_add_undo(const struct lu_env *env, struct thandle *th,
685                                struct tx_arg *arg)
686 {
687         return out_obj_ref_del(env, arg->object, th);
688 }
689
690 static int __out_tx_ref_add(const struct lu_env *env,
691                             struct dt_object *dt_obj,
692                             struct thandle_exec_args *ta,
693                             struct object_update_reply *reply,
694                             int index, char *file, int line)
695 {
696         struct tx_arg   *arg;
697
698         LASSERT(ta->ta_handle != NULL);
699         ta->ta_err = dt_declare_ref_add(env, dt_obj, ta->ta_handle);
700         if (ta->ta_err != 0)
701                 return ta->ta_err;
702
703         arg = tx_add_exec(ta, out_tx_ref_add_exec, out_tx_ref_add_undo, file,
704                           line);
705         LASSERT(arg);
706         lu_object_get(&dt_obj->do_lu);
707         arg->object = dt_obj;
708         arg->reply = reply;
709         arg->index = index;
710         return 0;
711 }
712
713 /**
714  * increase ref of the object
715  **/
716 static int out_ref_add(struct tgt_session_info *tsi)
717 {
718         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
719         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
720         int                      rc;
721
722         ENTRY;
723
724         rc = out_tx_ref_add(tsi->tsi_env, obj, &tti->tti_tea,
725                             tti->tti_u.update.tti_update_reply,
726                             tti->tti_u.update.tti_update_reply_index);
727         RETURN(rc);
728 }
729
730 static int out_tx_ref_del_exec(const struct lu_env *env, struct thandle *th,
731                                struct tx_arg *arg)
732 {
733         struct dt_object        *dt_obj = arg->object;
734         int                      rc;
735
736         rc = out_obj_ref_del(env, dt_obj, th);
737
738         CDEBUG(D_INFO, "%s: insert ref_del reply %p index %d: rc = %d\n",
739                dt_obd_name(th->th_dev), arg->reply, arg->index, 0);
740
741         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
742
743         return rc;
744 }
745
746 static int out_tx_ref_del_undo(const struct lu_env *env, struct thandle *th,
747                                struct tx_arg *arg)
748 {
749         return out_obj_ref_add(env, arg->object, th);
750 }
751
752 static int __out_tx_ref_del(const struct lu_env *env,
753                             struct dt_object *dt_obj,
754                             struct thandle_exec_args *ta,
755                             struct object_update_reply *reply,
756                             int index, char *file, int line)
757 {
758         struct tx_arg   *arg;
759
760         LASSERT(ta->ta_handle != NULL);
761         ta->ta_err = dt_declare_ref_del(env, dt_obj, ta->ta_handle);
762         if (ta->ta_err != 0)
763                 return ta->ta_err;
764
765         arg = tx_add_exec(ta, out_tx_ref_del_exec, out_tx_ref_del_undo, file,
766                           line);
767         LASSERT(arg);
768         lu_object_get(&dt_obj->do_lu);
769         arg->object = dt_obj;
770         arg->reply = reply;
771         arg->index = index;
772         return 0;
773 }
774
775 static int out_ref_del(struct tgt_session_info *tsi)
776 {
777         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
778         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
779         int                      rc;
780
781         ENTRY;
782
783         if (!lu_object_exists(&obj->do_lu))
784                 RETURN(-ENOENT);
785
786         rc = out_tx_ref_del(tsi->tsi_env, obj, &tti->tti_tea,
787                             tti->tti_u.update.tti_update_reply,
788                             tti->tti_u.update.tti_update_reply_index);
789         RETURN(rc);
790 }
791
792 static int out_obj_index_insert(const struct lu_env *env,
793                                 struct dt_object *dt_obj,
794                                 const struct dt_rec *rec,
795                                 const struct dt_key *key,
796                                 struct thandle *th)
797 {
798         int rc;
799
800         CDEBUG(D_INFO, "%s: index insert "DFID" name: %s fid "DFID"\n",
801                dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
802                (char *)key, PFID((struct lu_fid *)rec));
803
804         if (dt_try_as_dir(env, dt_obj) == 0)
805                 return -ENOTDIR;
806
807         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
808         rc = dt_insert(env, dt_obj, rec, key, th, NULL, 0);
809         dt_write_unlock(env, dt_obj);
810
811         return rc;
812 }
813
814 static int out_obj_index_delete(const struct lu_env *env,
815                                 struct dt_object *dt_obj,
816                                 const struct dt_key *key,
817                                 struct thandle *th)
818 {
819         int rc;
820
821         CDEBUG(D_INFO, "%s: index delete "DFID" name: %s\n",
822                dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
823                (char *)key);
824
825         if (dt_try_as_dir(env, dt_obj) == 0)
826                 return -ENOTDIR;
827
828         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
829         rc = dt_delete(env, dt_obj, key, th, NULL);
830         dt_write_unlock(env, dt_obj);
831
832         return rc;
833 }
834
835 static int out_tx_index_insert_exec(const struct lu_env *env,
836                                     struct thandle *th, struct tx_arg *arg)
837 {
838         struct dt_object *dt_obj = arg->object;
839         int rc;
840
841         rc = out_obj_index_insert(env, dt_obj, arg->u.insert.rec,
842                                   arg->u.insert.key, th);
843
844         CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
845                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
846
847         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
848
849         return rc;
850 }
851
852 static int out_tx_index_insert_undo(const struct lu_env *env,
853                                     struct thandle *th, struct tx_arg *arg)
854 {
855         return out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
856 }
857
858 static int __out_tx_index_insert(const struct lu_env *env,
859                                  struct dt_object *dt_obj,
860                                  char *name, struct lu_fid *fid,
861                                  struct thandle_exec_args *ta,
862                                  struct object_update_reply *reply,
863                                  int index, char *file, int line)
864 {
865         struct tx_arg *arg;
866
867         LASSERT(ta->ta_handle != NULL);
868
869         if (dt_try_as_dir(env, dt_obj) == 0) {
870                 ta->ta_err = -ENOTDIR;
871                 return ta->ta_err;
872         }
873
874         ta->ta_err = dt_declare_insert(env, dt_obj,
875                                        (struct dt_rec *)fid,
876                                        (struct dt_key *)name,
877                                        ta->ta_handle);
878
879         if (ta->ta_err != 0)
880                 return ta->ta_err;
881
882         arg = tx_add_exec(ta, out_tx_index_insert_exec,
883                           out_tx_index_insert_undo, file,
884                           line);
885         LASSERT(arg);
886         lu_object_get(&dt_obj->do_lu);
887         arg->object = dt_obj;
888         arg->reply = reply;
889         arg->index = index;
890         arg->u.insert.rec = (struct dt_rec *)fid;
891         arg->u.insert.key = (struct dt_key *)name;
892
893         return 0;
894 }
895
896 static int out_index_insert(struct tgt_session_info *tsi)
897 {
898         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
899         struct object_update    *update = tti->tti_u.update.tti_update;
900         struct dt_object  *obj = tti->tti_u.update.tti_dt_object;
901         struct lu_fid     *fid;
902         char              *name;
903         int                rc = 0;
904         int                size;
905
906         ENTRY;
907
908         name = object_update_param_get(update, 0, NULL);
909         if (name == NULL) {
910                 CERROR("%s: empty name for index insert: rc = %d\n",
911                        tgt_name(tsi->tsi_tgt), -EPROTO);
912                 RETURN(err_serious(-EPROTO));
913         }
914
915         fid = object_update_param_get(update, 1, &size);
916         if (fid == NULL || size != sizeof(*fid)) {
917                 CERROR("%s: invalid fid: rc = %d\n",
918                        tgt_name(tsi->tsi_tgt), -EPROTO);
919                        RETURN(err_serious(-EPROTO));
920         }
921
922         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
923                 lustre_swab_lu_fid(fid);
924
925         if (!fid_is_sane(fid)) {
926                 CERROR("%s: invalid FID "DFID": rc = %d\n",
927                        tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
928                 RETURN(err_serious(-EPROTO));
929         }
930
931         rc = out_tx_index_insert(tsi->tsi_env, obj, name, fid,
932                                  &tti->tti_tea,
933                                  tti->tti_u.update.tti_update_reply,
934                                  tti->tti_u.update.tti_update_reply_index);
935         RETURN(rc);
936 }
937
938 static int out_tx_index_delete_exec(const struct lu_env *env,
939                                     struct thandle *th,
940                                     struct tx_arg *arg)
941 {
942         int rc;
943
944         rc = out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
945
946         CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
947                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
948
949         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
950
951         return rc;
952 }
953
954 static int out_tx_index_delete_undo(const struct lu_env *env,
955                                     struct thandle *th,
956                                     struct tx_arg *arg)
957 {
958         CERROR("%s: Oops, can not rollback index_delete yet: rc = %d\n",
959                dt_obd_name(th->th_dev), -ENOTSUPP);
960         return -ENOTSUPP;
961 }
962
963 static int __out_tx_index_delete(const struct lu_env *env,
964                                  struct dt_object *dt_obj, char *name,
965                                  struct thandle_exec_args *ta,
966                                  struct object_update_reply *reply,
967                                  int index, char *file, int line)
968 {
969         struct tx_arg *arg;
970
971         if (dt_try_as_dir(env, dt_obj) == 0) {
972                 ta->ta_err = -ENOTDIR;
973                 return ta->ta_err;
974         }
975
976         LASSERT(ta->ta_handle != NULL);
977         ta->ta_err = dt_declare_delete(env, dt_obj,
978                                        (struct dt_key *)name,
979                                        ta->ta_handle);
980         if (ta->ta_err != 0)
981                 return ta->ta_err;
982
983         arg = tx_add_exec(ta, out_tx_index_delete_exec,
984                           out_tx_index_delete_undo, file,
985                           line);
986         LASSERT(arg);
987         lu_object_get(&dt_obj->do_lu);
988         arg->object = dt_obj;
989         arg->reply = reply;
990         arg->index = index;
991         arg->u.insert.key = (struct dt_key *)name;
992         return 0;
993 }
994
995 static int out_index_delete(struct tgt_session_info *tsi)
996 {
997         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
998         struct object_update    *update = tti->tti_u.update.tti_update;
999         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
1000         char                    *name;
1001         int                      rc = 0;
1002
1003         if (!lu_object_exists(&obj->do_lu))
1004                 RETURN(-ENOENT);
1005
1006         name = object_update_param_get(update, 0, NULL);
1007         if (name == NULL) {
1008                 CERROR("%s: empty name for index delete: rc = %d\n",
1009                        tgt_name(tsi->tsi_tgt), -EPROTO);
1010                 RETURN(err_serious(-EPROTO));
1011         }
1012
1013         rc = out_tx_index_delete(tsi->tsi_env, obj, name, &tti->tti_tea,
1014                                  tti->tti_u.update.tti_update_reply,
1015                                  tti->tti_u.update.tti_update_reply_index);
1016         RETURN(rc);
1017 }
1018
1019 static int out_tx_destroy_exec(const struct lu_env *env, struct thandle *th,
1020                                struct tx_arg *arg)
1021 {
1022         struct dt_object *dt_obj = arg->object;
1023         int rc;
1024
1025         rc = out_obj_destroy(env, dt_obj, th);
1026
1027         CDEBUG(D_INFO, "%s: insert destroy reply %p index %d: rc = %d\n",
1028                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
1029
1030         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
1031
1032         RETURN(rc);
1033 }
1034
1035 static int out_tx_destroy_undo(const struct lu_env *env, struct thandle *th,
1036                                struct tx_arg *arg)
1037 {
1038         CERROR("%s: not support destroy undo yet!: rc = %d\n",
1039                dt_obd_name(th->th_dev), -ENOTSUPP);
1040         return -ENOTSUPP;
1041 }
1042
1043 static int __out_tx_destroy(const struct lu_env *env, struct dt_object *dt_obj,
1044                              struct thandle_exec_args *ta,
1045                              struct object_update_reply *reply,
1046                              int index, char *file, int line)
1047 {
1048         struct tx_arg *arg;
1049
1050         LASSERT(ta->ta_handle != NULL);
1051         ta->ta_err = dt_declare_destroy(env, dt_obj, ta->ta_handle);
1052         if (ta->ta_err)
1053                 return ta->ta_err;
1054
1055         arg = tx_add_exec(ta, out_tx_destroy_exec, out_tx_destroy_undo,
1056                           file, line);
1057         LASSERT(arg);
1058         lu_object_get(&dt_obj->do_lu);
1059         arg->object = dt_obj;
1060         arg->reply = reply;
1061         arg->index = index;
1062         return 0;
1063 }
1064
1065 static int out_destroy(struct tgt_session_info *tsi)
1066 {
1067         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
1068         struct object_update    *update = tti->tti_u.update.tti_update;
1069         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
1070         struct lu_fid           *fid;
1071         int                      rc;
1072         ENTRY;
1073
1074         fid = &update->ou_fid;
1075         if (!fid_is_sane(fid)) {
1076                 CERROR("%s: invalid FID "DFID": rc = %d\n",
1077                        tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
1078                 RETURN(err_serious(-EPROTO));
1079         }
1080
1081         if (!lu_object_exists(&obj->do_lu))
1082                 RETURN(-ENOENT);
1083
1084         rc = out_tx_destroy(tsi->tsi_env, obj, &tti->tti_tea,
1085                             tti->tti_u.update.tti_update_reply,
1086                             tti->tti_u.update.tti_update_reply_index);
1087
1088         RETURN(rc);
1089 }
1090
1091 static int out_tx_write_exec(const struct lu_env *env, struct thandle *th,
1092                              struct tx_arg *arg)
1093 {
1094         struct dt_object *dt_obj = arg->object;
1095         int rc;
1096
1097         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
1098         rc = dt_record_write(env, dt_obj, &arg->u.write.buf,
1099                              &arg->u.write.pos, th);
1100         dt_write_unlock(env, dt_obj);
1101
1102         if (rc == 0)
1103                 rc = arg->u.write.buf.lb_len;
1104
1105         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
1106
1107         return rc > 0 ? 0 : rc;
1108 }
1109
1110 static int __out_tx_write(const struct lu_env *env,
1111                           struct dt_object *dt_obj,
1112                           const struct lu_buf *buf,
1113                           loff_t pos, struct thandle_exec_args *ta,
1114                           struct object_update_reply *reply,
1115                           int index, char *file, int line)
1116 {
1117         struct tx_arg   *arg;
1118
1119         LASSERT(ta->ta_handle != NULL);
1120         ta->ta_err = dt_declare_record_write(env, dt_obj, buf, pos,
1121                                              ta->ta_handle);
1122         if (ta->ta_err != 0)
1123                 return ta->ta_err;
1124
1125         arg = tx_add_exec(ta, out_tx_write_exec, NULL, file, line);
1126         LASSERT(arg);
1127         lu_object_get(&dt_obj->do_lu);
1128         arg->object = dt_obj;
1129         arg->u.write.buf = *buf;
1130         arg->u.write.pos = pos;
1131         arg->reply = reply;
1132         arg->index = index;
1133         return 0;
1134 }
1135
1136 static int out_write(struct tgt_session_info *tsi)
1137 {
1138         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
1139         struct object_update    *update = tti->tti_u.update.tti_update;
1140         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
1141         struct lu_buf           *lbuf = &tti->tti_buf;
1142         char                    *buf;
1143         char                    *tmp;
1144         int                     buf_len = 0;
1145         loff_t                  pos;
1146         int                      rc;
1147         ENTRY;
1148
1149         buf = object_update_param_get(update, 0, &buf_len);
1150         if (buf == NULL || buf_len == 0) {
1151                 CERROR("%s: empty buf for xattr set: rc = %d\n",
1152                        tgt_name(tsi->tsi_tgt), -EPROTO);
1153                 RETURN(err_serious(-EPROTO));
1154         }
1155         lbuf->lb_buf = buf;
1156         lbuf->lb_len = buf_len;
1157
1158         tmp = (char *)object_update_param_get(update, 1, NULL);
1159         if (tmp == NULL) {
1160                 CERROR("%s: empty flag for xattr set: rc = %d\n",
1161                        tgt_name(tsi->tsi_tgt), -EPROTO);
1162                 RETURN(err_serious(-EPROTO));
1163         }
1164
1165         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
1166                 __swab64s((__u64 *)tmp);
1167         pos = *(loff_t *)tmp;
1168
1169         rc = out_tx_write(tsi->tsi_env, obj, lbuf, pos,
1170                           &tti->tti_tea,
1171                           tti->tti_u.update.tti_update_reply,
1172                           tti->tti_u.update.tti_update_reply_index);
1173         RETURN(rc);
1174 }
1175
1176 #define DEF_OUT_HNDL(opc, name, flags, fn)     \
1177 [opc - OUT_CREATE] = {                                  \
1178         .th_name    = name,                             \
1179         .th_fail_id = 0,                                \
1180         .th_opc     = opc,                              \
1181         .th_flags   = flags,                            \
1182         .th_act     = fn,                               \
1183         .th_fmt     = NULL,                             \
1184         .th_version = 0,                                \
1185 }
1186
1187 #define out_handler mdt_handler
1188 static struct tgt_handler out_update_ops[] = {
1189         DEF_OUT_HNDL(OUT_CREATE, "out_create", MUTABOR | HABEO_REFERO,
1190                      out_create),
1191         DEF_OUT_HNDL(OUT_DESTROY, "out_create", MUTABOR | HABEO_REFERO,
1192                      out_destroy),
1193         DEF_OUT_HNDL(OUT_REF_ADD, "out_ref_add", MUTABOR | HABEO_REFERO,
1194                      out_ref_add),
1195         DEF_OUT_HNDL(OUT_REF_DEL, "out_ref_del", MUTABOR | HABEO_REFERO,
1196                      out_ref_del),
1197         DEF_OUT_HNDL(OUT_ATTR_SET, "out_attr_set",  MUTABOR | HABEO_REFERO,
1198                      out_attr_set),
1199         DEF_OUT_HNDL(OUT_ATTR_GET, "out_attr_get",  HABEO_REFERO,
1200                      out_attr_get),
1201         DEF_OUT_HNDL(OUT_XATTR_SET, "out_xattr_set", MUTABOR | HABEO_REFERO,
1202                      out_xattr_set),
1203         DEF_OUT_HNDL(OUT_XATTR_GET, "out_xattr_get", HABEO_REFERO,
1204                      out_xattr_get),
1205         DEF_OUT_HNDL(OUT_INDEX_LOOKUP, "out_index_lookup", HABEO_REFERO,
1206                      out_index_lookup),
1207         DEF_OUT_HNDL(OUT_INDEX_INSERT, "out_index_insert",
1208                      MUTABOR | HABEO_REFERO, out_index_insert),
1209         DEF_OUT_HNDL(OUT_INDEX_DELETE, "out_index_delete",
1210                      MUTABOR | HABEO_REFERO, out_index_delete),
1211         DEF_OUT_HNDL(OUT_WRITE, "out_write", MUTABOR | HABEO_REFERO, out_write),
1212 };
1213
1214 struct tgt_handler *out_handler_find(__u32 opc)
1215 {
1216         struct tgt_handler *h;
1217
1218         h = NULL;
1219         if (OUT_CREATE <= opc && opc < OUT_LAST) {
1220                 h = &out_update_ops[opc - OUT_CREATE];
1221                 LASSERTF(h->th_opc == opc, "opcode mismatch %d != %d\n",
1222                          h->th_opc, opc);
1223         } else {
1224                 h = NULL; /* unsupported opc */
1225         }
1226         return h;
1227 }
1228
1229 static int out_tx_start(const struct lu_env *env, struct dt_device *dt,
1230                         struct thandle_exec_args *ta, struct obd_export *exp)
1231 {
1232         memset(ta, 0, sizeof(*ta));
1233         ta->ta_handle = dt_trans_create(env, dt);
1234         if (IS_ERR(ta->ta_handle)) {
1235                 int rc;
1236
1237                 rc = PTR_ERR(ta->ta_handle);
1238                 ta->ta_handle = NULL;
1239                 CERROR("%s: start handle error: rc = %d\n",
1240                        dt_obd_name(dt), rc);
1241                 return rc;
1242         }
1243         ta->ta_dev = dt;
1244         if (exp->exp_need_sync)
1245                 ta->ta_handle->th_sync = 1;
1246
1247         return 0;
1248 }
1249
1250 static int out_trans_start(const struct lu_env *env,
1251                            struct thandle_exec_args *ta)
1252 {
1253         return dt_trans_start(env, ta->ta_dev, ta->ta_handle);
1254 }
1255
1256 static int out_trans_stop(const struct lu_env *env,
1257                           struct thandle_exec_args *ta, int err)
1258 {
1259         int i;
1260         int rc;
1261
1262         ta->ta_handle->th_result = err;
1263         rc = dt_trans_stop(env, ta->ta_dev, ta->ta_handle);
1264         for (i = 0; i < ta->ta_argno; i++) {
1265                 if (ta->ta_args[i].object != NULL) {
1266                         struct dt_object *obj = ta->ta_args[i].object;
1267
1268                         /* If the object is being created during this
1269                          * transaction, we need to remove them from the
1270                          * cache immediately, because a few layers are
1271                          * missing in OUT handler, i.e. the object might
1272                          * not be initialized in all layers */
1273                         if (ta->ta_args[i].exec_fn == out_tx_create_exec)
1274                                 set_bit(LU_OBJECT_HEARD_BANSHEE,
1275                                         &obj->do_lu.lo_header->loh_flags);
1276                         lu_object_put(env, &ta->ta_args[i].object->do_lu);
1277                         ta->ta_args[i].object = NULL;
1278                 }
1279         }
1280
1281         return rc;
1282 }
1283
1284 int out_tx_end(const struct lu_env *env, struct thandle_exec_args *ta)
1285 {
1286         struct tgt_session_info *tsi = tgt_ses_info(env);
1287         int i = 0, rc;
1288
1289         LASSERT(ta->ta_dev);
1290         LASSERT(ta->ta_handle);
1291
1292         if (ta->ta_err != 0 || ta->ta_argno == 0)
1293                 GOTO(stop, rc = ta->ta_err);
1294
1295         rc = out_trans_start(env, ta);
1296         if (unlikely(rc))
1297                 GOTO(stop, rc);
1298
1299         for (i = 0; i < ta->ta_argno; i++) {
1300                 rc = ta->ta_args[i].exec_fn(env, ta->ta_handle,
1301                                             &ta->ta_args[i]);
1302                 if (unlikely(rc != 0)) {
1303                         CDEBUG(D_INFO, "error during execution of #%u from"
1304                                " %s:%d: rc = %d\n", i, ta->ta_args[i].file,
1305                                ta->ta_args[i].line, rc);
1306                         while (--i >= 0) {
1307                                 if (ta->ta_args[i].undo_fn != NULL)
1308                                         ta->ta_args[i].undo_fn(env,
1309                                                                ta->ta_handle,
1310                                                               &ta->ta_args[i]);
1311                                 else
1312                                         CERROR("%s: undo for %s:%d: rc = %d\n",
1313                                                dt_obd_name(ta->ta_dev),
1314                                                ta->ta_args[i].file,
1315                                                ta->ta_args[i].line, -ENOTSUPP);
1316                         }
1317                         break;
1318                 }
1319         }
1320
1321         /* Only fail for real update */
1322         tsi->tsi_reply_fail_id = OBD_FAIL_OUT_UPDATE_NET_REP;
1323 stop:
1324         CDEBUG(D_INFO, "%s: executed %u/%u: rc = %d\n",
1325                dt_obd_name(ta->ta_dev), i, ta->ta_argno, rc);
1326         out_trans_stop(env, ta, rc);
1327         ta->ta_handle = NULL;
1328         ta->ta_argno = 0;
1329         ta->ta_err = 0;
1330
1331         RETURN(rc);
1332 }
1333
1334 /**
1335  * Object updates between Targets. Because all the updates has been
1336  * dis-assemblied into object updates at sender side, so OUT will
1337  * call OSD API directly to execute these updates.
1338  *
1339  * In DNE phase I all of the updates in the request need to be executed
1340  * in one transaction, and the transaction has to be synchronously.
1341  *
1342  * Please refer to lustre/include/lustre/lustre_idl.h for req/reply
1343  * format.
1344  */
1345 int out_handle(struct tgt_session_info *tsi)
1346 {
1347         const struct lu_env             *env = tsi->tsi_env;
1348         struct tgt_thread_info          *tti = tgt_th_info(env);
1349         struct thandle_exec_args        *ta = &tti->tti_tea;
1350         struct req_capsule              *pill = tsi->tsi_pill;
1351         struct dt_device                *dt = tsi->tsi_tgt->lut_bottom;
1352         struct object_update_request    *ureq;
1353         struct object_update            *update;
1354         struct object_update_reply      *reply;
1355         int                              bufsize;
1356         int                              count;
1357         int                              old_batchid = -1;
1358         int                              i;
1359         int                              rc = 0;
1360         int                              rc1 = 0;
1361
1362         ENTRY;
1363
1364         req_capsule_set(pill, &RQF_OUT_UPDATE);
1365         ureq = req_capsule_client_get(pill, &RMF_OUT_UPDATE);
1366         if (ureq == NULL) {
1367                 CERROR("%s: No buf!: rc = %d\n", tgt_name(tsi->tsi_tgt),
1368                        -EPROTO);
1369                 RETURN(err_serious(-EPROTO));
1370         }
1371
1372         bufsize = req_capsule_get_size(pill, &RMF_OUT_UPDATE, RCL_CLIENT);
1373         if (bufsize != object_update_request_size(ureq)) {
1374                 CERROR("%s: invalid bufsize %d: rc = %d\n",
1375                        tgt_name(tsi->tsi_tgt), bufsize, -EPROTO);
1376                 RETURN(err_serious(-EPROTO));
1377         }
1378
1379         if (ureq->ourq_magic != UPDATE_REQUEST_MAGIC) {
1380                 CERROR("%s: invalid update buffer magic %x expect %x: "
1381                        "rc = %d\n", tgt_name(tsi->tsi_tgt), ureq->ourq_magic,
1382                        UPDATE_REQUEST_MAGIC, -EPROTO);
1383                 RETURN(err_serious(-EPROTO));
1384         }
1385
1386         count = ureq->ourq_count;
1387         if (count <= 0) {
1388                 CERROR("%s: empty update: rc = %d\n", tgt_name(tsi->tsi_tgt),
1389                        -EPROTO);
1390                 RETURN(err_serious(-EPROTO));
1391         }
1392
1393         req_capsule_set_size(pill, &RMF_OUT_UPDATE_REPLY, RCL_SERVER,
1394                              OUT_UPDATE_REPLY_SIZE);
1395         rc = req_capsule_server_pack(pill);
1396         if (rc != 0) {
1397                 CERROR("%s: Can't pack response: rc = %d\n",
1398                        tgt_name(tsi->tsi_tgt), rc);
1399                 RETURN(rc);
1400         }
1401
1402         /* Prepare the update reply buffer */
1403         reply = req_capsule_server_get(pill, &RMF_OUT_UPDATE_REPLY);
1404         if (reply == NULL)
1405                 RETURN(err_serious(-EPROTO));
1406         object_update_reply_init(reply, count);
1407         tti->tti_u.update.tti_update_reply = reply;
1408
1409         rc = out_tx_start(env, dt, ta, tsi->tsi_exp);
1410         if (rc != 0)
1411                 RETURN(rc);
1412
1413         tti->tti_mult_trans = !req_is_replay(tgt_ses_req(tsi));
1414
1415         /* Walk through updates in the request to execute them synchronously */
1416         for (i = 0; i < count; i++) {
1417                 struct tgt_handler      *h;
1418                 struct dt_object        *dt_obj;
1419
1420                 update = object_update_request_get(ureq, i, NULL);
1421                 if (update == NULL)
1422                         GOTO(out, rc = -EPROTO);
1423
1424                 if (ptlrpc_req_need_swab(pill->rc_req))
1425                         lustre_swab_object_update(update);
1426
1427                 if (old_batchid == -1) {
1428                         old_batchid = update->ou_batchid;
1429                 } else if (old_batchid != update->ou_batchid) {
1430                         /* Stop the current update transaction,
1431                          * create a new one */
1432                         rc = out_tx_end(env, ta);
1433                         if (rc < 0)
1434                                 RETURN(rc);
1435
1436                         rc = out_tx_start(env, dt, ta, tsi->tsi_exp);
1437                         if (rc != 0)
1438                                 RETURN(rc);
1439                         old_batchid = update->ou_batchid;
1440                 }
1441
1442                 if (!fid_is_sane(&update->ou_fid)) {
1443                         CERROR("%s: invalid FID "DFID": rc = %d\n",
1444                                tgt_name(tsi->tsi_tgt), PFID(&update->ou_fid),
1445                                -EPROTO);
1446                         GOTO(out, rc = err_serious(-EPROTO));
1447                 }
1448
1449                 dt_obj = dt_locate(env, dt, &update->ou_fid);
1450                 if (IS_ERR(dt_obj))
1451                         GOTO(out, rc = PTR_ERR(dt_obj));
1452
1453                 if (dt->dd_record_fid_accessed) {
1454                         lfsck_pack_rfa(&tti->tti_lr,
1455                                        lu_object_fid(&dt_obj->do_lu));
1456                         tgt_lfsck_in_notify(env, dt, &tti->tti_lr);
1457                 }
1458
1459                 tti->tti_u.update.tti_dt_object = dt_obj;
1460                 tti->tti_u.update.tti_update = update;
1461                 tti->tti_u.update.tti_update_reply_index = i;
1462
1463                 h = out_handler_find(update->ou_type);
1464                 if (likely(h != NULL)) {
1465                         /* For real modification RPC, check if the update
1466                          * has been executed */
1467                         if (h->th_flags & MUTABOR) {
1468                                 struct ptlrpc_request *req = tgt_ses_req(tsi);
1469
1470                                 if (out_check_resent(env, dt, dt_obj, req,
1471                                                      out_reconstruct, reply, i))
1472                                         GOTO(next, rc);
1473                         }
1474
1475                         rc = h->th_act(tsi);
1476                 } else {
1477                         CERROR("%s: The unsupported opc: 0x%x\n",
1478                                tgt_name(tsi->tsi_tgt), update->ou_type);
1479                         lu_object_put(env, &dt_obj->do_lu);
1480                         GOTO(out, rc = -ENOTSUPP);
1481                 }
1482 next:
1483                 lu_object_put(env, &dt_obj->do_lu);
1484                 if (rc < 0)
1485                         GOTO(out, rc);
1486         }
1487 out:
1488         rc1 = out_tx_end(env, ta);
1489         if (rc == 0)
1490                 rc = rc1;
1491         RETURN(rc);
1492 }
1493
1494 struct tgt_handler tgt_out_handlers[] = {
1495 TGT_UPDATE_HDL(MUTABOR, OUT_UPDATE,     out_handle),
1496 };
1497 EXPORT_SYMBOL(tgt_out_handlers);
1498