Whamcloud - gitweb
25aac41039b816424b39ec85431a3c9188c5752c
[fs/lustre-release.git] / lustre / target / out_handler.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  *
25  * lustre/target/out_handler.c
26  *
27  * Object update handler between targets.
28  *
29  * Author: di.wang <di.wang@intel.com>
30  */
31
32 #define DEBUG_SUBSYSTEM S_CLASS
33
34 #include <obd_class.h>
35 #include <md_object.h>
36 #include "tgt_internal.h"
37 #include <lustre_update.h>
38
39 static int tx_extend_args(struct thandle_exec_args *ta, int new_alloc_ta)
40 {
41         struct tx_arg   **new_ta;
42         int             i;
43         int             rc = 0;
44
45         if (ta->ta_alloc_args >= new_alloc_ta)
46                 return 0;
47
48         OBD_ALLOC(new_ta, sizeof(*new_ta) * new_alloc_ta);
49         if (new_ta == NULL)
50                 return -ENOMEM;
51
52         for (i = 0; i < new_alloc_ta; i++) {
53                 if (i < ta->ta_alloc_args) {
54                         /* copy the old args to new one */
55                         new_ta[i] = ta->ta_args[i];
56                 } else {
57                         OBD_ALLOC_PTR(new_ta[i]);
58                         if (new_ta[i] == NULL)
59                                 GOTO(out, rc = -ENOMEM);
60                 }
61         }
62
63         /* free the old args */
64         if (ta->ta_args != NULL)
65                 OBD_FREE(ta->ta_args, sizeof(ta->ta_args[0]) *
66                                       ta->ta_alloc_args);
67
68         ta->ta_args = new_ta;
69         ta->ta_alloc_args = new_alloc_ta;
70 out:
71         if (rc != 0) {
72                 for (i = 0; i < new_alloc_ta; i++) {
73                         if (new_ta[i] != NULL)
74                                 OBD_FREE_PTR(new_ta[i]);
75                 }
76                 OBD_FREE(new_ta, sizeof(*new_ta) * new_alloc_ta);
77         }
78         return rc;
79 }
80
81 #define TX_ALLOC_STEP   8
82 static struct tx_arg *tx_add_exec(struct thandle_exec_args *ta,
83                                   tx_exec_func_t func, tx_exec_func_t undo,
84                                   const char *file, int line)
85 {
86         int rc;
87         int i;
88
89         LASSERT(ta != NULL);
90         LASSERT(func != NULL);
91
92         if (ta->ta_argno + 1 >= ta->ta_alloc_args) {
93                 rc = tx_extend_args(ta, ta->ta_alloc_args + TX_ALLOC_STEP);
94                 if (rc != 0)
95                         return ERR_PTR(rc);
96         }
97
98         i = ta->ta_argno;
99
100         ta->ta_argno++;
101
102         ta->ta_args[i]->exec_fn = func;
103         ta->ta_args[i]->undo_fn = undo;
104         ta->ta_args[i]->file    = file;
105         ta->ta_args[i]->line    = line;
106
107         return ta->ta_args[i];
108 }
109
110 static void out_reconstruct(const struct lu_env *env, struct dt_device *dt,
111                             struct dt_object *obj,
112                             struct object_update_reply *reply,
113                             int index)
114 {
115         CDEBUG(D_INFO, "%s: fork reply reply %p index %d: rc = %d\n",
116                dt_obd_name(dt), reply, index, 0);
117
118         object_update_result_insert(reply, NULL, 0, index, 0);
119         return;
120 }
121
122 typedef void (*out_reconstruct_t)(const struct lu_env *env,
123                                   struct dt_device *dt,
124                                   struct dt_object *obj,
125                                   struct object_update_reply *reply,
126                                   int index);
127
128 static inline int out_check_resent(const struct lu_env *env,
129                                    struct dt_device *dt,
130                                    struct dt_object *obj,
131                                    struct ptlrpc_request *req,
132                                    out_reconstruct_t reconstruct,
133                                    struct object_update_reply *reply,
134                                    int index)
135 {
136         if (likely(!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)))
137                 return 0;
138
139         if (req_xid_is_last(req)) {
140                 reconstruct(env, dt, obj, reply, index);
141                 return 1;
142         }
143         DEBUG_REQ(D_HA, req, "no reply for RESENT req (have "LPD64")",
144                  req->rq_export->exp_target_data.ted_lcd->lcd_last_xid);
145         return 0;
146 }
147
148 static int out_obj_destroy(const struct lu_env *env, struct dt_object *dt_obj,
149                            struct thandle *th)
150 {
151         int rc;
152
153         CDEBUG(D_INFO, "%s: destroy "DFID"\n", dt_obd_name(th->th_dev),
154                PFID(lu_object_fid(&dt_obj->do_lu)));
155
156         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
157         rc = dt_destroy(env, dt_obj, th);
158         dt_write_unlock(env, dt_obj);
159
160         return rc;
161 }
162
163 /**
164  * All of the xxx_undo will be used once execution failed,
165  * But because all of the required resource has been reserved in
166  * declare phase, i.e. if declare succeed, it should make sure
167  * the following executing phase succeed in anyway, so these undo
168  * should be useless for most of the time in Phase I
169  */
170 int out_tx_create_undo(const struct lu_env *env, struct thandle *th,
171                        struct tx_arg *arg)
172 {
173         int rc;
174
175         rc = out_obj_destroy(env, arg->object, th);
176         if (rc != 0)
177                 CERROR("%s: undo failure, we are doomed!: rc = %d\n",
178                        dt_obd_name(th->th_dev), rc);
179         return rc;
180 }
181
182 int out_tx_create_exec(const struct lu_env *env, struct thandle *th,
183                        struct tx_arg *arg)
184 {
185         struct dt_object        *dt_obj = arg->object;
186         int                      rc;
187
188         CDEBUG(D_OTHER, "%s: create "DFID": dof %u, mode %o\n",
189                dt_obd_name(th->th_dev),
190                PFID(lu_object_fid(&arg->object->do_lu)),
191                arg->u.create.dof.dof_type,
192                arg->u.create.attr.la_mode & S_IFMT);
193
194         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
195         rc = dt_create(env, dt_obj, &arg->u.create.attr,
196                        &arg->u.create.hint, &arg->u.create.dof, th);
197
198         dt_write_unlock(env, dt_obj);
199
200         CDEBUG(D_INFO, "%s: insert create reply %p index %d: rc = %d\n",
201                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
202
203         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
204
205         return rc;
206 }
207
208 static int __out_tx_create(const struct lu_env *env, struct dt_object *obj,
209                            struct lu_attr *attr, struct lu_fid *parent_fid,
210                            struct dt_object_format *dof,
211                            struct thandle_exec_args *ta,
212                            struct object_update_reply *reply,
213                            int index, const char *file, int line)
214 {
215         struct tx_arg *arg;
216         int rc;
217
218         LASSERT(ta->ta_handle != NULL);
219         rc = dt_declare_create(env, obj, attr, NULL, dof,
220                                        ta->ta_handle);
221         if (rc != 0)
222                 return rc;
223
224         arg = tx_add_exec(ta, out_tx_create_exec, out_tx_create_undo, file,
225                           line);
226         if (IS_ERR(arg))
227                 return PTR_ERR(arg);
228
229         /* release the object in out_trans_stop */
230         lu_object_get(&obj->do_lu);
231         arg->object = obj;
232         arg->u.create.attr = *attr;
233         if (parent_fid != NULL)
234                 arg->u.create.fid = *parent_fid;
235         memset(&arg->u.create.hint, 0, sizeof(arg->u.create.hint));
236         arg->u.create.dof  = *dof;
237         arg->reply = reply;
238         arg->index = index;
239
240         return 0;
241 }
242
243 static int out_create(struct tgt_session_info *tsi)
244 {
245         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
246         struct object_update    *update = tti->tti_u.update.tti_update;
247         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
248         struct dt_object_format *dof = &tti->tti_u.update.tti_update_dof;
249         struct obdo             *lobdo = &tti->tti_u.update.tti_obdo;
250         struct lu_attr          *attr = &tti->tti_attr;
251         struct lu_fid           *fid = NULL;
252         struct obdo             *wobdo;
253         size_t                  size;
254         int                     rc;
255
256         ENTRY;
257
258         wobdo = object_update_param_get(update, 0, &size);
259         if (wobdo == NULL || size != sizeof(*wobdo)) {
260                 CERROR("%s: obdo is NULL, invalid RPC: rc = %d\n",
261                        tgt_name(tsi->tsi_tgt), -EPROTO);
262                 RETURN(err_serious(-EPROTO));
263         }
264
265         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
266                 lustre_swab_obdo(wobdo);
267         lustre_get_wire_obdo(NULL, lobdo, wobdo);
268         la_from_obdo(attr, lobdo, lobdo->o_valid);
269
270         dof->dof_type = dt_mode_to_dft(attr->la_mode);
271         if (update->ou_params_count > 1) {
272                 fid = object_update_param_get(update, 1, &size);
273                 if (fid == NULL || size != sizeof(*fid)) {
274                         CERROR("%s: invalid fid: rc = %d\n",
275                                tgt_name(tsi->tsi_tgt), -EPROTO);
276                         RETURN(err_serious(-EPROTO));
277                 }
278                 if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
279                         lustre_swab_lu_fid(fid);
280                 if (!fid_is_sane(fid)) {
281                         CERROR("%s: invalid fid "DFID": rc = %d\n",
282                                tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
283                         RETURN(err_serious(-EPROTO));
284                 }
285         }
286
287         if (lu_object_exists(&obj->do_lu))
288                 RETURN(-EEXIST);
289
290         rc = out_tx_create(tsi->tsi_env, obj, attr, fid, dof,
291                            &tti->tti_tea,
292                            tti->tti_u.update.tti_update_reply,
293                            tti->tti_u.update.tti_update_reply_index);
294
295         RETURN(rc);
296 }
297
298 static int out_tx_attr_set_undo(const struct lu_env *env,
299                                 struct thandle *th, struct tx_arg *arg)
300 {
301         CERROR("%s: attr set undo "DFID" unimplemented yet!: rc = %d\n",
302                dt_obd_name(th->th_dev),
303                PFID(lu_object_fid(&arg->object->do_lu)), -ENOTSUPP);
304
305         return -ENOTSUPP;
306 }
307
308 static int out_tx_attr_set_exec(const struct lu_env *env, struct thandle *th,
309                                 struct tx_arg *arg)
310 {
311         struct dt_object        *dt_obj = arg->object;
312         int                     rc;
313
314         CDEBUG(D_OTHER, "%s: attr set "DFID"\n", dt_obd_name(th->th_dev),
315                PFID(lu_object_fid(&dt_obj->do_lu)));
316
317         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
318         rc = dt_attr_set(env, dt_obj, &arg->u.attr_set.attr, th, NULL);
319         dt_write_unlock(env, dt_obj);
320
321         CDEBUG(D_INFO, "%s: insert attr_set reply %p index %d: rc = %d\n",
322                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
323
324         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
325
326         return rc;
327 }
328
329 static int __out_tx_attr_set(const struct lu_env *env,
330                              struct dt_object *dt_obj,
331                              const struct lu_attr *attr,
332                              struct thandle_exec_args *th,
333                              struct object_update_reply *reply,
334                              int index, const char *file, int line)
335 {
336         struct tx_arg   *arg;
337         int             rc;
338
339         LASSERT(th->ta_handle != NULL);
340         rc = dt_declare_attr_set(env, dt_obj, attr, th->ta_handle);
341         if (rc != 0)
342                 return rc;
343
344         arg = tx_add_exec(th, out_tx_attr_set_exec, out_tx_attr_set_undo,
345                           file, line);
346         if (IS_ERR(arg))
347                 return PTR_ERR(arg);
348
349         lu_object_get(&dt_obj->do_lu);
350         arg->object = dt_obj;
351         arg->u.attr_set.attr = *attr;
352         arg->reply = reply;
353         arg->index = index;
354         return 0;
355 }
356
357 static int out_attr_set(struct tgt_session_info *tsi)
358 {
359         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
360         struct object_update    *update = tti->tti_u.update.tti_update;
361         struct lu_attr          *attr = &tti->tti_attr;
362         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
363         struct obdo             *lobdo = &tti->tti_u.update.tti_obdo;
364         struct obdo             *wobdo;
365         size_t                   size;
366         int                      rc;
367
368         ENTRY;
369
370         wobdo = object_update_param_get(update, 0, &size);
371         if (wobdo == NULL || size != sizeof(*wobdo)) {
372                 CERROR("%s: empty obdo in the update: rc = %d\n",
373                        tgt_name(tsi->tsi_tgt), -EPROTO);
374                 RETURN(err_serious(-EPROTO));
375         }
376
377         attr->la_valid = 0;
378         attr->la_valid = 0;
379
380         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
381                 lustre_swab_obdo(wobdo);
382         lustre_get_wire_obdo(NULL, lobdo, wobdo);
383         la_from_obdo(attr, lobdo, lobdo->o_valid);
384
385         rc = out_tx_attr_set(tsi->tsi_env, obj, attr, &tti->tti_tea,
386                              tti->tti_u.update.tti_update_reply,
387                              tti->tti_u.update.tti_update_reply_index);
388
389         RETURN(rc);
390 }
391
392 static int out_attr_get(struct tgt_session_info *tsi)
393 {
394         const struct lu_env     *env = tsi->tsi_env;
395         struct tgt_thread_info  *tti = tgt_th_info(env);
396         struct obdo             *obdo = &tti->tti_u.update.tti_obdo;
397         struct lu_attr          *la = &tti->tti_attr;
398         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
399         int                     idx = tti->tti_u.update.tti_update_reply_index;
400         int                     rc;
401
402         ENTRY;
403
404         if (!lu_object_exists(&obj->do_lu)) {
405                 /* Usually, this will be called when the master MDT try
406                  * to init a remote object(see osp_object_init), so if
407                  * the object does not exist on slave, we need set BANSHEE flag,
408                  * so the object can be removed from the cache immediately */
409                 set_bit(LU_OBJECT_HEARD_BANSHEE,
410                         &obj->do_lu.lo_header->loh_flags);
411                 RETURN(-ENOENT);
412         }
413
414         dt_read_lock(env, obj, MOR_TGT_CHILD);
415         rc = dt_attr_get(env, obj, la, NULL);
416         if (rc)
417                 GOTO(out_unlock, rc);
418
419         obdo->o_valid = 0;
420         obdo_from_la(obdo, la, la->la_valid);
421         lustre_set_wire_obdo(NULL, obdo, obdo);
422
423 out_unlock:
424         dt_read_unlock(env, obj);
425
426         CDEBUG(D_INFO, "%s: insert attr get reply %p index %d: rc = %d\n",
427                tgt_name(tsi->tsi_tgt), tti->tti_u.update.tti_update_reply,
428                0, rc);
429
430         object_update_result_insert(tti->tti_u.update.tti_update_reply, obdo,
431                                     sizeof(*obdo), idx, rc);
432
433         RETURN(rc);
434 }
435
436 static int out_xattr_get(struct tgt_session_info *tsi)
437 {
438         const struct lu_env        *env = tsi->tsi_env;
439         struct tgt_thread_info     *tti = tgt_th_info(env);
440         struct object_update       *update = tti->tti_u.update.tti_update;
441         struct lu_buf              *lbuf = &tti->tti_buf;
442         struct object_update_reply *reply = tti->tti_u.update.tti_update_reply;
443         struct dt_object           *obj = tti->tti_u.update.tti_dt_object;
444         char                       *name;
445         struct object_update_result *update_result;
446         int                     idx = tti->tti_u.update.tti_update_reply_index;
447         int                        rc;
448
449         ENTRY;
450
451         if (!lu_object_exists(&obj->do_lu)) {
452                 set_bit(LU_OBJECT_HEARD_BANSHEE,
453                         &obj->do_lu.lo_header->loh_flags);
454                 RETURN(-ENOENT);
455         }
456
457         name = object_update_param_get(update, 0, NULL);
458         if (name == NULL) {
459                 CERROR("%s: empty name for xattr get: rc = %d\n",
460                        tgt_name(tsi->tsi_tgt), -EPROTO);
461                 RETURN(err_serious(-EPROTO));
462         }
463
464         update_result = object_update_result_get(reply, 0, NULL);
465         if (update_result == NULL) {
466                 CERROR("%s: empty name for xattr get: rc = %d\n",
467                        tgt_name(tsi->tsi_tgt), -EPROTO);
468                 RETURN(err_serious(-EPROTO));
469         }
470
471         lbuf->lb_buf = update_result->our_data;
472         lbuf->lb_len = OUT_UPDATE_REPLY_SIZE -
473                        cfs_size_round((unsigned long)update_result->our_data -
474                                       (unsigned long)update_result);
475         dt_read_lock(env, obj, MOR_TGT_CHILD);
476         rc = dt_xattr_get(env, obj, lbuf, name, NULL);
477         dt_read_unlock(env, obj);
478         if (rc < 0) {
479                 lbuf->lb_len = 0;
480                 GOTO(out, rc);
481         }
482         if (rc == 0) {
483                 lbuf->lb_len = 0;
484                 GOTO(out, rc = -ENOENT);
485         }
486         lbuf->lb_len = rc;
487         rc = 0;
488         CDEBUG(D_INFO, "%s: "DFID" get xattr %s len %d\n",
489                tgt_name(tsi->tsi_tgt), PFID(lu_object_fid(&obj->do_lu)),
490                name, (int)lbuf->lb_len);
491
492         GOTO(out, rc);
493
494 out:
495         object_update_result_insert(reply, lbuf->lb_buf, lbuf->lb_len, idx, rc);
496         RETURN(rc);
497 }
498
499 static int out_index_lookup(struct tgt_session_info *tsi)
500 {
501         const struct lu_env     *env = tsi->tsi_env;
502         struct tgt_thread_info  *tti = tgt_th_info(env);
503         struct object_update    *update = tti->tti_u.update.tti_update;
504         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
505         char                    *name;
506         int                      rc;
507
508         ENTRY;
509
510         if (!lu_object_exists(&obj->do_lu))
511                 RETURN(-ENOENT);
512
513         name = object_update_param_get(update, 0, NULL);
514         if (name == NULL) {
515                 CERROR("%s: empty name for lookup: rc = %d\n",
516                        tgt_name(tsi->tsi_tgt), -EPROTO);
517                 RETURN(err_serious(-EPROTO));
518         }
519
520         dt_read_lock(env, obj, MOR_TGT_CHILD);
521         if (!dt_try_as_dir(env, obj))
522                 GOTO(out_unlock, rc = -ENOTDIR);
523
524         rc = dt_lookup(env, obj, (struct dt_rec *)&tti->tti_fid1,
525                 (struct dt_key *)name, NULL);
526
527         if (rc < 0)
528                 GOTO(out_unlock, rc);
529
530         if (rc == 0)
531                 rc += 1;
532
533 out_unlock:
534         dt_read_unlock(env, obj);
535
536         CDEBUG(D_INFO, "lookup "DFID" %s get "DFID" rc %d\n",
537                PFID(lu_object_fid(&obj->do_lu)), name,
538                PFID(&tti->tti_fid1), rc);
539
540         CDEBUG(D_INFO, "%s: insert lookup reply %p index %d: rc = %d\n",
541                tgt_name(tsi->tsi_tgt), tti->tti_u.update.tti_update_reply,
542                0, rc);
543
544         object_update_result_insert(tti->tti_u.update.tti_update_reply,
545                             &tti->tti_fid1, sizeof(tti->tti_fid1),
546                             tti->tti_u.update.tti_update_reply_index, rc);
547         RETURN(rc);
548 }
549
550 static int out_tx_xattr_set_exec(const struct lu_env *env,
551                                  struct thandle *th,
552                                  struct tx_arg *arg)
553 {
554         struct dt_object *dt_obj = arg->object;
555         int rc;
556
557         CDEBUG(D_INFO, "%s: set xattr buf %p name %s flag %d\n",
558                dt_obd_name(th->th_dev), arg->u.xattr_set.buf.lb_buf,
559                arg->u.xattr_set.name, arg->u.xattr_set.flags);
560
561         if (!lu_object_exists(&dt_obj->do_lu))
562                 GOTO(out, rc = -ENOENT);
563
564         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
565         rc = dt_xattr_set(env, dt_obj, &arg->u.xattr_set.buf,
566                           arg->u.xattr_set.name, arg->u.xattr_set.flags,
567                           th, NULL);
568         dt_write_unlock(env, dt_obj);
569         /**
570          * Ignore errors if this is LINK EA
571          **/
572         if (unlikely(rc && !strcmp(arg->u.xattr_set.name, XATTR_NAME_LINK)))
573                 rc = 0;
574 out:
575         CDEBUG(D_INFO, "%s: insert xattr set reply %p index %d: rc = %d\n",
576                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
577
578         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
579
580         return rc;
581 }
582
583 static int __out_tx_xattr_set(const struct lu_env *env,
584                               struct dt_object *dt_obj,
585                               const struct lu_buf *buf,
586                               const char *name, int flags,
587                               struct thandle_exec_args *ta,
588                               struct object_update_reply *reply,
589                               int index, const char *file, int line)
590 {
591         struct tx_arg   *arg;
592         int             rc;
593
594         LASSERT(ta->ta_handle != NULL);
595         rc = dt_declare_xattr_set(env, dt_obj, buf, name, flags, ta->ta_handle);
596         if (rc != 0)
597                 return rc;
598
599         arg = tx_add_exec(ta, out_tx_xattr_set_exec, NULL, file, line);
600         if (IS_ERR(arg))
601                 return PTR_ERR(arg);
602
603         lu_object_get(&dt_obj->do_lu);
604         arg->object = dt_obj;
605         arg->u.xattr_set.name = name;
606         arg->u.xattr_set.flags = flags;
607         arg->u.xattr_set.buf = *buf;
608         arg->reply = reply;
609         arg->index = index;
610         arg->u.xattr_set.csum = 0;
611         return 0;
612 }
613
614 static int out_xattr_set(struct tgt_session_info *tsi)
615 {
616         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
617         struct object_update    *update = tti->tti_u.update.tti_update;
618         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
619         struct lu_buf           *lbuf = &tti->tti_buf;
620         char                    *name;
621         char                    *buf;
622         __u32                   *tmp;
623         size_t                   buf_len = 0;
624         int                      flag;
625         size_t                   size = 0;
626         int                      rc;
627         ENTRY;
628
629         name = object_update_param_get(update, 0, NULL);
630         if (name == NULL) {
631                 CERROR("%s: empty name for xattr set: rc = %d\n",
632                        tgt_name(tsi->tsi_tgt), -EPROTO);
633                 RETURN(err_serious(-EPROTO));
634         }
635
636         buf = object_update_param_get(update, 1, &buf_len);
637         if (buf == NULL || buf_len == 0) {
638                 CERROR("%s: empty buf for xattr set: rc = %d\n",
639                        tgt_name(tsi->tsi_tgt), -EPROTO);
640                 RETURN(err_serious(-EPROTO));
641         }
642
643         lbuf->lb_buf = buf;
644         lbuf->lb_len = buf_len;
645
646         tmp = object_update_param_get(update, 2, &size);
647         if (tmp == NULL || size != sizeof(*tmp)) {
648                 CERROR("%s: emptry or wrong size %zd flag: rc = %d\n",
649                        tgt_name(tsi->tsi_tgt), size, -EPROTO);
650                 RETURN(err_serious(-EPROTO));
651         }
652
653         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
654                 __swab32s(tmp);
655         flag = *tmp;
656
657         rc = out_tx_xattr_set(tsi->tsi_env, obj, lbuf, name, flag,
658                               &tti->tti_tea,
659                               tti->tti_u.update.tti_update_reply,
660                               tti->tti_u.update.tti_update_reply_index);
661         RETURN(rc);
662 }
663
664 static int out_tx_xattr_del_exec(const struct lu_env *env, struct thandle *th,
665                                  struct tx_arg *arg)
666 {
667         struct dt_object *dt_obj = arg->object;
668         int rc;
669
670         CDEBUG(D_INFO, "%s: del xattr name '%s' on "DFID"\n",
671                dt_obd_name(th->th_dev), arg->u.xattr_set.name,
672                PFID(lu_object_fid(&dt_obj->do_lu)));
673
674         if (!lu_object_exists(&dt_obj->do_lu))
675                 GOTO(out, rc = -ENOENT);
676
677         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
678         rc = dt_xattr_del(env, dt_obj, arg->u.xattr_set.name,
679                           th, NULL);
680         dt_write_unlock(env, dt_obj);
681 out:
682         CDEBUG(D_INFO, "%s: insert xattr del reply %p index %d: rc = %d\n",
683                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
684
685         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
686
687         return rc;
688 }
689
690 static int __out_tx_xattr_del(const struct lu_env *env,
691                               struct dt_object *dt_obj, const char *name,
692                               struct thandle_exec_args *ta,
693                               struct object_update_reply *reply,
694                               int index, const char *file, int line)
695 {
696         struct tx_arg   *arg;
697         int             rc;
698
699         rc = dt_declare_xattr_del(env, dt_obj, name, ta->ta_handle);
700         if (rc != 0)
701                 return rc;
702
703         arg = tx_add_exec(ta, out_tx_xattr_del_exec, NULL, file, line);
704         if (IS_ERR(arg))
705                 return PTR_ERR(arg);
706
707         lu_object_get(&dt_obj->do_lu);
708         arg->object = dt_obj;
709         arg->u.xattr_set.name = name;
710         arg->reply = reply;
711         arg->index = index;
712         return 0;
713 }
714
715 static int out_xattr_del(struct tgt_session_info *tsi)
716 {
717         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
718         struct object_update    *update = tti->tti_u.update.tti_update;
719         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
720         char                    *name;
721         int                      rc;
722         ENTRY;
723
724         name = object_update_param_get(update, 0, NULL);
725         if (name == NULL) {
726                 CERROR("%s: empty name for xattr set: rc = %d\n",
727                        tgt_name(tsi->tsi_tgt), -EPROTO);
728                 RETURN(err_serious(-EPROTO));
729         }
730
731         rc = out_tx_xattr_del(tsi->tsi_env, obj, name, &tti->tti_tea,
732                               tti->tti_u.update.tti_update_reply,
733                               tti->tti_u.update.tti_update_reply_index);
734         RETURN(rc);
735 }
736
737 static int out_obj_ref_add(const struct lu_env *env,
738                            struct dt_object *dt_obj,
739                            struct thandle *th)
740 {
741         int rc;
742
743         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
744         rc = dt_ref_add(env, dt_obj, th);
745         dt_write_unlock(env, dt_obj);
746
747         return rc;
748 }
749
750 static int out_obj_ref_del(const struct lu_env *env,
751                            struct dt_object *dt_obj,
752                            struct thandle *th)
753 {
754         int rc;
755
756         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
757         rc = dt_ref_del(env, dt_obj, th);
758         dt_write_unlock(env, dt_obj);
759
760         return rc;
761 }
762
763 static int out_tx_ref_add_exec(const struct lu_env *env, struct thandle *th,
764                                struct tx_arg *arg)
765 {
766         struct dt_object *dt_obj = arg->object;
767         int rc;
768
769         rc = out_obj_ref_add(env, dt_obj, th);
770
771         CDEBUG(D_INFO, "%s: insert ref_add reply %p index %d: rc = %d\n",
772                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
773
774         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
775         return rc;
776 }
777
778 static int out_tx_ref_add_undo(const struct lu_env *env, struct thandle *th,
779                                struct tx_arg *arg)
780 {
781         return out_obj_ref_del(env, arg->object, th);
782 }
783
784 static int __out_tx_ref_add(const struct lu_env *env,
785                             struct dt_object *dt_obj,
786                             struct thandle_exec_args *ta,
787                             struct object_update_reply *reply,
788                             int index, const char *file, int line)
789 {
790         struct tx_arg   *arg;
791         int             rc;
792
793         LASSERT(ta->ta_handle != NULL);
794         rc = dt_declare_ref_add(env, dt_obj, ta->ta_handle);
795         if (rc != 0)
796                 return rc;
797
798         arg = tx_add_exec(ta, out_tx_ref_add_exec, out_tx_ref_add_undo, file,
799                           line);
800         if (IS_ERR(arg))
801                 return PTR_ERR(arg);
802
803         lu_object_get(&dt_obj->do_lu);
804         arg->object = dt_obj;
805         arg->reply = reply;
806         arg->index = index;
807         return 0;
808 }
809
810 /**
811  * increase ref of the object
812  **/
813 static int out_ref_add(struct tgt_session_info *tsi)
814 {
815         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
816         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
817         int                      rc;
818
819         ENTRY;
820
821         rc = out_tx_ref_add(tsi->tsi_env, obj, &tti->tti_tea,
822                             tti->tti_u.update.tti_update_reply,
823                             tti->tti_u.update.tti_update_reply_index);
824         RETURN(rc);
825 }
826
827 static int out_tx_ref_del_exec(const struct lu_env *env, struct thandle *th,
828                                struct tx_arg *arg)
829 {
830         struct dt_object        *dt_obj = arg->object;
831         int                      rc;
832
833         rc = out_obj_ref_del(env, dt_obj, th);
834
835         CDEBUG(D_INFO, "%s: insert ref_del reply %p index %d: rc = %d\n",
836                dt_obd_name(th->th_dev), arg->reply, arg->index, 0);
837
838         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
839
840         return rc;
841 }
842
843 static int out_tx_ref_del_undo(const struct lu_env *env, struct thandle *th,
844                                struct tx_arg *arg)
845 {
846         return out_obj_ref_add(env, arg->object, th);
847 }
848
849 static int __out_tx_ref_del(const struct lu_env *env,
850                             struct dt_object *dt_obj,
851                             struct thandle_exec_args *ta,
852                             struct object_update_reply *reply,
853                             int index, const char *file, int line)
854 {
855         struct tx_arg   *arg;
856         int             rc;
857
858         LASSERT(ta->ta_handle != NULL);
859         rc = dt_declare_ref_del(env, dt_obj, ta->ta_handle);
860         if (rc != 0)
861                 return rc;
862
863         arg = tx_add_exec(ta, out_tx_ref_del_exec, out_tx_ref_del_undo, file,
864                           line);
865         if (IS_ERR(arg))
866                 return PTR_ERR(arg);
867
868         lu_object_get(&dt_obj->do_lu);
869         arg->object = dt_obj;
870         arg->reply = reply;
871         arg->index = index;
872         return 0;
873 }
874
875 static int out_ref_del(struct tgt_session_info *tsi)
876 {
877         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
878         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
879         int                      rc;
880
881         ENTRY;
882
883         if (!lu_object_exists(&obj->do_lu))
884                 RETURN(-ENOENT);
885
886         rc = out_tx_ref_del(tsi->tsi_env, obj, &tti->tti_tea,
887                             tti->tti_u.update.tti_update_reply,
888                             tti->tti_u.update.tti_update_reply_index);
889         RETURN(rc);
890 }
891
892 static int out_obj_index_insert(const struct lu_env *env,
893                                 struct dt_object *dt_obj,
894                                 const struct dt_rec *rec,
895                                 const struct dt_key *key,
896                                 struct thandle *th)
897 {
898         int rc;
899
900         CDEBUG(D_INFO, "%s: index insert "DFID" name: %s fid "DFID", type %u\n",
901                dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
902                (char *)key, PFID(((struct dt_insert_rec *)rec)->rec_fid),
903                ((struct dt_insert_rec *)rec)->rec_type);
904
905         if (dt_try_as_dir(env, dt_obj) == 0)
906                 return -ENOTDIR;
907
908         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
909         rc = dt_insert(env, dt_obj, rec, key, th, NULL, 0);
910         dt_write_unlock(env, dt_obj);
911
912         return rc;
913 }
914
915 static int out_obj_index_delete(const struct lu_env *env,
916                                 struct dt_object *dt_obj,
917                                 const struct dt_key *key,
918                                 struct thandle *th)
919 {
920         int rc;
921
922         CDEBUG(D_INFO, "%s: index delete "DFID" name: %s\n",
923                dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
924                (char *)key);
925
926         if (dt_try_as_dir(env, dt_obj) == 0)
927                 return -ENOTDIR;
928
929         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
930         rc = dt_delete(env, dt_obj, key, th, NULL);
931         dt_write_unlock(env, dt_obj);
932
933         return rc;
934 }
935
936 static int out_tx_index_insert_exec(const struct lu_env *env,
937                                     struct thandle *th, struct tx_arg *arg)
938 {
939         struct dt_object *dt_obj = arg->object;
940         int rc;
941
942         if (unlikely(!dt_object_exists(dt_obj)))
943                 RETURN(-ESTALE);
944
945         rc = out_obj_index_insert(env, dt_obj,
946                                   (const struct dt_rec *)&arg->u.insert.rec,
947                                   arg->u.insert.key, th);
948
949         CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
950                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
951
952         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
953
954         return rc;
955 }
956
957 static int out_tx_index_insert_undo(const struct lu_env *env,
958                                     struct thandle *th, struct tx_arg *arg)
959 {
960         return out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
961 }
962
963 static int __out_tx_index_insert(const struct lu_env *env,
964                                  struct dt_object *dt_obj,
965                                  const struct dt_rec *rec,
966                                  const struct dt_key *key,
967                                  struct thandle_exec_args *ta,
968                                  struct object_update_reply *reply,
969                                  int index, const char *file, int line)
970 {
971         struct tx_arg   *arg;
972         int             rc;
973
974         LASSERT(ta->ta_handle != NULL);
975         if (dt_try_as_dir(env, dt_obj) == 0) {
976                 rc = -ENOTDIR;
977                 return rc;
978         }
979
980         rc = dt_declare_insert(env, dt_obj, rec, key, ta->ta_handle);
981         if (rc != 0)
982                 return rc;
983
984         arg = tx_add_exec(ta, out_tx_index_insert_exec,
985                           out_tx_index_insert_undo, file, line);
986         if (IS_ERR(arg))
987                 return PTR_ERR(arg);
988
989         lu_object_get(&dt_obj->do_lu);
990         arg->object = dt_obj;
991         arg->reply = reply;
992         arg->index = index;
993         arg->u.insert.rec = *(const struct dt_insert_rec *)rec;
994         arg->u.insert.key = key;
995
996         return 0;
997 }
998
999 static int out_index_insert(struct tgt_session_info *tsi)
1000 {
1001         struct tgt_thread_info  *tti    = tgt_th_info(tsi->tsi_env);
1002         struct object_update    *update = tti->tti_u.update.tti_update;
1003         struct dt_object        *obj    = tti->tti_u.update.tti_dt_object;
1004         struct dt_insert_rec    *rec    = &tti->tti_rec;
1005         struct lu_fid           *fid;
1006         char                    *name;
1007         __u32                   *ptype;
1008         int                      rc     = 0;
1009         size_t                   size;
1010         ENTRY;
1011
1012         name = object_update_param_get(update, 0, NULL);
1013         if (name == NULL) {
1014                 CERROR("%s: empty name for index insert: rc = %d\n",
1015                        tgt_name(tsi->tsi_tgt), -EPROTO);
1016                 RETURN(err_serious(-EPROTO));
1017         }
1018
1019         fid = object_update_param_get(update, 1, &size);
1020         if (fid == NULL || size != sizeof(*fid)) {
1021                 CERROR("%s: invalid fid: rc = %d\n",
1022                        tgt_name(tsi->tsi_tgt), -EPROTO);
1023                        RETURN(err_serious(-EPROTO));
1024         }
1025
1026         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
1027                 lustre_swab_lu_fid(fid);
1028
1029         if (!fid_is_sane(fid)) {
1030                 CERROR("%s: invalid FID "DFID": rc = %d\n",
1031                        tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
1032                 RETURN(err_serious(-EPROTO));
1033         }
1034
1035         ptype = object_update_param_get(update, 2, &size);
1036         if (ptype == NULL || size != sizeof(*ptype)) {
1037                 CERROR("%s: invalid type for index insert: rc = %d\n",
1038                        tgt_name(tsi->tsi_tgt), -EPROTO);
1039                 RETURN(err_serious(-EPROTO));
1040         }
1041
1042         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
1043                 __swab32s(ptype);
1044
1045         rec->rec_fid = fid;
1046         rec->rec_type = *ptype;
1047
1048         rc = out_tx_index_insert(tsi->tsi_env, obj, (const struct dt_rec *)rec,
1049                                  (const struct dt_key *)name, &tti->tti_tea,
1050                                  tti->tti_u.update.tti_update_reply,
1051                                  tti->tti_u.update.tti_update_reply_index);
1052         RETURN(rc);
1053 }
1054
1055 static int out_tx_index_delete_exec(const struct lu_env *env,
1056                                     struct thandle *th,
1057                                     struct tx_arg *arg)
1058 {
1059         int rc;
1060
1061         rc = out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
1062
1063         CDEBUG(D_INFO, "%s: delete idx insert reply %p index %d: rc = %d\n",
1064                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
1065
1066         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
1067
1068         return rc;
1069 }
1070
1071 static int out_tx_index_delete_undo(const struct lu_env *env,
1072                                     struct thandle *th,
1073                                     struct tx_arg *arg)
1074 {
1075         CERROR("%s: Oops, can not rollback index_delete yet: rc = %d\n",
1076                dt_obd_name(th->th_dev), -ENOTSUPP);
1077         return -ENOTSUPP;
1078 }
1079
1080 static int __out_tx_index_delete(const struct lu_env *env,
1081                                  struct dt_object *dt_obj,
1082                                  const struct dt_key *key,
1083                                  struct thandle_exec_args *ta,
1084                                  struct object_update_reply *reply,
1085                                  int index, const char *file, int line)
1086 {
1087         struct tx_arg   *arg;
1088         int             rc;
1089
1090         if (dt_try_as_dir(env, dt_obj) == 0) {
1091                 rc = -ENOTDIR;
1092                 return rc;
1093         }
1094
1095         LASSERT(ta->ta_handle != NULL);
1096         rc = dt_declare_delete(env, dt_obj, key, ta->ta_handle);
1097         if (rc != 0)
1098                 return rc;
1099
1100         arg = tx_add_exec(ta, out_tx_index_delete_exec,
1101                           out_tx_index_delete_undo, file, line);
1102         if (IS_ERR(arg))
1103                 return PTR_ERR(arg);
1104
1105         lu_object_get(&dt_obj->do_lu);
1106         arg->object = dt_obj;
1107         arg->reply = reply;
1108         arg->index = index;
1109         arg->u.insert.key = key;
1110         return 0;
1111 }
1112
1113 static int out_index_delete(struct tgt_session_info *tsi)
1114 {
1115         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
1116         struct object_update    *update = tti->tti_u.update.tti_update;
1117         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
1118         char                    *name;
1119         int                      rc = 0;
1120
1121         if (!lu_object_exists(&obj->do_lu))
1122                 RETURN(-ENOENT);
1123
1124         name = object_update_param_get(update, 0, NULL);
1125         if (name == NULL) {
1126                 CERROR("%s: empty name for index delete: rc = %d\n",
1127                        tgt_name(tsi->tsi_tgt), -EPROTO);
1128                 RETURN(err_serious(-EPROTO));
1129         }
1130
1131         rc = out_tx_index_delete(tsi->tsi_env, obj, (const struct dt_key *)name,
1132                                  &tti->tti_tea,
1133                                  tti->tti_u.update.tti_update_reply,
1134                                  tti->tti_u.update.tti_update_reply_index);
1135         RETURN(rc);
1136 }
1137
1138 static int out_tx_destroy_exec(const struct lu_env *env, struct thandle *th,
1139                                struct tx_arg *arg)
1140 {
1141         struct dt_object *dt_obj = arg->object;
1142         int rc;
1143
1144         rc = out_obj_destroy(env, dt_obj, th);
1145
1146         CDEBUG(D_INFO, "%s: insert destroy reply %p index %d: rc = %d\n",
1147                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
1148
1149         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
1150
1151         RETURN(rc);
1152 }
1153
1154 static int out_tx_destroy_undo(const struct lu_env *env, struct thandle *th,
1155                                struct tx_arg *arg)
1156 {
1157         CERROR("%s: not support destroy undo yet!: rc = %d\n",
1158                dt_obd_name(th->th_dev), -ENOTSUPP);
1159         return -ENOTSUPP;
1160 }
1161
1162 static int __out_tx_destroy(const struct lu_env *env, struct dt_object *dt_obj,
1163                              struct thandle_exec_args *ta,
1164                              struct object_update_reply *reply,
1165                              int index, const char *file, int line)
1166 {
1167         struct tx_arg   *arg;
1168         int             rc;
1169
1170         LASSERT(ta->ta_handle != NULL);
1171         rc = dt_declare_destroy(env, dt_obj, ta->ta_handle);
1172         if (rc != 0)
1173                 return rc;
1174
1175         arg = tx_add_exec(ta, out_tx_destroy_exec, out_tx_destroy_undo,
1176                           file, line);
1177         if (IS_ERR(arg))
1178                 return PTR_ERR(arg);
1179
1180         lu_object_get(&dt_obj->do_lu);
1181         arg->object = dt_obj;
1182         arg->reply = reply;
1183         arg->index = index;
1184         return 0;
1185 }
1186
1187 static int out_destroy(struct tgt_session_info *tsi)
1188 {
1189         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
1190         struct object_update    *update = tti->tti_u.update.tti_update;
1191         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
1192         struct lu_fid           *fid;
1193         int                      rc;
1194         ENTRY;
1195
1196         fid = &update->ou_fid;
1197         if (!fid_is_sane(fid)) {
1198                 CERROR("%s: invalid FID "DFID": rc = %d\n",
1199                        tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
1200                 RETURN(err_serious(-EPROTO));
1201         }
1202
1203         if (!lu_object_exists(&obj->do_lu))
1204                 RETURN(-ENOENT);
1205
1206         rc = out_tx_destroy(tsi->tsi_env, obj, &tti->tti_tea,
1207                             tti->tti_u.update.tti_update_reply,
1208                             tti->tti_u.update.tti_update_reply_index);
1209
1210         RETURN(rc);
1211 }
1212
1213 static int out_tx_write_exec(const struct lu_env *env, struct thandle *th,
1214                              struct tx_arg *arg)
1215 {
1216         struct dt_object *dt_obj = arg->object;
1217         int rc;
1218
1219         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
1220         rc = dt_record_write(env, dt_obj, &arg->u.write.buf,
1221                              &arg->u.write.pos, th);
1222         dt_write_unlock(env, dt_obj);
1223
1224         if (rc == 0)
1225                 rc = arg->u.write.buf.lb_len;
1226
1227         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
1228
1229         return rc > 0 ? 0 : rc;
1230 }
1231
1232 static int __out_tx_write(const struct lu_env *env,
1233                           struct dt_object *dt_obj,
1234                           const struct lu_buf *buf,
1235                           loff_t pos, struct thandle_exec_args *ta,
1236                           struct object_update_reply *reply,
1237                           int index, const char *file, int line)
1238 {
1239         struct tx_arg   *arg;
1240         int             rc;
1241
1242         LASSERT(ta->ta_handle != NULL);
1243         rc = dt_declare_record_write(env, dt_obj, buf, pos, ta->ta_handle);
1244         if (rc != 0)
1245                 return rc;
1246
1247         arg = tx_add_exec(ta, out_tx_write_exec, NULL, file, line);
1248         if (IS_ERR(arg))
1249                 return PTR_ERR(arg);
1250
1251         lu_object_get(&dt_obj->do_lu);
1252         arg->object = dt_obj;
1253         arg->u.write.buf = *buf;
1254         arg->u.write.pos = pos;
1255         arg->reply = reply;
1256         arg->index = index;
1257         return 0;
1258 }
1259
1260 static int out_write(struct tgt_session_info *tsi)
1261 {
1262         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
1263         struct object_update    *update = tti->tti_u.update.tti_update;
1264         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
1265         struct lu_buf           *lbuf = &tti->tti_buf;
1266         char                    *buf;
1267         __u64                   *tmp;
1268         size_t                  size = 0;
1269         size_t                  buf_len = 0;
1270         loff_t                  pos;
1271         int                      rc;
1272         ENTRY;
1273
1274         buf = object_update_param_get(update, 0, &buf_len);
1275         if (buf == NULL || buf_len == 0) {
1276                 CERROR("%s: empty buf for xattr set: rc = %d\n",
1277                        tgt_name(tsi->tsi_tgt), -EPROTO);
1278                 RETURN(err_serious(-EPROTO));
1279         }
1280         lbuf->lb_buf = buf;
1281         lbuf->lb_len = buf_len;
1282
1283         tmp = object_update_param_get(update, 1, &size);
1284         if (tmp == NULL || size != sizeof(*tmp)) {
1285                 CERROR("%s: empty or wrong size %zd pos: rc = %d\n",
1286                        tgt_name(tsi->tsi_tgt), size, -EPROTO);
1287                 RETURN(err_serious(-EPROTO));
1288         }
1289
1290         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
1291                 __swab64s(tmp);
1292         pos = *tmp;
1293
1294         rc = out_tx_write(tsi->tsi_env, obj, lbuf, pos,
1295                           &tti->tti_tea,
1296                           tti->tti_u.update.tti_update_reply,
1297                           tti->tti_u.update.tti_update_reply_index);
1298         RETURN(rc);
1299 }
1300
1301 #define DEF_OUT_HNDL(opc, name, flags, fn)     \
1302 [opc - OUT_CREATE] = {                                  \
1303         .th_name    = name,                             \
1304         .th_fail_id = 0,                                \
1305         .th_opc     = opc,                              \
1306         .th_flags   = flags,                            \
1307         .th_act     = fn,                               \
1308         .th_fmt     = NULL,                             \
1309         .th_version = 0,                                \
1310 }
1311
1312 static struct tgt_handler out_update_ops[] = {
1313         DEF_OUT_HNDL(OUT_CREATE, "out_create", MUTABOR | HABEO_REFERO,
1314                      out_create),
1315         DEF_OUT_HNDL(OUT_DESTROY, "out_create", MUTABOR | HABEO_REFERO,
1316                      out_destroy),
1317         DEF_OUT_HNDL(OUT_REF_ADD, "out_ref_add", MUTABOR | HABEO_REFERO,
1318                      out_ref_add),
1319         DEF_OUT_HNDL(OUT_REF_DEL, "out_ref_del", MUTABOR | HABEO_REFERO,
1320                      out_ref_del),
1321         DEF_OUT_HNDL(OUT_ATTR_SET, "out_attr_set",  MUTABOR | HABEO_REFERO,
1322                      out_attr_set),
1323         DEF_OUT_HNDL(OUT_ATTR_GET, "out_attr_get",  HABEO_REFERO,
1324                      out_attr_get),
1325         DEF_OUT_HNDL(OUT_XATTR_SET, "out_xattr_set", MUTABOR | HABEO_REFERO,
1326                      out_xattr_set),
1327         DEF_OUT_HNDL(OUT_XATTR_DEL, "out_xattr_del", MUTABOR | HABEO_REFERO,
1328                      out_xattr_del),
1329         DEF_OUT_HNDL(OUT_XATTR_GET, "out_xattr_get", HABEO_REFERO,
1330                      out_xattr_get),
1331         DEF_OUT_HNDL(OUT_INDEX_LOOKUP, "out_index_lookup", HABEO_REFERO,
1332                      out_index_lookup),
1333         DEF_OUT_HNDL(OUT_INDEX_INSERT, "out_index_insert",
1334                      MUTABOR | HABEO_REFERO, out_index_insert),
1335         DEF_OUT_HNDL(OUT_INDEX_DELETE, "out_index_delete",
1336                      MUTABOR | HABEO_REFERO, out_index_delete),
1337         DEF_OUT_HNDL(OUT_WRITE, "out_write", MUTABOR | HABEO_REFERO, out_write),
1338 };
1339
1340 struct tgt_handler *out_handler_find(__u32 opc)
1341 {
1342         struct tgt_handler *h;
1343
1344         h = NULL;
1345         if (OUT_CREATE <= opc && opc < OUT_LAST) {
1346                 h = &out_update_ops[opc - OUT_CREATE];
1347                 LASSERTF(h->th_opc == opc, "opcode mismatch %d != %d\n",
1348                          h->th_opc, opc);
1349         } else {
1350                 h = NULL; /* unsupported opc */
1351         }
1352         return h;
1353 }
1354
1355 static int out_tx_start(const struct lu_env *env, struct dt_device *dt,
1356                         struct thandle_exec_args *ta, struct obd_export *exp)
1357 {
1358         ta->ta_argno = 0;
1359         ta->ta_handle = dt_trans_create(env, dt);
1360         if (IS_ERR(ta->ta_handle)) {
1361                 int rc;
1362
1363                 rc = PTR_ERR(ta->ta_handle);
1364                 ta->ta_handle = NULL;
1365                 CERROR("%s: start handle error: rc = %d\n", dt_obd_name(dt),
1366                        rc);
1367                 return rc;
1368         }
1369         if (exp->exp_need_sync)
1370                 ta->ta_handle->th_sync = 1;
1371
1372         return 0;
1373 }
1374
1375 static int out_trans_start(const struct lu_env *env,
1376                            struct thandle_exec_args *ta)
1377 {
1378         return dt_trans_start(env, ta->ta_handle->th_dev, ta->ta_handle);
1379 }
1380
1381 static int out_trans_stop(const struct lu_env *env,
1382                           struct thandle_exec_args *ta, int err)
1383 {
1384         int i;
1385         int rc;
1386
1387         ta->ta_handle->th_result = err;
1388         rc = dt_trans_stop(env, ta->ta_handle->th_dev, ta->ta_handle);
1389         for (i = 0; i < ta->ta_argno; i++) {
1390                 if (ta->ta_args[i]->object != NULL) {
1391                         struct dt_object *obj = ta->ta_args[i]->object;
1392
1393                         /* If the object is being created during this
1394                          * transaction, we need to remove them from the
1395                          * cache immediately, because a few layers are
1396                          * missing in OUT handler, i.e. the object might
1397                          * not be initialized in all layers */
1398                         if (ta->ta_args[i]->exec_fn == out_tx_create_exec)
1399                                 set_bit(LU_OBJECT_HEARD_BANSHEE,
1400                                         &obj->do_lu.lo_header->loh_flags);
1401                         lu_object_put(env, &ta->ta_args[i]->object->do_lu);
1402                         ta->ta_args[i]->object = NULL;
1403                 }
1404         }
1405         ta->ta_handle = NULL;
1406         ta->ta_argno = 0;
1407
1408         return rc;
1409 }
1410
1411 int out_tx_end(const struct lu_env *env, struct thandle_exec_args *ta,
1412                int declare_ret)
1413 {
1414         struct tgt_session_info *tsi = tgt_ses_info(env);
1415         int                     i;
1416         int                     rc;
1417         int                     rc1;
1418         ENTRY;
1419
1420         if (ta->ta_handle == NULL)
1421                 RETURN(0);
1422
1423         if (declare_ret != 0 || ta->ta_argno == 0)
1424                 GOTO(stop, rc = declare_ret);
1425
1426         LASSERT(ta->ta_handle->th_dev != NULL);
1427         rc = out_trans_start(env, ta);
1428         if (unlikely(rc != 0))
1429                 GOTO(stop, rc);
1430
1431         for (i = 0; i < ta->ta_argno; i++) {
1432                 rc = ta->ta_args[i]->exec_fn(env, ta->ta_handle,
1433                                              ta->ta_args[i]);
1434                 if (unlikely(rc != 0)) {
1435                         CDEBUG(D_INFO, "error during execution of #%u from"
1436                                " %s:%d: rc = %d\n", i, ta->ta_args[i]->file,
1437                                ta->ta_args[i]->line, rc);
1438                         while (--i >= 0) {
1439                                 if (ta->ta_args[i]->undo_fn != NULL)
1440                                         ta->ta_args[i]->undo_fn(env,
1441                                                                ta->ta_handle,
1442                                                                ta->ta_args[i]);
1443                                 else
1444                                         CERROR("%s: undo for %s:%d: rc = %d\n",
1445                                              dt_obd_name(ta->ta_handle->th_dev),
1446                                                ta->ta_args[i]->file,
1447                                                ta->ta_args[i]->line, -ENOTSUPP);
1448                         }
1449                         break;
1450                 }
1451                 CDEBUG(D_INFO, "%s: executed %u/%u: rc = %d\n",
1452                        dt_obd_name(ta->ta_handle->th_dev), i, ta->ta_argno, rc);
1453         }
1454
1455         /* Only fail for real update */
1456         tsi->tsi_reply_fail_id = OBD_FAIL_OUT_UPDATE_NET_REP;
1457 stop:
1458         rc1 = out_trans_stop(env, ta, rc);
1459         if (rc == 0)
1460                 rc = rc1;
1461
1462         ta->ta_handle = NULL;
1463         ta->ta_argno = 0;
1464
1465         RETURN(rc);
1466 }
1467
1468 /**
1469  * Object updates between Targets. Because all the updates has been
1470  * dis-assemblied into object updates at sender side, so OUT will
1471  * call OSD API directly to execute these updates.
1472  *
1473  * In DNE phase I all of the updates in the request need to be executed
1474  * in one transaction, and the transaction has to be synchronously.
1475  *
1476  * Please refer to lustre/include/lustre/lustre_idl.h for req/reply
1477  * format.
1478  */
1479 int out_handle(struct tgt_session_info *tsi)
1480 {
1481         const struct lu_env             *env = tsi->tsi_env;
1482         struct tgt_thread_info          *tti = tgt_th_info(env);
1483         struct thandle_exec_args        *ta = &tti->tti_tea;
1484         struct req_capsule              *pill = tsi->tsi_pill;
1485         struct dt_device                *dt = tsi->tsi_tgt->lut_bottom;
1486         struct object_update_request    *ureq;
1487         struct object_update            *update;
1488         struct object_update_reply      *reply;
1489         int                              bufsize;
1490         int                              count;
1491         int                              current_batchid = -1;
1492         int                              i;
1493         int                              rc = 0;
1494         int                              rc1 = 0;
1495
1496         ENTRY;
1497
1498         req_capsule_set(pill, &RQF_OUT_UPDATE);
1499         ureq = req_capsule_client_get(pill, &RMF_OUT_UPDATE);
1500         if (ureq == NULL) {
1501                 CERROR("%s: No buf!: rc = %d\n", tgt_name(tsi->tsi_tgt),
1502                        -EPROTO);
1503                 RETURN(err_serious(-EPROTO));
1504         }
1505
1506         bufsize = req_capsule_get_size(pill, &RMF_OUT_UPDATE, RCL_CLIENT);
1507         if (bufsize != object_update_request_size(ureq)) {
1508                 CERROR("%s: invalid bufsize %d: rc = %d\n",
1509                        tgt_name(tsi->tsi_tgt), bufsize, -EPROTO);
1510                 RETURN(err_serious(-EPROTO));
1511         }
1512
1513         if (ureq->ourq_magic != UPDATE_REQUEST_MAGIC) {
1514                 CERROR("%s: invalid update buffer magic %x expect %x: "
1515                        "rc = %d\n", tgt_name(tsi->tsi_tgt), ureq->ourq_magic,
1516                        UPDATE_REQUEST_MAGIC, -EPROTO);
1517                 RETURN(err_serious(-EPROTO));
1518         }
1519
1520         count = ureq->ourq_count;
1521         if (count <= 0) {
1522                 CERROR("%s: empty update: rc = %d\n", tgt_name(tsi->tsi_tgt),
1523                        -EPROTO);
1524                 RETURN(err_serious(-EPROTO));
1525         }
1526
1527         req_capsule_set_size(pill, &RMF_OUT_UPDATE_REPLY, RCL_SERVER,
1528                              OUT_UPDATE_REPLY_SIZE);
1529         rc = req_capsule_server_pack(pill);
1530         if (rc != 0) {
1531                 CERROR("%s: Can't pack response: rc = %d\n",
1532                        tgt_name(tsi->tsi_tgt), rc);
1533                 RETURN(rc);
1534         }
1535
1536         /* Prepare the update reply buffer */
1537         reply = req_capsule_server_get(pill, &RMF_OUT_UPDATE_REPLY);
1538         if (reply == NULL)
1539                 RETURN(err_serious(-EPROTO));
1540         object_update_reply_init(reply, count);
1541         tti->tti_u.update.tti_update_reply = reply;
1542         tti->tti_mult_trans = !req_is_replay(tgt_ses_req(tsi));
1543
1544         /* Walk through updates in the request to execute them synchronously */
1545         for (i = 0; i < count; i++) {
1546                 struct tgt_handler      *h;
1547                 struct dt_object        *dt_obj;
1548
1549                 update = object_update_request_get(ureq, i, NULL);
1550                 if (update == NULL)
1551                         GOTO(out, rc = -EPROTO);
1552
1553                 if (ptlrpc_req_need_swab(pill->rc_req))
1554                         lustre_swab_object_update(update);
1555
1556                 if (!fid_is_sane(&update->ou_fid)) {
1557                         CERROR("%s: invalid FID "DFID": rc = %d\n",
1558                                tgt_name(tsi->tsi_tgt), PFID(&update->ou_fid),
1559                                -EPROTO);
1560                         GOTO(out, rc = err_serious(-EPROTO));
1561                 }
1562
1563                 dt_obj = dt_locate(env, dt, &update->ou_fid);
1564                 if (IS_ERR(dt_obj))
1565                         GOTO(out, rc = PTR_ERR(dt_obj));
1566
1567                 if (dt->dd_record_fid_accessed) {
1568                         lfsck_pack_rfa(&tti->tti_lr,
1569                                        lu_object_fid(&dt_obj->do_lu));
1570                         tgt_lfsck_in_notify(env, dt, &tti->tti_lr);
1571                 }
1572
1573                 tti->tti_u.update.tti_dt_object = dt_obj;
1574                 tti->tti_u.update.tti_update = update;
1575                 tti->tti_u.update.tti_update_reply_index = i;
1576
1577                 h = out_handler_find(update->ou_type);
1578                 if (unlikely(h == NULL)) {
1579                         CERROR("%s: unsupported opc: 0x%x\n",
1580                                tgt_name(tsi->tsi_tgt), update->ou_type);
1581                         GOTO(next, rc = -ENOTSUPP);
1582                 }
1583
1584                 /* Check resend case only for modifying RPC */
1585                 if (h->th_flags & MUTABOR) {
1586                         struct ptlrpc_request *req = tgt_ses_req(tsi);
1587
1588                         if (out_check_resent(env, dt, dt_obj, req,
1589                                              out_reconstruct, reply, i))
1590                                 GOTO(next, rc = 0);
1591                 }
1592
1593                 /* start transaction for modification RPC only */
1594                 if (h->th_flags & MUTABOR && current_batchid == -1) {
1595                         current_batchid = update->ou_batchid;
1596                         rc = out_tx_start(env, dt, ta, tsi->tsi_exp);
1597                         if (rc != 0)
1598                                 GOTO(next, rc);
1599                 }
1600
1601                 /* Stop the current update transaction, if the update has
1602                  * different batchid, or read-only update */
1603                 if (((current_batchid != update->ou_batchid) ||
1604                      !(h->th_flags & MUTABOR)) && ta->ta_handle != NULL) {
1605                         rc = out_tx_end(env, ta, rc);
1606                         current_batchid = -1;
1607                         if (rc != 0)
1608                                 GOTO(next, rc);
1609
1610                         /* start a new transaction if needed */
1611                         if (h->th_flags & MUTABOR) {
1612                                 rc = out_tx_start(env, dt, ta, tsi->tsi_exp);
1613                                 if (rc != 0)
1614                                         GOTO(next, rc);
1615
1616                                 current_batchid = update->ou_batchid;
1617                         }
1618                 }
1619
1620                 rc = h->th_act(tsi);
1621 next:
1622                 lu_object_put(env, &dt_obj->do_lu);
1623                 if (rc < 0)
1624                         GOTO(out, rc);
1625         }
1626 out:
1627         if (current_batchid != -1) {
1628                 rc1 = out_tx_end(env, ta, rc);
1629                 if (rc == 0)
1630                         rc = rc1;
1631         }
1632
1633         RETURN(rc);
1634 }
1635
1636 struct tgt_handler tgt_out_handlers[] = {
1637 TGT_UPDATE_HDL(MUTABOR, OUT_UPDATE,     out_handle),
1638 };
1639 EXPORT_SYMBOL(tgt_out_handlers);
1640