Whamcloud - gitweb
LU-5417 lustre: fix comparison between signed and unsigned
[fs/lustre-release.git] / lustre / target / out_handler.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  *
25  * lustre/target/out_handler.c
26  *
27  * Object update handler between targets.
28  *
29  * Author: di.wang <di.wang@intel.com>
30  */
31
32 #define DEBUG_SUBSYSTEM S_CLASS
33
34 #include <obd_class.h>
35 #include <md_object.h>
36 #include "tgt_internal.h"
37 #include <lustre_update.h>
38
39 static int tx_extend_args(struct thandle_exec_args *ta, int new_alloc_ta)
40 {
41         struct tx_arg   **new_ta;
42         int             i;
43         int             rc = 0;
44
45         if (ta->ta_alloc_args >= new_alloc_ta)
46                 return 0;
47
48         OBD_ALLOC(new_ta, sizeof(*new_ta) * new_alloc_ta);
49         if (new_ta == NULL)
50                 return -ENOMEM;
51
52         for (i = 0; i < new_alloc_ta; i++) {
53                 if (i < ta->ta_alloc_args) {
54                         /* copy the old args to new one */
55                         new_ta[i] = ta->ta_args[i];
56                 } else {
57                         OBD_ALLOC_PTR(new_ta[i]);
58                         if (new_ta[i] == NULL)
59                                 GOTO(out, rc = -ENOMEM);
60                 }
61         }
62
63         /* free the old args */
64         if (ta->ta_args != NULL)
65                 OBD_FREE(ta->ta_args, sizeof(ta->ta_args[0]) *
66                                       ta->ta_alloc_args);
67
68         ta->ta_args = new_ta;
69         ta->ta_alloc_args = new_alloc_ta;
70 out:
71         if (rc != 0) {
72                 for (i = 0; i < new_alloc_ta; i++) {
73                         if (new_ta[i] != NULL)
74                                 OBD_FREE_PTR(new_ta[i]);
75                 }
76                 OBD_FREE(new_ta, sizeof(*new_ta) * new_alloc_ta);
77         }
78         return rc;
79 }
80
81 #define TX_ALLOC_STEP   8
82 static struct tx_arg *tx_add_exec(struct thandle_exec_args *ta,
83                                   tx_exec_func_t func, tx_exec_func_t undo,
84                                   const char *file, int line)
85 {
86         int rc;
87         int i;
88
89         LASSERT(ta != NULL);
90         LASSERT(func != NULL);
91
92         if (ta->ta_argno + 1 >= ta->ta_alloc_args) {
93                 rc = tx_extend_args(ta, ta->ta_alloc_args + TX_ALLOC_STEP);
94                 if (rc != 0)
95                         return ERR_PTR(rc);
96         }
97
98         i = ta->ta_argno;
99
100         ta->ta_argno++;
101
102         ta->ta_args[i]->exec_fn = func;
103         ta->ta_args[i]->undo_fn = undo;
104         ta->ta_args[i]->file    = file;
105         ta->ta_args[i]->line    = line;
106
107         return ta->ta_args[i];
108 }
109
110 static void out_reconstruct(const struct lu_env *env, struct dt_device *dt,
111                             struct dt_object *obj,
112                             struct object_update_reply *reply,
113                             int index)
114 {
115         CDEBUG(D_INFO, "%s: fork reply reply %p index %d: rc = %d\n",
116                dt_obd_name(dt), reply, index, 0);
117
118         object_update_result_insert(reply, NULL, 0, index, 0);
119         return;
120 }
121
122 typedef void (*out_reconstruct_t)(const struct lu_env *env,
123                                   struct dt_device *dt,
124                                   struct dt_object *obj,
125                                   struct object_update_reply *reply,
126                                   int index);
127
128 static inline int out_check_resent(const struct lu_env *env,
129                                    struct dt_device *dt,
130                                    struct dt_object *obj,
131                                    struct ptlrpc_request *req,
132                                    out_reconstruct_t reconstruct,
133                                    struct object_update_reply *reply,
134                                    int index)
135 {
136         if (likely(!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)))
137                 return 0;
138
139         if (req_xid_is_last(req)) {
140                 reconstruct(env, dt, obj, reply, index);
141                 return 1;
142         }
143         DEBUG_REQ(D_HA, req, "no reply for RESENT req (have "LPD64")",
144                  req->rq_export->exp_target_data.ted_lcd->lcd_last_xid);
145         return 0;
146 }
147
148 static int out_obj_destroy(const struct lu_env *env, struct dt_object *dt_obj,
149                            struct thandle *th)
150 {
151         int rc;
152
153         CDEBUG(D_INFO, "%s: destroy "DFID"\n", dt_obd_name(th->th_dev),
154                PFID(lu_object_fid(&dt_obj->do_lu)));
155
156         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
157         rc = dt_destroy(env, dt_obj, th);
158         dt_write_unlock(env, dt_obj);
159
160         return rc;
161 }
162
163 /**
164  * All of the xxx_undo will be used once execution failed,
165  * But because all of the required resource has been reserved in
166  * declare phase, i.e. if declare succeed, it should make sure
167  * the following executing phase succeed in anyway, so these undo
168  * should be useless for most of the time in Phase I
169  */
170 int out_tx_create_undo(const struct lu_env *env, struct thandle *th,
171                        struct tx_arg *arg)
172 {
173         int rc;
174
175         rc = out_obj_destroy(env, arg->object, th);
176         if (rc != 0)
177                 CERROR("%s: undo failure, we are doomed!: rc = %d\n",
178                        dt_obd_name(th->th_dev), rc);
179         return rc;
180 }
181
182 int out_tx_create_exec(const struct lu_env *env, struct thandle *th,
183                        struct tx_arg *arg)
184 {
185         struct dt_object        *dt_obj = arg->object;
186         int                      rc;
187
188         CDEBUG(D_OTHER, "%s: create "DFID": dof %u, mode %o\n",
189                dt_obd_name(th->th_dev),
190                PFID(lu_object_fid(&arg->object->do_lu)),
191                arg->u.create.dof.dof_type,
192                arg->u.create.attr.la_mode & S_IFMT);
193
194         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
195         rc = dt_create(env, dt_obj, &arg->u.create.attr,
196                        &arg->u.create.hint, &arg->u.create.dof, th);
197
198         dt_write_unlock(env, dt_obj);
199
200         CDEBUG(D_INFO, "%s: insert create reply %p index %d: rc = %d\n",
201                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
202
203         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
204
205         return rc;
206 }
207
208 static int __out_tx_create(const struct lu_env *env, struct dt_object *obj,
209                            struct lu_attr *attr, struct lu_fid *parent_fid,
210                            struct dt_object_format *dof,
211                            struct thandle_exec_args *ta,
212                            struct object_update_reply *reply,
213                            int index, const char *file, int line)
214 {
215         struct tx_arg *arg;
216         int rc;
217
218         LASSERT(ta->ta_handle != NULL);
219         rc = dt_declare_create(env, obj, attr, NULL, dof,
220                                        ta->ta_handle);
221         if (rc != 0)
222                 return rc;
223
224         arg = tx_add_exec(ta, out_tx_create_exec, out_tx_create_undo, file,
225                           line);
226         if (IS_ERR(arg))
227                 return PTR_ERR(arg);
228
229         /* release the object in out_trans_stop */
230         lu_object_get(&obj->do_lu);
231         arg->object = obj;
232         arg->u.create.attr = *attr;
233         if (parent_fid != NULL)
234                 arg->u.create.fid = *parent_fid;
235         memset(&arg->u.create.hint, 0, sizeof(arg->u.create.hint));
236         arg->u.create.dof  = *dof;
237         arg->reply = reply;
238         arg->index = index;
239
240         return 0;
241 }
242
243 static int out_create(struct tgt_session_info *tsi)
244 {
245         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
246         struct object_update    *update = tti->tti_u.update.tti_update;
247         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
248         struct dt_object_format *dof = &tti->tti_u.update.tti_update_dof;
249         struct obdo             *lobdo = &tti->tti_u.update.tti_obdo;
250         struct lu_attr          *attr = &tti->tti_attr;
251         struct lu_fid           *fid = NULL;
252         struct obdo             *wobdo;
253         size_t                  size;
254         int                     rc;
255
256         ENTRY;
257
258         wobdo = object_update_param_get(update, 0, &size);
259         if (wobdo == NULL || size != sizeof(*wobdo)) {
260                 CERROR("%s: obdo is NULL, invalid RPC: rc = %d\n",
261                        tgt_name(tsi->tsi_tgt), -EPROTO);
262                 RETURN(err_serious(-EPROTO));
263         }
264
265         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
266                 lustre_swab_obdo(wobdo);
267         lustre_get_wire_obdo(NULL, lobdo, wobdo);
268         la_from_obdo(attr, lobdo, lobdo->o_valid);
269
270         dof->dof_type = dt_mode_to_dft(attr->la_mode);
271         if (update->ou_params_count > 1) {
272                 fid = object_update_param_get(update, 1, &size);
273                 if (fid == NULL || size != sizeof(*fid)) {
274                         CERROR("%s: invalid fid: rc = %d\n",
275                                tgt_name(tsi->tsi_tgt), -EPROTO);
276                         RETURN(err_serious(-EPROTO));
277                 }
278                 if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
279                         lustre_swab_lu_fid(fid);
280                 if (!fid_is_sane(fid)) {
281                         CERROR("%s: invalid fid "DFID": rc = %d\n",
282                                tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
283                         RETURN(err_serious(-EPROTO));
284                 }
285         }
286
287         if (lu_object_exists(&obj->do_lu))
288                 RETURN(-EEXIST);
289
290         rc = out_tx_create(tsi->tsi_env, obj, attr, fid, dof,
291                            &tti->tti_tea,
292                            tti->tti_u.update.tti_update_reply,
293                            tti->tti_u.update.tti_update_reply_index);
294
295         RETURN(rc);
296 }
297
298 static int out_tx_attr_set_undo(const struct lu_env *env,
299                                 struct thandle *th, struct tx_arg *arg)
300 {
301         CERROR("%s: attr set undo "DFID" unimplemented yet!: rc = %d\n",
302                dt_obd_name(th->th_dev),
303                PFID(lu_object_fid(&arg->object->do_lu)), -ENOTSUPP);
304
305         return -ENOTSUPP;
306 }
307
308 static int out_tx_attr_set_exec(const struct lu_env *env, struct thandle *th,
309                                 struct tx_arg *arg)
310 {
311         struct dt_object        *dt_obj = arg->object;
312         int                     rc;
313
314         CDEBUG(D_OTHER, "%s: attr set "DFID"\n", dt_obd_name(th->th_dev),
315                PFID(lu_object_fid(&dt_obj->do_lu)));
316
317         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
318         rc = dt_attr_set(env, dt_obj, &arg->u.attr_set.attr, th, NULL);
319         dt_write_unlock(env, dt_obj);
320
321         CDEBUG(D_INFO, "%s: insert attr_set reply %p index %d: rc = %d\n",
322                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
323
324         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
325
326         return rc;
327 }
328
329 static int __out_tx_attr_set(const struct lu_env *env,
330                              struct dt_object *dt_obj,
331                              const struct lu_attr *attr,
332                              struct thandle_exec_args *th,
333                              struct object_update_reply *reply,
334                              int index, const char *file, int line)
335 {
336         struct tx_arg   *arg;
337         int             rc;
338
339         LASSERT(th->ta_handle != NULL);
340         rc = dt_declare_attr_set(env, dt_obj, attr, th->ta_handle);
341         if (rc != 0)
342                 return rc;
343
344         arg = tx_add_exec(th, out_tx_attr_set_exec, out_tx_attr_set_undo,
345                           file, line);
346         if (IS_ERR(arg))
347                 return PTR_ERR(arg);
348
349         lu_object_get(&dt_obj->do_lu);
350         arg->object = dt_obj;
351         arg->u.attr_set.attr = *attr;
352         arg->reply = reply;
353         arg->index = index;
354         return 0;
355 }
356
357 static int out_attr_set(struct tgt_session_info *tsi)
358 {
359         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
360         struct object_update    *update = tti->tti_u.update.tti_update;
361         struct lu_attr          *attr = &tti->tti_attr;
362         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
363         struct obdo             *lobdo = &tti->tti_u.update.tti_obdo;
364         struct obdo             *wobdo;
365         size_t                   size;
366         int                      rc;
367
368         ENTRY;
369
370         wobdo = object_update_param_get(update, 0, &size);
371         if (wobdo == NULL || size != sizeof(*wobdo)) {
372                 CERROR("%s: empty obdo in the update: rc = %d\n",
373                        tgt_name(tsi->tsi_tgt), -EPROTO);
374                 RETURN(err_serious(-EPROTO));
375         }
376
377         attr->la_valid = 0;
378         attr->la_valid = 0;
379
380         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
381                 lustre_swab_obdo(wobdo);
382         lustre_get_wire_obdo(NULL, lobdo, wobdo);
383         la_from_obdo(attr, lobdo, lobdo->o_valid);
384
385         rc = out_tx_attr_set(tsi->tsi_env, obj, attr, &tti->tti_tea,
386                              tti->tti_u.update.tti_update_reply,
387                              tti->tti_u.update.tti_update_reply_index);
388
389         RETURN(rc);
390 }
391
392 static int out_attr_get(struct tgt_session_info *tsi)
393 {
394         const struct lu_env     *env = tsi->tsi_env;
395         struct tgt_thread_info  *tti = tgt_th_info(env);
396         struct obdo             *obdo = &tti->tti_u.update.tti_obdo;
397         struct lu_attr          *la = &tti->tti_attr;
398         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
399         int                     idx = tti->tti_u.update.tti_update_reply_index;
400         int                     rc;
401
402         ENTRY;
403
404         if (!lu_object_exists(&obj->do_lu)) {
405                 /* Usually, this will be called when the master MDT try
406                  * to init a remote object(see osp_object_init), so if
407                  * the object does not exist on slave, we need set BANSHEE flag,
408                  * so the object can be removed from the cache immediately */
409                 set_bit(LU_OBJECT_HEARD_BANSHEE,
410                         &obj->do_lu.lo_header->loh_flags);
411                 RETURN(-ENOENT);
412         }
413
414         dt_read_lock(env, obj, MOR_TGT_CHILD);
415         rc = dt_attr_get(env, obj, la, NULL);
416         if (rc)
417                 GOTO(out_unlock, rc);
418
419         obdo->o_valid = 0;
420         obdo_from_la(obdo, la, la->la_valid);
421         lustre_set_wire_obdo(NULL, obdo, obdo);
422
423 out_unlock:
424         dt_read_unlock(env, obj);
425
426         CDEBUG(D_INFO, "%s: insert attr get reply %p index %d: rc = %d\n",
427                tgt_name(tsi->tsi_tgt), tti->tti_u.update.tti_update_reply,
428                0, rc);
429
430         object_update_result_insert(tti->tti_u.update.tti_update_reply, obdo,
431                                     sizeof(*obdo), idx, rc);
432
433         RETURN(rc);
434 }
435
436 static int out_xattr_get(struct tgt_session_info *tsi)
437 {
438         const struct lu_env        *env = tsi->tsi_env;
439         struct tgt_thread_info     *tti = tgt_th_info(env);
440         struct object_update       *update = tti->tti_u.update.tti_update;
441         struct lu_buf              *lbuf = &tti->tti_buf;
442         struct object_update_reply *reply = tti->tti_u.update.tti_update_reply;
443         struct dt_object           *obj = tti->tti_u.update.tti_dt_object;
444         char                       *name;
445         struct object_update_result *update_result;
446         int                     idx = tti->tti_u.update.tti_update_reply_index;
447         int                        rc;
448
449         ENTRY;
450
451         if (!lu_object_exists(&obj->do_lu)) {
452                 set_bit(LU_OBJECT_HEARD_BANSHEE,
453                         &obj->do_lu.lo_header->loh_flags);
454                 RETURN(-ENOENT);
455         }
456
457         name = object_update_param_get(update, 0, NULL);
458         if (name == NULL) {
459                 CERROR("%s: empty name for xattr get: rc = %d\n",
460                        tgt_name(tsi->tsi_tgt), -EPROTO);
461                 RETURN(err_serious(-EPROTO));
462         }
463
464         update_result = object_update_result_get(reply, 0, NULL);
465         if (update_result == NULL) {
466                 CERROR("%s: empty name for xattr get: rc = %d\n",
467                        tgt_name(tsi->tsi_tgt), -EPROTO);
468                 RETURN(err_serious(-EPROTO));
469         }
470
471         lbuf->lb_buf = update_result->our_data;
472         lbuf->lb_len = OUT_UPDATE_REPLY_SIZE -
473                        cfs_size_round((unsigned long)update_result->our_data -
474                                       (unsigned long)update_result);
475         dt_read_lock(env, obj, MOR_TGT_CHILD);
476         rc = dt_xattr_get(env, obj, lbuf, name, NULL);
477         dt_read_unlock(env, obj);
478         if (rc < 0) {
479                 lbuf->lb_len = 0;
480                 GOTO(out, rc);
481         }
482         if (rc == 0) {
483                 lbuf->lb_len = 0;
484                 GOTO(out, rc = -ENOENT);
485         }
486         lbuf->lb_len = rc;
487         rc = 0;
488         CDEBUG(D_INFO, "%s: "DFID" get xattr %s len %d\n",
489                tgt_name(tsi->tsi_tgt), PFID(lu_object_fid(&obj->do_lu)),
490                name, (int)lbuf->lb_len);
491
492         GOTO(out, rc);
493
494 out:
495         object_update_result_insert(reply, lbuf->lb_buf, lbuf->lb_len, idx, rc);
496         RETURN(rc);
497 }
498
499 static int out_index_lookup(struct tgt_session_info *tsi)
500 {
501         const struct lu_env     *env = tsi->tsi_env;
502         struct tgt_thread_info  *tti = tgt_th_info(env);
503         struct object_update    *update = tti->tti_u.update.tti_update;
504         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
505         char                    *name;
506         int                      rc;
507
508         ENTRY;
509
510         if (!lu_object_exists(&obj->do_lu))
511                 RETURN(-ENOENT);
512
513         name = object_update_param_get(update, 0, NULL);
514         if (name == NULL) {
515                 CERROR("%s: empty name for lookup: rc = %d\n",
516                        tgt_name(tsi->tsi_tgt), -EPROTO);
517                 RETURN(err_serious(-EPROTO));
518         }
519
520         dt_read_lock(env, obj, MOR_TGT_CHILD);
521         if (!dt_try_as_dir(env, obj))
522                 GOTO(out_unlock, rc = -ENOTDIR);
523
524         rc = dt_lookup(env, obj, (struct dt_rec *)&tti->tti_fid1,
525                 (struct dt_key *)name, NULL);
526
527         if (rc < 0)
528                 GOTO(out_unlock, rc);
529
530         if (rc == 0)
531                 rc += 1;
532
533 out_unlock:
534         dt_read_unlock(env, obj);
535
536         CDEBUG(D_INFO, "lookup "DFID" %s get "DFID" rc %d\n",
537                PFID(lu_object_fid(&obj->do_lu)), name,
538                PFID(&tti->tti_fid1), rc);
539
540         CDEBUG(D_INFO, "%s: insert lookup reply %p index %d: rc = %d\n",
541                tgt_name(tsi->tsi_tgt), tti->tti_u.update.tti_update_reply,
542                0, rc);
543
544         object_update_result_insert(tti->tti_u.update.tti_update_reply,
545                             &tti->tti_fid1, sizeof(tti->tti_fid1),
546                             tti->tti_u.update.tti_update_reply_index, rc);
547         RETURN(rc);
548 }
549
550 static int out_tx_xattr_set_exec(const struct lu_env *env,
551                                  struct thandle *th,
552                                  struct tx_arg *arg)
553 {
554         struct dt_object *dt_obj = arg->object;
555         int rc;
556
557         CDEBUG(D_INFO, "%s: set xattr buf %p name %s flag %d\n",
558                dt_obd_name(th->th_dev), arg->u.xattr_set.buf.lb_buf,
559                arg->u.xattr_set.name, arg->u.xattr_set.flags);
560
561         if (!lu_object_exists(&dt_obj->do_lu))
562                 GOTO(out, rc = -ENOENT);
563
564         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
565         rc = dt_xattr_set(env, dt_obj, &arg->u.xattr_set.buf,
566                           arg->u.xattr_set.name, arg->u.xattr_set.flags,
567                           th, NULL);
568         dt_write_unlock(env, dt_obj);
569         /**
570          * Ignore errors if this is LINK EA
571          **/
572         if (unlikely(rc && !strcmp(arg->u.xattr_set.name, XATTR_NAME_LINK)))
573                 rc = 0;
574 out:
575         CDEBUG(D_INFO, "%s: insert xattr set reply %p index %d: rc = %d\n",
576                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
577
578         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
579
580         return rc;
581 }
582
583 static int __out_tx_xattr_set(const struct lu_env *env,
584                               struct dt_object *dt_obj,
585                               const struct lu_buf *buf,
586                               const char *name, int flags,
587                               struct thandle_exec_args *ta,
588                               struct object_update_reply *reply,
589                               int index, const char *file, int line)
590 {
591         struct tx_arg   *arg;
592         int             rc;
593
594         LASSERT(ta->ta_handle != NULL);
595         rc = dt_declare_xattr_set(env, dt_obj, buf, name, flags, ta->ta_handle);
596         if (rc != 0)
597                 return rc;
598
599         arg = tx_add_exec(ta, out_tx_xattr_set_exec, NULL, file, line);
600         if (IS_ERR(arg))
601                 return PTR_ERR(arg);
602
603         lu_object_get(&dt_obj->do_lu);
604         arg->object = dt_obj;
605         arg->u.xattr_set.name = name;
606         arg->u.xattr_set.flags = flags;
607         arg->u.xattr_set.buf = *buf;
608         arg->reply = reply;
609         arg->index = index;
610         arg->u.xattr_set.csum = 0;
611         return 0;
612 }
613
614 static int out_xattr_set(struct tgt_session_info *tsi)
615 {
616         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
617         struct object_update    *update = tti->tti_u.update.tti_update;
618         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
619         struct lu_buf           *lbuf = &tti->tti_buf;
620         char                    *name;
621         char                    *buf;
622         __u32                   *tmp;
623         size_t                   buf_len = 0;
624         int                      flag;
625         size_t                   size = 0;
626         int                      rc;
627         ENTRY;
628
629         name = object_update_param_get(update, 0, NULL);
630         if (name == NULL) {
631                 CERROR("%s: empty name for xattr set: rc = %d\n",
632                        tgt_name(tsi->tsi_tgt), -EPROTO);
633                 RETURN(err_serious(-EPROTO));
634         }
635
636         buf = object_update_param_get(update, 1, &buf_len);
637         if (buf == NULL || buf_len == 0) {
638                 CERROR("%s: empty buf for xattr set: rc = %d\n",
639                        tgt_name(tsi->tsi_tgt), -EPROTO);
640                 RETURN(err_serious(-EPROTO));
641         }
642
643         lbuf->lb_buf = buf;
644         lbuf->lb_len = buf_len;
645
646         tmp = object_update_param_get(update, 2, &size);
647         if (tmp == NULL || size != sizeof(*tmp)) {
648                 CERROR("%s: emptry or wrong size %zd flag: rc = %d\n",
649                        tgt_name(tsi->tsi_tgt), size, -EPROTO);
650                 RETURN(err_serious(-EPROTO));
651         }
652
653         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
654                 __swab32s(tmp);
655         flag = *tmp;
656
657         rc = out_tx_xattr_set(tsi->tsi_env, obj, lbuf, name, flag,
658                               &tti->tti_tea,
659                               tti->tti_u.update.tti_update_reply,
660                               tti->tti_u.update.tti_update_reply_index);
661         RETURN(rc);
662 }
663
664 static int out_tx_xattr_del_exec(const struct lu_env *env, struct thandle *th,
665                                  struct tx_arg *arg)
666 {
667         struct dt_object *dt_obj = arg->object;
668         int rc;
669
670         CDEBUG(D_INFO, "%s: del xattr name '%s' on "DFID"\n",
671                dt_obd_name(th->th_dev), arg->u.xattr_set.name,
672                PFID(lu_object_fid(&dt_obj->do_lu)));
673
674         if (!lu_object_exists(&dt_obj->do_lu))
675                 GOTO(out, rc = -ENOENT);
676
677         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
678         rc = dt_xattr_del(env, dt_obj, arg->u.xattr_set.name,
679                           th, NULL);
680         dt_write_unlock(env, dt_obj);
681 out:
682         CDEBUG(D_INFO, "%s: insert xattr del reply %p index %d: rc = %d\n",
683                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
684
685         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
686
687         return rc;
688 }
689
690 static int __out_tx_xattr_del(const struct lu_env *env,
691                               struct dt_object *dt_obj, const char *name,
692                               struct thandle_exec_args *ta,
693                               struct object_update_reply *reply,
694                               int index, const char *file, int line)
695 {
696         struct tx_arg   *arg;
697         int             rc;
698
699         rc = dt_declare_xattr_del(env, dt_obj, name, ta->ta_handle);
700         if (rc != 0)
701                 return rc;
702
703         arg = tx_add_exec(ta, out_tx_xattr_del_exec, NULL, file, line);
704         if (IS_ERR(arg))
705                 return PTR_ERR(arg);
706
707         lu_object_get(&dt_obj->do_lu);
708         arg->object = dt_obj;
709         arg->u.xattr_set.name = name;
710         arg->reply = reply;
711         arg->index = index;
712         return 0;
713 }
714
715 static int out_xattr_del(struct tgt_session_info *tsi)
716 {
717         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
718         struct object_update    *update = tti->tti_u.update.tti_update;
719         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
720         char                    *name;
721         int                      rc;
722         ENTRY;
723
724         name = object_update_param_get(update, 0, NULL);
725         if (name == NULL) {
726                 CERROR("%s: empty name for xattr set: rc = %d\n",
727                        tgt_name(tsi->tsi_tgt), -EPROTO);
728                 RETURN(err_serious(-EPROTO));
729         }
730
731         rc = out_tx_xattr_del(tsi->tsi_env, obj, name, &tti->tti_tea,
732                               tti->tti_u.update.tti_update_reply,
733                               tti->tti_u.update.tti_update_reply_index);
734         RETURN(rc);
735 }
736
737 static int out_obj_ref_add(const struct lu_env *env,
738                            struct dt_object *dt_obj,
739                            struct thandle *th)
740 {
741         int rc;
742
743         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
744         rc = dt_ref_add(env, dt_obj, th);
745         dt_write_unlock(env, dt_obj);
746
747         return rc;
748 }
749
750 static int out_obj_ref_del(const struct lu_env *env,
751                            struct dt_object *dt_obj,
752                            struct thandle *th)
753 {
754         int rc;
755
756         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
757         rc = dt_ref_del(env, dt_obj, th);
758         dt_write_unlock(env, dt_obj);
759
760         return rc;
761 }
762
763 static int out_tx_ref_add_exec(const struct lu_env *env, struct thandle *th,
764                                struct tx_arg *arg)
765 {
766         struct dt_object *dt_obj = arg->object;
767         int rc;
768
769         rc = out_obj_ref_add(env, dt_obj, th);
770
771         CDEBUG(D_INFO, "%s: insert ref_add reply %p index %d: rc = %d\n",
772                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
773
774         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
775         return rc;
776 }
777
778 static int out_tx_ref_add_undo(const struct lu_env *env, struct thandle *th,
779                                struct tx_arg *arg)
780 {
781         return out_obj_ref_del(env, arg->object, th);
782 }
783
784 static int __out_tx_ref_add(const struct lu_env *env,
785                             struct dt_object *dt_obj,
786                             struct thandle_exec_args *ta,
787                             struct object_update_reply *reply,
788                             int index, const char *file, int line)
789 {
790         struct tx_arg   *arg;
791         int             rc;
792
793         LASSERT(ta->ta_handle != NULL);
794         rc = dt_declare_ref_add(env, dt_obj, ta->ta_handle);
795         if (rc != 0)
796                 return rc;
797
798         arg = tx_add_exec(ta, out_tx_ref_add_exec, out_tx_ref_add_undo, file,
799                           line);
800         if (IS_ERR(arg))
801                 return PTR_ERR(arg);
802
803         lu_object_get(&dt_obj->do_lu);
804         arg->object = dt_obj;
805         arg->reply = reply;
806         arg->index = index;
807         return 0;
808 }
809
810 /**
811  * increase ref of the object
812  **/
813 static int out_ref_add(struct tgt_session_info *tsi)
814 {
815         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
816         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
817         int                      rc;
818
819         ENTRY;
820
821         rc = out_tx_ref_add(tsi->tsi_env, obj, &tti->tti_tea,
822                             tti->tti_u.update.tti_update_reply,
823                             tti->tti_u.update.tti_update_reply_index);
824         RETURN(rc);
825 }
826
827 static int out_tx_ref_del_exec(const struct lu_env *env, struct thandle *th,
828                                struct tx_arg *arg)
829 {
830         struct dt_object        *dt_obj = arg->object;
831         int                      rc;
832
833         rc = out_obj_ref_del(env, dt_obj, th);
834
835         CDEBUG(D_INFO, "%s: insert ref_del reply %p index %d: rc = %d\n",
836                dt_obd_name(th->th_dev), arg->reply, arg->index, 0);
837
838         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
839
840         return rc;
841 }
842
843 static int out_tx_ref_del_undo(const struct lu_env *env, struct thandle *th,
844                                struct tx_arg *arg)
845 {
846         return out_obj_ref_add(env, arg->object, th);
847 }
848
849 static int __out_tx_ref_del(const struct lu_env *env,
850                             struct dt_object *dt_obj,
851                             struct thandle_exec_args *ta,
852                             struct object_update_reply *reply,
853                             int index, const char *file, int line)
854 {
855         struct tx_arg   *arg;
856         int             rc;
857
858         LASSERT(ta->ta_handle != NULL);
859         rc = dt_declare_ref_del(env, dt_obj, ta->ta_handle);
860         if (rc != 0)
861                 return rc;
862
863         arg = tx_add_exec(ta, out_tx_ref_del_exec, out_tx_ref_del_undo, file,
864                           line);
865         if (IS_ERR(arg))
866                 return PTR_ERR(arg);
867
868         lu_object_get(&dt_obj->do_lu);
869         arg->object = dt_obj;
870         arg->reply = reply;
871         arg->index = index;
872         return 0;
873 }
874
875 static int out_ref_del(struct tgt_session_info *tsi)
876 {
877         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
878         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
879         int                      rc;
880
881         ENTRY;
882
883         if (!lu_object_exists(&obj->do_lu))
884                 RETURN(-ENOENT);
885
886         rc = out_tx_ref_del(tsi->tsi_env, obj, &tti->tti_tea,
887                             tti->tti_u.update.tti_update_reply,
888                             tti->tti_u.update.tti_update_reply_index);
889         RETURN(rc);
890 }
891
892 static int out_obj_index_insert(const struct lu_env *env,
893                                 struct dt_object *dt_obj,
894                                 const struct dt_rec *rec,
895                                 const struct dt_key *key,
896                                 struct thandle *th)
897 {
898         int rc;
899
900         CDEBUG(D_INFO, "%s: index insert "DFID" name: %s fid "DFID", type %u\n",
901                dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
902                (char *)key, PFID(((struct dt_insert_rec *)rec)->rec_fid),
903                ((struct dt_insert_rec *)rec)->rec_type);
904
905         if (dt_try_as_dir(env, dt_obj) == 0)
906                 return -ENOTDIR;
907
908         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
909         rc = dt_insert(env, dt_obj, rec, key, th, NULL, 0);
910         dt_write_unlock(env, dt_obj);
911
912         return rc;
913 }
914
915 static int out_obj_index_delete(const struct lu_env *env,
916                                 struct dt_object *dt_obj,
917                                 const struct dt_key *key,
918                                 struct thandle *th)
919 {
920         int rc;
921
922         CDEBUG(D_INFO, "%s: index delete "DFID" name: %s\n",
923                dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)),
924                (char *)key);
925
926         if (dt_try_as_dir(env, dt_obj) == 0)
927                 return -ENOTDIR;
928
929         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
930         rc = dt_delete(env, dt_obj, key, th, NULL);
931         dt_write_unlock(env, dt_obj);
932
933         return rc;
934 }
935
936 static int out_tx_index_insert_exec(const struct lu_env *env,
937                                     struct thandle *th, struct tx_arg *arg)
938 {
939         struct dt_object *dt_obj = arg->object;
940         int rc;
941
942         rc = out_obj_index_insert(env, dt_obj,
943                                   (const struct dt_rec *)&arg->u.insert.rec,
944                                   arg->u.insert.key, th);
945
946         CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n",
947                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
948
949         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
950
951         return rc;
952 }
953
954 static int out_tx_index_insert_undo(const struct lu_env *env,
955                                     struct thandle *th, struct tx_arg *arg)
956 {
957         return out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
958 }
959
960 static int __out_tx_index_insert(const struct lu_env *env,
961                                  struct dt_object *dt_obj,
962                                  const struct dt_rec *rec,
963                                  const struct dt_key *key,
964                                  struct thandle_exec_args *ta,
965                                  struct object_update_reply *reply,
966                                  int index, const char *file, int line)
967 {
968         struct tx_arg   *arg;
969         int             rc;
970
971         LASSERT(ta->ta_handle != NULL);
972         if (dt_try_as_dir(env, dt_obj) == 0) {
973                 rc = -ENOTDIR;
974                 return rc;
975         }
976
977         rc = dt_declare_insert(env, dt_obj, rec, key, ta->ta_handle);
978         if (rc != 0)
979                 return rc;
980
981         arg = tx_add_exec(ta, out_tx_index_insert_exec,
982                           out_tx_index_insert_undo, file, line);
983         if (IS_ERR(arg))
984                 return PTR_ERR(arg);
985
986         lu_object_get(&dt_obj->do_lu);
987         arg->object = dt_obj;
988         arg->reply = reply;
989         arg->index = index;
990         arg->u.insert.rec = *(const struct dt_insert_rec *)rec;
991         arg->u.insert.key = key;
992
993         return 0;
994 }
995
996 static int out_index_insert(struct tgt_session_info *tsi)
997 {
998         struct tgt_thread_info  *tti    = tgt_th_info(tsi->tsi_env);
999         struct object_update    *update = tti->tti_u.update.tti_update;
1000         struct dt_object        *obj    = tti->tti_u.update.tti_dt_object;
1001         struct dt_insert_rec    *rec    = &tti->tti_rec;
1002         struct lu_fid           *fid;
1003         char                    *name;
1004         __u32                   *ptype;
1005         int                      rc     = 0;
1006         size_t                   size;
1007         ENTRY;
1008
1009         name = object_update_param_get(update, 0, NULL);
1010         if (name == NULL) {
1011                 CERROR("%s: empty name for index insert: rc = %d\n",
1012                        tgt_name(tsi->tsi_tgt), -EPROTO);
1013                 RETURN(err_serious(-EPROTO));
1014         }
1015
1016         fid = object_update_param_get(update, 1, &size);
1017         if (fid == NULL || size != sizeof(*fid)) {
1018                 CERROR("%s: invalid fid: rc = %d\n",
1019                        tgt_name(tsi->tsi_tgt), -EPROTO);
1020                        RETURN(err_serious(-EPROTO));
1021         }
1022
1023         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
1024                 lustre_swab_lu_fid(fid);
1025
1026         if (!fid_is_sane(fid)) {
1027                 CERROR("%s: invalid FID "DFID": rc = %d\n",
1028                        tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
1029                 RETURN(err_serious(-EPROTO));
1030         }
1031
1032         ptype = object_update_param_get(update, 2, &size);
1033         if (ptype == NULL || size != sizeof(*ptype)) {
1034                 CERROR("%s: invalid type for index insert: rc = %d\n",
1035                        tgt_name(tsi->tsi_tgt), -EPROTO);
1036                 RETURN(err_serious(-EPROTO));
1037         }
1038
1039         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
1040                 __swab32s(ptype);
1041
1042         rec->rec_fid = fid;
1043         rec->rec_type = *ptype;
1044
1045         rc = out_tx_index_insert(tsi->tsi_env, obj, (const struct dt_rec *)rec,
1046                                  (const struct dt_key *)name, &tti->tti_tea,
1047                                  tti->tti_u.update.tti_update_reply,
1048                                  tti->tti_u.update.tti_update_reply_index);
1049         RETURN(rc);
1050 }
1051
1052 static int out_tx_index_delete_exec(const struct lu_env *env,
1053                                     struct thandle *th,
1054                                     struct tx_arg *arg)
1055 {
1056         int rc;
1057
1058         rc = out_obj_index_delete(env, arg->object, arg->u.insert.key, th);
1059
1060         CDEBUG(D_INFO, "%s: delete idx insert reply %p index %d: rc = %d\n",
1061                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
1062
1063         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
1064
1065         return rc;
1066 }
1067
1068 static int out_tx_index_delete_undo(const struct lu_env *env,
1069                                     struct thandle *th,
1070                                     struct tx_arg *arg)
1071 {
1072         CERROR("%s: Oops, can not rollback index_delete yet: rc = %d\n",
1073                dt_obd_name(th->th_dev), -ENOTSUPP);
1074         return -ENOTSUPP;
1075 }
1076
1077 static int __out_tx_index_delete(const struct lu_env *env,
1078                                  struct dt_object *dt_obj,
1079                                  const struct dt_key *key,
1080                                  struct thandle_exec_args *ta,
1081                                  struct object_update_reply *reply,
1082                                  int index, const char *file, int line)
1083 {
1084         struct tx_arg   *arg;
1085         int             rc;
1086
1087         if (dt_try_as_dir(env, dt_obj) == 0) {
1088                 rc = -ENOTDIR;
1089                 return rc;
1090         }
1091
1092         LASSERT(ta->ta_handle != NULL);
1093         rc = dt_declare_delete(env, dt_obj, key, ta->ta_handle);
1094         if (rc != 0)
1095                 return rc;
1096
1097         arg = tx_add_exec(ta, out_tx_index_delete_exec,
1098                           out_tx_index_delete_undo, file, line);
1099         if (IS_ERR(arg))
1100                 return PTR_ERR(arg);
1101
1102         lu_object_get(&dt_obj->do_lu);
1103         arg->object = dt_obj;
1104         arg->reply = reply;
1105         arg->index = index;
1106         arg->u.insert.key = key;
1107         return 0;
1108 }
1109
1110 static int out_index_delete(struct tgt_session_info *tsi)
1111 {
1112         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
1113         struct object_update    *update = tti->tti_u.update.tti_update;
1114         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
1115         char                    *name;
1116         int                      rc = 0;
1117
1118         if (!lu_object_exists(&obj->do_lu))
1119                 RETURN(-ENOENT);
1120
1121         name = object_update_param_get(update, 0, NULL);
1122         if (name == NULL) {
1123                 CERROR("%s: empty name for index delete: rc = %d\n",
1124                        tgt_name(tsi->tsi_tgt), -EPROTO);
1125                 RETURN(err_serious(-EPROTO));
1126         }
1127
1128         rc = out_tx_index_delete(tsi->tsi_env, obj, (const struct dt_key *)name,
1129                                  &tti->tti_tea,
1130                                  tti->tti_u.update.tti_update_reply,
1131                                  tti->tti_u.update.tti_update_reply_index);
1132         RETURN(rc);
1133 }
1134
1135 static int out_tx_destroy_exec(const struct lu_env *env, struct thandle *th,
1136                                struct tx_arg *arg)
1137 {
1138         struct dt_object *dt_obj = arg->object;
1139         int rc;
1140
1141         rc = out_obj_destroy(env, dt_obj, th);
1142
1143         CDEBUG(D_INFO, "%s: insert destroy reply %p index %d: rc = %d\n",
1144                dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
1145
1146         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
1147
1148         RETURN(rc);
1149 }
1150
1151 static int out_tx_destroy_undo(const struct lu_env *env, struct thandle *th,
1152                                struct tx_arg *arg)
1153 {
1154         CERROR("%s: not support destroy undo yet!: rc = %d\n",
1155                dt_obd_name(th->th_dev), -ENOTSUPP);
1156         return -ENOTSUPP;
1157 }
1158
1159 static int __out_tx_destroy(const struct lu_env *env, struct dt_object *dt_obj,
1160                              struct thandle_exec_args *ta,
1161                              struct object_update_reply *reply,
1162                              int index, const char *file, int line)
1163 {
1164         struct tx_arg   *arg;
1165         int             rc;
1166
1167         LASSERT(ta->ta_handle != NULL);
1168         rc = dt_declare_destroy(env, dt_obj, ta->ta_handle);
1169         if (rc != 0)
1170                 return rc;
1171
1172         arg = tx_add_exec(ta, out_tx_destroy_exec, out_tx_destroy_undo,
1173                           file, line);
1174         if (IS_ERR(arg))
1175                 return PTR_ERR(arg);
1176
1177         lu_object_get(&dt_obj->do_lu);
1178         arg->object = dt_obj;
1179         arg->reply = reply;
1180         arg->index = index;
1181         return 0;
1182 }
1183
1184 static int out_destroy(struct tgt_session_info *tsi)
1185 {
1186         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
1187         struct object_update    *update = tti->tti_u.update.tti_update;
1188         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
1189         struct lu_fid           *fid;
1190         int                      rc;
1191         ENTRY;
1192
1193         fid = &update->ou_fid;
1194         if (!fid_is_sane(fid)) {
1195                 CERROR("%s: invalid FID "DFID": rc = %d\n",
1196                        tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
1197                 RETURN(err_serious(-EPROTO));
1198         }
1199
1200         if (!lu_object_exists(&obj->do_lu))
1201                 RETURN(-ENOENT);
1202
1203         rc = out_tx_destroy(tsi->tsi_env, obj, &tti->tti_tea,
1204                             tti->tti_u.update.tti_update_reply,
1205                             tti->tti_u.update.tti_update_reply_index);
1206
1207         RETURN(rc);
1208 }
1209
1210 static int out_tx_write_exec(const struct lu_env *env, struct thandle *th,
1211                              struct tx_arg *arg)
1212 {
1213         struct dt_object *dt_obj = arg->object;
1214         int rc;
1215
1216         dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
1217         rc = dt_record_write(env, dt_obj, &arg->u.write.buf,
1218                              &arg->u.write.pos, th);
1219         dt_write_unlock(env, dt_obj);
1220
1221         if (rc == 0)
1222                 rc = arg->u.write.buf.lb_len;
1223
1224         object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
1225
1226         return rc > 0 ? 0 : rc;
1227 }
1228
1229 static int __out_tx_write(const struct lu_env *env,
1230                           struct dt_object *dt_obj,
1231                           const struct lu_buf *buf,
1232                           loff_t pos, struct thandle_exec_args *ta,
1233                           struct object_update_reply *reply,
1234                           int index, const char *file, int line)
1235 {
1236         struct tx_arg   *arg;
1237         int             rc;
1238
1239         LASSERT(ta->ta_handle != NULL);
1240         rc = dt_declare_record_write(env, dt_obj, buf, pos, ta->ta_handle);
1241         if (rc != 0)
1242                 return rc;
1243
1244         arg = tx_add_exec(ta, out_tx_write_exec, NULL, file, line);
1245         if (IS_ERR(arg))
1246                 return PTR_ERR(arg);
1247
1248         lu_object_get(&dt_obj->do_lu);
1249         arg->object = dt_obj;
1250         arg->u.write.buf = *buf;
1251         arg->u.write.pos = pos;
1252         arg->reply = reply;
1253         arg->index = index;
1254         return 0;
1255 }
1256
1257 static int out_write(struct tgt_session_info *tsi)
1258 {
1259         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
1260         struct object_update    *update = tti->tti_u.update.tti_update;
1261         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
1262         struct lu_buf           *lbuf = &tti->tti_buf;
1263         char                    *buf;
1264         __u64                   *tmp;
1265         size_t                  size = 0;
1266         size_t                  buf_len = 0;
1267         loff_t                  pos;
1268         int                      rc;
1269         ENTRY;
1270
1271         buf = object_update_param_get(update, 0, &buf_len);
1272         if (buf == NULL || buf_len == 0) {
1273                 CERROR("%s: empty buf for xattr set: rc = %d\n",
1274                        tgt_name(tsi->tsi_tgt), -EPROTO);
1275                 RETURN(err_serious(-EPROTO));
1276         }
1277         lbuf->lb_buf = buf;
1278         lbuf->lb_len = buf_len;
1279
1280         tmp = object_update_param_get(update, 1, &size);
1281         if (tmp == NULL || size != sizeof(*tmp)) {
1282                 CERROR("%s: empty or wrong size %zd pos: rc = %d\n",
1283                        tgt_name(tsi->tsi_tgt), size, -EPROTO);
1284                 RETURN(err_serious(-EPROTO));
1285         }
1286
1287         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
1288                 __swab64s(tmp);
1289         pos = *tmp;
1290
1291         rc = out_tx_write(tsi->tsi_env, obj, lbuf, pos,
1292                           &tti->tti_tea,
1293                           tti->tti_u.update.tti_update_reply,
1294                           tti->tti_u.update.tti_update_reply_index);
1295         RETURN(rc);
1296 }
1297
1298 #define DEF_OUT_HNDL(opc, name, flags, fn)     \
1299 [opc - OUT_CREATE] = {                                  \
1300         .th_name    = name,                             \
1301         .th_fail_id = 0,                                \
1302         .th_opc     = opc,                              \
1303         .th_flags   = flags,                            \
1304         .th_act     = fn,                               \
1305         .th_fmt     = NULL,                             \
1306         .th_version = 0,                                \
1307 }
1308
1309 static struct tgt_handler out_update_ops[] = {
1310         DEF_OUT_HNDL(OUT_CREATE, "out_create", MUTABOR | HABEO_REFERO,
1311                      out_create),
1312         DEF_OUT_HNDL(OUT_DESTROY, "out_create", MUTABOR | HABEO_REFERO,
1313                      out_destroy),
1314         DEF_OUT_HNDL(OUT_REF_ADD, "out_ref_add", MUTABOR | HABEO_REFERO,
1315                      out_ref_add),
1316         DEF_OUT_HNDL(OUT_REF_DEL, "out_ref_del", MUTABOR | HABEO_REFERO,
1317                      out_ref_del),
1318         DEF_OUT_HNDL(OUT_ATTR_SET, "out_attr_set",  MUTABOR | HABEO_REFERO,
1319                      out_attr_set),
1320         DEF_OUT_HNDL(OUT_ATTR_GET, "out_attr_get",  HABEO_REFERO,
1321                      out_attr_get),
1322         DEF_OUT_HNDL(OUT_XATTR_SET, "out_xattr_set", MUTABOR | HABEO_REFERO,
1323                      out_xattr_set),
1324         DEF_OUT_HNDL(OUT_XATTR_DEL, "out_xattr_del", MUTABOR | HABEO_REFERO,
1325                      out_xattr_del),
1326         DEF_OUT_HNDL(OUT_XATTR_GET, "out_xattr_get", HABEO_REFERO,
1327                      out_xattr_get),
1328         DEF_OUT_HNDL(OUT_INDEX_LOOKUP, "out_index_lookup", HABEO_REFERO,
1329                      out_index_lookup),
1330         DEF_OUT_HNDL(OUT_INDEX_INSERT, "out_index_insert",
1331                      MUTABOR | HABEO_REFERO, out_index_insert),
1332         DEF_OUT_HNDL(OUT_INDEX_DELETE, "out_index_delete",
1333                      MUTABOR | HABEO_REFERO, out_index_delete),
1334         DEF_OUT_HNDL(OUT_WRITE, "out_write", MUTABOR | HABEO_REFERO, out_write),
1335 };
1336
1337 struct tgt_handler *out_handler_find(__u32 opc)
1338 {
1339         struct tgt_handler *h;
1340
1341         h = NULL;
1342         if (OUT_CREATE <= opc && opc < OUT_LAST) {
1343                 h = &out_update_ops[opc - OUT_CREATE];
1344                 LASSERTF(h->th_opc == opc, "opcode mismatch %d != %d\n",
1345                          h->th_opc, opc);
1346         } else {
1347                 h = NULL; /* unsupported opc */
1348         }
1349         return h;
1350 }
1351
1352 static int out_tx_start(const struct lu_env *env, struct dt_device *dt,
1353                         struct thandle_exec_args *ta, struct obd_export *exp)
1354 {
1355         ta->ta_argno = 0;
1356         ta->ta_handle = dt_trans_create(env, dt);
1357         if (IS_ERR(ta->ta_handle)) {
1358                 int rc;
1359
1360                 rc = PTR_ERR(ta->ta_handle);
1361                 ta->ta_handle = NULL;
1362                 CERROR("%s: start handle error: rc = %d\n", dt_obd_name(dt),
1363                        rc);
1364                 return rc;
1365         }
1366         if (exp->exp_need_sync)
1367                 ta->ta_handle->th_sync = 1;
1368
1369         return 0;
1370 }
1371
1372 static int out_trans_start(const struct lu_env *env,
1373                            struct thandle_exec_args *ta)
1374 {
1375         return dt_trans_start(env, ta->ta_handle->th_dev, ta->ta_handle);
1376 }
1377
1378 static int out_trans_stop(const struct lu_env *env,
1379                           struct thandle_exec_args *ta, int err)
1380 {
1381         int i;
1382         int rc;
1383
1384         ta->ta_handle->th_result = err;
1385         rc = dt_trans_stop(env, ta->ta_handle->th_dev, ta->ta_handle);
1386         for (i = 0; i < ta->ta_argno; i++) {
1387                 if (ta->ta_args[i]->object != NULL) {
1388                         struct dt_object *obj = ta->ta_args[i]->object;
1389
1390                         /* If the object is being created during this
1391                          * transaction, we need to remove them from the
1392                          * cache immediately, because a few layers are
1393                          * missing in OUT handler, i.e. the object might
1394                          * not be initialized in all layers */
1395                         if (ta->ta_args[i]->exec_fn == out_tx_create_exec)
1396                                 set_bit(LU_OBJECT_HEARD_BANSHEE,
1397                                         &obj->do_lu.lo_header->loh_flags);
1398                         lu_object_put(env, &ta->ta_args[i]->object->do_lu);
1399                         ta->ta_args[i]->object = NULL;
1400                 }
1401         }
1402         ta->ta_handle = NULL;
1403         ta->ta_argno = 0;
1404
1405         return rc;
1406 }
1407
1408 int out_tx_end(const struct lu_env *env, struct thandle_exec_args *ta,
1409                int declare_ret)
1410 {
1411         struct tgt_session_info *tsi = tgt_ses_info(env);
1412         int                     i;
1413         int                     rc;
1414         int                     rc1;
1415         ENTRY;
1416
1417         if (ta->ta_handle == NULL)
1418                 RETURN(0);
1419
1420         if (declare_ret != 0 || ta->ta_argno == 0)
1421                 GOTO(stop, rc = declare_ret);
1422
1423         LASSERT(ta->ta_handle->th_dev != NULL);
1424         rc = out_trans_start(env, ta);
1425         if (unlikely(rc != 0))
1426                 GOTO(stop, rc);
1427
1428         for (i = 0; i < ta->ta_argno; i++) {
1429                 rc = ta->ta_args[i]->exec_fn(env, ta->ta_handle,
1430                                              ta->ta_args[i]);
1431                 if (unlikely(rc != 0)) {
1432                         CDEBUG(D_INFO, "error during execution of #%u from"
1433                                " %s:%d: rc = %d\n", i, ta->ta_args[i]->file,
1434                                ta->ta_args[i]->line, rc);
1435                         while (--i >= 0) {
1436                                 if (ta->ta_args[i]->undo_fn != NULL)
1437                                         ta->ta_args[i]->undo_fn(env,
1438                                                                ta->ta_handle,
1439                                                                ta->ta_args[i]);
1440                                 else
1441                                         CERROR("%s: undo for %s:%d: rc = %d\n",
1442                                              dt_obd_name(ta->ta_handle->th_dev),
1443                                                ta->ta_args[i]->file,
1444                                                ta->ta_args[i]->line, -ENOTSUPP);
1445                         }
1446                         break;
1447                 }
1448                 CDEBUG(D_INFO, "%s: executed %u/%u: rc = %d\n",
1449                        dt_obd_name(ta->ta_handle->th_dev), i, ta->ta_argno, rc);
1450         }
1451
1452         /* Only fail for real update */
1453         tsi->tsi_reply_fail_id = OBD_FAIL_OUT_UPDATE_NET_REP;
1454 stop:
1455         rc1 = out_trans_stop(env, ta, rc);
1456         if (rc == 0)
1457                 rc = rc1;
1458
1459         ta->ta_handle = NULL;
1460         ta->ta_argno = 0;
1461
1462         RETURN(rc);
1463 }
1464
1465 /**
1466  * Object updates between Targets. Because all the updates has been
1467  * dis-assemblied into object updates at sender side, so OUT will
1468  * call OSD API directly to execute these updates.
1469  *
1470  * In DNE phase I all of the updates in the request need to be executed
1471  * in one transaction, and the transaction has to be synchronously.
1472  *
1473  * Please refer to lustre/include/lustre/lustre_idl.h for req/reply
1474  * format.
1475  */
1476 int out_handle(struct tgt_session_info *tsi)
1477 {
1478         const struct lu_env             *env = tsi->tsi_env;
1479         struct tgt_thread_info          *tti = tgt_th_info(env);
1480         struct thandle_exec_args        *ta = &tti->tti_tea;
1481         struct req_capsule              *pill = tsi->tsi_pill;
1482         struct dt_device                *dt = tsi->tsi_tgt->lut_bottom;
1483         struct object_update_request    *ureq;
1484         struct object_update            *update;
1485         struct object_update_reply      *reply;
1486         int                              bufsize;
1487         int                              count;
1488         int                              current_batchid = -1;
1489         int                              i;
1490         int                              rc = 0;
1491         int                              rc1 = 0;
1492
1493         ENTRY;
1494
1495         req_capsule_set(pill, &RQF_OUT_UPDATE);
1496         ureq = req_capsule_client_get(pill, &RMF_OUT_UPDATE);
1497         if (ureq == NULL) {
1498                 CERROR("%s: No buf!: rc = %d\n", tgt_name(tsi->tsi_tgt),
1499                        -EPROTO);
1500                 RETURN(err_serious(-EPROTO));
1501         }
1502
1503         bufsize = req_capsule_get_size(pill, &RMF_OUT_UPDATE, RCL_CLIENT);
1504         if (bufsize != object_update_request_size(ureq)) {
1505                 CERROR("%s: invalid bufsize %d: rc = %d\n",
1506                        tgt_name(tsi->tsi_tgt), bufsize, -EPROTO);
1507                 RETURN(err_serious(-EPROTO));
1508         }
1509
1510         if (ureq->ourq_magic != UPDATE_REQUEST_MAGIC) {
1511                 CERROR("%s: invalid update buffer magic %x expect %x: "
1512                        "rc = %d\n", tgt_name(tsi->tsi_tgt), ureq->ourq_magic,
1513                        UPDATE_REQUEST_MAGIC, -EPROTO);
1514                 RETURN(err_serious(-EPROTO));
1515         }
1516
1517         count = ureq->ourq_count;
1518         if (count <= 0) {
1519                 CERROR("%s: empty update: rc = %d\n", tgt_name(tsi->tsi_tgt),
1520                        -EPROTO);
1521                 RETURN(err_serious(-EPROTO));
1522         }
1523
1524         req_capsule_set_size(pill, &RMF_OUT_UPDATE_REPLY, RCL_SERVER,
1525                              OUT_UPDATE_REPLY_SIZE);
1526         rc = req_capsule_server_pack(pill);
1527         if (rc != 0) {
1528                 CERROR("%s: Can't pack response: rc = %d\n",
1529                        tgt_name(tsi->tsi_tgt), rc);
1530                 RETURN(rc);
1531         }
1532
1533         /* Prepare the update reply buffer */
1534         reply = req_capsule_server_get(pill, &RMF_OUT_UPDATE_REPLY);
1535         if (reply == NULL)
1536                 RETURN(err_serious(-EPROTO));
1537         object_update_reply_init(reply, count);
1538         tti->tti_u.update.tti_update_reply = reply;
1539         tti->tti_mult_trans = !req_is_replay(tgt_ses_req(tsi));
1540
1541         /* Walk through updates in the request to execute them synchronously */
1542         for (i = 0; i < count; i++) {
1543                 struct tgt_handler      *h;
1544                 struct dt_object        *dt_obj;
1545
1546                 update = object_update_request_get(ureq, i, NULL);
1547                 if (update == NULL)
1548                         GOTO(out, rc = -EPROTO);
1549
1550                 if (ptlrpc_req_need_swab(pill->rc_req))
1551                         lustre_swab_object_update(update);
1552
1553                 if (!fid_is_sane(&update->ou_fid)) {
1554                         CERROR("%s: invalid FID "DFID": rc = %d\n",
1555                                tgt_name(tsi->tsi_tgt), PFID(&update->ou_fid),
1556                                -EPROTO);
1557                         GOTO(out, rc = err_serious(-EPROTO));
1558                 }
1559
1560                 dt_obj = dt_locate(env, dt, &update->ou_fid);
1561                 if (IS_ERR(dt_obj))
1562                         GOTO(out, rc = PTR_ERR(dt_obj));
1563
1564                 if (dt->dd_record_fid_accessed) {
1565                         lfsck_pack_rfa(&tti->tti_lr,
1566                                        lu_object_fid(&dt_obj->do_lu));
1567                         tgt_lfsck_in_notify(env, dt, &tti->tti_lr);
1568                 }
1569
1570                 tti->tti_u.update.tti_dt_object = dt_obj;
1571                 tti->tti_u.update.tti_update = update;
1572                 tti->tti_u.update.tti_update_reply_index = i;
1573
1574                 h = out_handler_find(update->ou_type);
1575                 if (unlikely(h == NULL)) {
1576                         CERROR("%s: unsupported opc: 0x%x\n",
1577                                tgt_name(tsi->tsi_tgt), update->ou_type);
1578                         GOTO(next, rc = -ENOTSUPP);
1579                 }
1580
1581                 /* Check resend case only for modifying RPC */
1582                 if (h->th_flags & MUTABOR) {
1583                         struct ptlrpc_request *req = tgt_ses_req(tsi);
1584
1585                         if (out_check_resent(env, dt, dt_obj, req,
1586                                              out_reconstruct, reply, i))
1587                                 GOTO(next, rc = 0);
1588                 }
1589
1590                 /* start transaction for modification RPC only */
1591                 if (h->th_flags & MUTABOR && current_batchid == -1) {
1592                         current_batchid = update->ou_batchid;
1593                         rc = out_tx_start(env, dt, ta, tsi->tsi_exp);
1594                         if (rc != 0)
1595                                 GOTO(next, rc);
1596                 }
1597
1598                 /* Stop the current update transaction, if the update has
1599                  * different batchid, or read-only update */
1600                 if (((current_batchid != update->ou_batchid) ||
1601                      !(h->th_flags & MUTABOR)) && ta->ta_handle != NULL) {
1602                         rc = out_tx_end(env, ta, rc);
1603                         current_batchid = -1;
1604                         if (rc != 0)
1605                                 GOTO(next, rc);
1606
1607                         /* start a new transaction if needed */
1608                         if (h->th_flags & MUTABOR) {
1609                                 rc = out_tx_start(env, dt, ta, tsi->tsi_exp);
1610                                 if (rc != 0)
1611                                         GOTO(next, rc);
1612
1613                                 current_batchid = update->ou_batchid;
1614                         }
1615                 }
1616
1617                 rc = h->th_act(tsi);
1618 next:
1619                 lu_object_put(env, &dt_obj->do_lu);
1620                 if (rc < 0)
1621                         GOTO(out, rc);
1622         }
1623 out:
1624         if (current_batchid != -1) {
1625                 rc1 = out_tx_end(env, ta, rc);
1626                 if (rc == 0)
1627                         rc = rc1;
1628         }
1629
1630         RETURN(rc);
1631 }
1632
1633 struct tgt_handler tgt_out_handlers[] = {
1634 TGT_UPDATE_HDL(MUTABOR, OUT_UPDATE,     out_handle),
1635 };
1636 EXPORT_SYMBOL(tgt_out_handlers);
1637