Whamcloud - gitweb
6499fc401f27ee830eaa08c87494a6d5e6de645e
[fs/lustre-release.git] / lustre / target / out_handler.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, 2015, Intel Corporation.
24  *
25  * lustre/target/out_handler.c
26  *
27  * Object update handler between targets.
28  *
29  * Author: di.wang <di.wang@intel.com>
30  */
31
32 #define DEBUG_SUBSYSTEM S_CLASS
33
34 #include <obd_class.h>
35 #include <md_object.h>
36 #include "tgt_internal.h"
37 #include <lustre_update.h>
38
39 static void out_reconstruct(const struct lu_env *env, struct dt_device *dt,
40                             struct dt_object *obj,
41                             struct object_update_reply *reply,
42                             int index)
43 {
44         CDEBUG(D_INFO, "%s: fork reply reply %p index %d: rc = %d\n",
45                dt_obd_name(dt), reply, index, 0);
46
47         object_update_result_insert(reply, NULL, 0, index, 0);
48         return;
49 }
50
/*
 * Callback type invoked by out_check_resent() for a resent request when
 * req_xid_is_last() is true; it receives the reply buffer and the index of
 * the reply entry to fill. out_reconstruct() has this signature.
 */
typedef void (*out_reconstruct_t)(const struct lu_env *env,
                                  struct dt_device *dt,
                                  struct dt_object *obj,
                                  struct object_update_reply *reply,
                                  int index);
56
57 static inline int out_check_resent(const struct lu_env *env,
58                                    struct dt_device *dt,
59                                    struct dt_object *obj,
60                                    struct ptlrpc_request *req,
61                                    out_reconstruct_t reconstruct,
62                                    struct object_update_reply *reply,
63                                    int index)
64 {
65         if (likely(!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)))
66                 return 0;
67
68         if (req_xid_is_last(req)) {
69                 struct lsd_client_data *lcd;
70
71                 /* XXX this does not support mulitple transactions yet, i.e.
72                  * only 1 update RPC each time betwee MDTs */
73                 lcd = req->rq_export->exp_target_data.ted_lcd;
74
75                 req->rq_transno = lcd->lcd_last_transno;
76                 req->rq_status = lcd->lcd_last_result;
77                 if (req->rq_status != 0)
78                         req->rq_transno = 0;
79                 lustre_msg_set_transno(req->rq_repmsg, req->rq_transno);
80                 lustre_msg_set_status(req->rq_repmsg, req->rq_status);
81
82                 DEBUG_REQ(D_RPCTRACE, req, "restoring transno "LPD64"status %d",
83                           req->rq_transno, req->rq_status);
84
85                 reconstruct(env, dt, obj, reply, index);
86                 return 1;
87         }
88         DEBUG_REQ(D_HA, req, "no reply for RESENT req (have "LPD64")",
89                  req->rq_export->exp_target_data.ted_lcd->lcd_last_xid);
90         return 0;
91 }
92
93 static int out_create(struct tgt_session_info *tsi)
94 {
95         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
96         struct object_update    *update = tti->tti_u.update.tti_update;
97         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
98         struct dt_object_format *dof = &tti->tti_u.update.tti_update_dof;
99         struct obdo             *lobdo = &tti->tti_u.update.tti_obdo;
100         struct lu_attr          *attr = &tti->tti_attr;
101         struct lu_fid           *fid = NULL;
102         struct obdo             *wobdo;
103         size_t                  size;
104         int                     rc;
105
106         ENTRY;
107
108         wobdo = object_update_param_get(update, 0, &size);
109         if (IS_ERR(wobdo) || size != sizeof(*wobdo)) {
110                 CERROR("%s: obdo is NULL, invalid RPC: rc = %ld\n",
111                        tgt_name(tsi->tsi_tgt), PTR_ERR(wobdo));
112                 RETURN(err_serious(PTR_ERR(wobdo)));
113         }
114
115         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
116                 lustre_swab_obdo(wobdo);
117         lustre_get_wire_obdo(NULL, lobdo, wobdo);
118         la_from_obdo(attr, lobdo, lobdo->o_valid);
119
120         dof->dof_type = dt_mode_to_dft(attr->la_mode);
121         if (update->ou_params_count > 1) {
122                 fid = object_update_param_get(update, 1, &size);
123                 if (IS_ERR(fid) || size != sizeof(*fid)) {
124                         CERROR("%s: invalid fid: rc = %ld\n",
125                                tgt_name(tsi->tsi_tgt), PTR_ERR(fid));
126                         RETURN(err_serious(PTR_ERR(fid)));
127                 }
128                 if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
129                         lustre_swab_lu_fid(fid);
130                 if (!fid_is_sane(fid)) {
131                         CERROR("%s: invalid fid "DFID": rc = %d\n",
132                                tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
133                         RETURN(err_serious(-EPROTO));
134                 }
135         }
136
137         if (lu_object_exists(&obj->do_lu))
138                 RETURN(-EEXIST);
139
140         rc = out_tx_create(tsi->tsi_env, obj, attr, fid, dof,
141                            &tti->tti_tea, tti->tti_tea.ta_handle,
142                            tti->tti_u.update.tti_update_reply,
143                            tti->tti_u.update.tti_update_reply_index);
144
145         RETURN(rc);
146 }
147
148 static int out_attr_set(struct tgt_session_info *tsi)
149 {
150         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
151         struct object_update    *update = tti->tti_u.update.tti_update;
152         struct lu_attr          *attr = &tti->tti_attr;
153         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
154         struct obdo             *lobdo = &tti->tti_u.update.tti_obdo;
155         struct obdo             *wobdo;
156         size_t                   size;
157         int                      rc;
158
159         ENTRY;
160
161         wobdo = object_update_param_get(update, 0, &size);
162         if (IS_ERR(wobdo) || size != sizeof(*wobdo)) {
163                 CERROR("%s: empty obdo in the update: rc = %ld\n",
164                        tgt_name(tsi->tsi_tgt), PTR_ERR(wobdo));
165                 RETURN(err_serious(PTR_ERR(wobdo)));
166         }
167
168         attr->la_valid = 0;
169         attr->la_valid = 0;
170
171         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
172                 lustre_swab_obdo(wobdo);
173         lustre_get_wire_obdo(NULL, lobdo, wobdo);
174         la_from_obdo(attr, lobdo, lobdo->o_valid);
175
176         rc = out_tx_attr_set(tsi->tsi_env, obj, attr, &tti->tti_tea,
177                              tti->tti_tea.ta_handle,
178                              tti->tti_u.update.tti_update_reply,
179                              tti->tti_u.update.tti_update_reply_index);
180
181         RETURN(rc);
182 }
183
184 static int out_attr_get(struct tgt_session_info *tsi)
185 {
186         const struct lu_env     *env = tsi->tsi_env;
187         struct tgt_thread_info  *tti = tgt_th_info(env);
188         struct object_update    *update = tti->tti_u.update.tti_update;
189         struct obdo             *obdo = &tti->tti_u.update.tti_obdo;
190         struct lu_attr          *la = &tti->tti_attr;
191         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
192         int                     idx = tti->tti_u.update.tti_update_reply_index;
193         int                     rc;
194
195         ENTRY;
196
197         if (unlikely(update->ou_result_size < sizeof(*obdo)))
198                 return -EPROTO;
199
200         if (!lu_object_exists(&obj->do_lu)) {
201                 /* Usually, this will be called when the master MDT try
202                  * to init a remote object(see osp_object_init), so if
203                  * the object does not exist on slave, we need set BANSHEE flag,
204                  * so the object can be removed from the cache immediately */
205                 set_bit(LU_OBJECT_HEARD_BANSHEE,
206                         &obj->do_lu.lo_header->loh_flags);
207                 RETURN(-ENOENT);
208         }
209
210         dt_read_lock(env, obj, MOR_TGT_CHILD);
211         rc = dt_attr_get(env, obj, la);
212         if (rc)
213                 GOTO(out_unlock, rc);
214
215         obdo->o_valid = 0;
216         obdo_from_la(obdo, la, la->la_valid);
217         lustre_set_wire_obdo(NULL, obdo, obdo);
218
219 out_unlock:
220         dt_read_unlock(env, obj);
221
222         CDEBUG(D_INFO, "%s: insert attr get reply %p index %d: rc = %d\n",
223                tgt_name(tsi->tsi_tgt), tti->tti_u.update.tti_update_reply,
224                0, rc);
225
226         object_update_result_insert(tti->tti_u.update.tti_update_reply, obdo,
227                                     sizeof(*obdo), idx, rc);
228
229         RETURN(rc);
230 }
231
232 static int out_xattr_get(struct tgt_session_info *tsi)
233 {
234         const struct lu_env        *env = tsi->tsi_env;
235         struct tgt_thread_info     *tti = tgt_th_info(env);
236         struct object_update       *update = tti->tti_u.update.tti_update;
237         struct lu_buf              *lbuf = &tti->tti_buf;
238         struct object_update_reply *reply = tti->tti_u.update.tti_update_reply;
239         struct dt_object           *obj = tti->tti_u.update.tti_dt_object;
240         char                       *name;
241         struct object_update_result *update_result;
242         int                     idx = tti->tti_u.update.tti_update_reply_index;
243         int                        rc;
244
245         ENTRY;
246
247         if (!lu_object_exists(&obj->do_lu)) {
248                 set_bit(LU_OBJECT_HEARD_BANSHEE,
249                         &obj->do_lu.lo_header->loh_flags);
250                 RETURN(-ENOENT);
251         }
252
253         name = object_update_param_get(update, 0, NULL);
254         if (IS_ERR(name)) {
255                 CERROR("%s: empty name for xattr get: rc = %ld\n",
256                        tgt_name(tsi->tsi_tgt), PTR_ERR(name));
257                 RETURN(err_serious(PTR_ERR(name)));
258         }
259
260         update_result = object_update_result_get(reply, 0, NULL);
261         if (update_result == NULL) {
262                 CERROR("%s: empty name for xattr get: rc = %d\n",
263                        tgt_name(tsi->tsi_tgt), -EPROTO);
264                 RETURN(err_serious(-EPROTO));
265         }
266
267         lbuf->lb_len = (int)tti->tti_u.update.tti_update->ou_result_size;
268         lbuf->lb_buf = update_result->our_data;
269         if (lbuf->lb_len == 0)
270                 lbuf->lb_buf = 0;
271         dt_read_lock(env, obj, MOR_TGT_CHILD);
272         rc = dt_xattr_get(env, obj, lbuf, name);
273         dt_read_unlock(env, obj);
274         if (rc < 0)
275                 lbuf->lb_len = 0;
276         CDEBUG(D_INFO, "%s: "DFID" get xattr %s len %d\n",
277                tgt_name(tsi->tsi_tgt), PFID(lu_object_fid(&obj->do_lu)),
278                name, (int)lbuf->lb_len);
279
280         GOTO(out, rc);
281
282 out:
283         object_update_result_insert(reply, lbuf->lb_buf, lbuf->lb_len, idx, rc);
284         RETURN(0);
285 }
286
287 static int out_index_lookup(struct tgt_session_info *tsi)
288 {
289         const struct lu_env     *env = tsi->tsi_env;
290         struct tgt_thread_info  *tti = tgt_th_info(env);
291         struct object_update    *update = tti->tti_u.update.tti_update;
292         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
293         char                    *name;
294         int                      rc;
295
296         ENTRY;
297
298         if (unlikely(update->ou_result_size < sizeof(tti->tti_fid1)))
299                 return -EPROTO;
300
301         if (!lu_object_exists(&obj->do_lu))
302                 RETURN(-ENOENT);
303
304         name = object_update_param_get(update, 0, NULL);
305         if (IS_ERR(name)) {
306                 CERROR("%s: empty name for lookup: rc = %ld\n",
307                        tgt_name(tsi->tsi_tgt), PTR_ERR(name));
308                 RETURN(err_serious(PTR_ERR(name)));
309         }
310
311         dt_read_lock(env, obj, MOR_TGT_CHILD);
312         if (!dt_try_as_dir(env, obj))
313                 GOTO(out_unlock, rc = -ENOTDIR);
314
315         rc = dt_lookup(env, obj, (struct dt_rec *)&tti->tti_fid1,
316                        (struct dt_key *)name);
317
318         if (rc < 0)
319                 GOTO(out_unlock, rc);
320
321         if (rc == 0)
322                 rc += 1;
323
324 out_unlock:
325         dt_read_unlock(env, obj);
326
327         CDEBUG(D_INFO, "lookup "DFID" %s get "DFID" rc %d\n",
328                PFID(lu_object_fid(&obj->do_lu)), name,
329                PFID(&tti->tti_fid1), rc);
330
331         CDEBUG(D_INFO, "%s: insert lookup reply %p index %d: rc = %d\n",
332                tgt_name(tsi->tsi_tgt), tti->tti_u.update.tti_update_reply,
333                0, rc);
334
335         object_update_result_insert(tti->tti_u.update.tti_update_reply,
336                             &tti->tti_fid1, sizeof(tti->tti_fid1),
337                             tti->tti_u.update.tti_update_reply_index, rc);
338         RETURN(rc);
339 }
340
341 static int out_xattr_set(struct tgt_session_info *tsi)
342 {
343         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
344         struct object_update    *update = tti->tti_u.update.tti_update;
345         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
346         struct lu_buf           *lbuf = &tti->tti_buf;
347         char                    *name;
348         char                    *buf;
349         __u32                   *tmp;
350         size_t                   buf_len = 0;
351         int                      flag;
352         size_t                   size = 0;
353         int                      rc;
354         ENTRY;
355
356         name = object_update_param_get(update, 0, NULL);
357         if (IS_ERR(name)) {
358                 CERROR("%s: empty name for xattr set: rc = %ld\n",
359                        tgt_name(tsi->tsi_tgt), PTR_ERR(name));
360                 RETURN(err_serious(PTR_ERR(name)));
361         }
362
363         /* If buffer == NULL (-ENODATA), then it might mean delete xattr */
364         buf = object_update_param_get(update, 1, &buf_len);
365         if (IS_ERR(buf) && PTR_ERR(buf) != -ENODATA)
366                 RETURN(err_serious(PTR_ERR(buf)));
367
368         lbuf->lb_buf = buf;
369         lbuf->lb_len = buf_len;
370
371         tmp = object_update_param_get(update, 2, &size);
372         if (IS_ERR(tmp) || size != sizeof(*tmp)) {
373                 CERROR("%s: emptry or wrong size %zu flag: rc = %ld\n",
374                        tgt_name(tsi->tsi_tgt), size, PTR_ERR(tmp));
375                 RETURN(err_serious(PTR_ERR(tmp)));
376         }
377
378         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
379                 __swab32s(tmp);
380         flag = *tmp;
381
382         rc = out_tx_xattr_set(tsi->tsi_env, obj, lbuf, name, flag,
383                               &tti->tti_tea, tti->tti_tea.ta_handle,
384                               tti->tti_u.update.tti_update_reply,
385                               tti->tti_u.update.tti_update_reply_index);
386         RETURN(rc);
387 }
388
389 static int out_xattr_del(struct tgt_session_info *tsi)
390 {
391         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
392         struct object_update    *update = tti->tti_u.update.tti_update;
393         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
394         char                    *name;
395         int                      rc;
396         ENTRY;
397
398         name = object_update_param_get(update, 0, NULL);
399         if (IS_ERR(name)) {
400                 CERROR("%s: empty name for xattr set: rc = %ld\n",
401                        tgt_name(tsi->tsi_tgt), PTR_ERR(name));
402                 RETURN(err_serious(PTR_ERR(name)));
403         }
404
405         rc = out_tx_xattr_del(tsi->tsi_env, obj, name, &tti->tti_tea,
406                               tti->tti_tea.ta_handle,
407                               tti->tti_u.update.tti_update_reply,
408                               tti->tti_u.update.tti_update_reply_index);
409         RETURN(rc);
410 }
411
412 /**
413  * increase ref of the object
414  **/
415 static int out_ref_add(struct tgt_session_info *tsi)
416 {
417         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
418         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
419         int                      rc;
420
421         ENTRY;
422
423         rc = out_tx_ref_add(tsi->tsi_env, obj, &tti->tti_tea,
424                             tti->tti_tea.ta_handle,
425                             tti->tti_u.update.tti_update_reply,
426                             tti->tti_u.update.tti_update_reply_index);
427         RETURN(rc);
428 }
429
430 static int out_ref_del(struct tgt_session_info *tsi)
431 {
432         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
433         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
434         int                      rc;
435
436         ENTRY;
437
438         if (!lu_object_exists(&obj->do_lu))
439                 RETURN(-ENOENT);
440
441         rc = out_tx_ref_del(tsi->tsi_env, obj, &tti->tti_tea,
442                             tti->tti_tea.ta_handle,
443                             tti->tti_u.update.tti_update_reply,
444                             tti->tti_u.update.tti_update_reply_index);
445         RETURN(rc);
446 }
447
448 static int out_index_insert(struct tgt_session_info *tsi)
449 {
450         struct tgt_thread_info  *tti    = tgt_th_info(tsi->tsi_env);
451         struct object_update    *update = tti->tti_u.update.tti_update;
452         struct dt_object        *obj    = tti->tti_u.update.tti_dt_object;
453         struct dt_insert_rec    *rec    = &tti->tti_rec;
454         struct lu_fid           *fid;
455         char                    *name;
456         __u32                   *ptype;
457         int                      rc     = 0;
458         size_t                   size;
459         ENTRY;
460
461         name = object_update_param_get(update, 0, NULL);
462         if (IS_ERR(name)) {
463                 CERROR("%s: empty name for index insert: rc = %ld\n",
464                        tgt_name(tsi->tsi_tgt), PTR_ERR(name));
465                 RETURN(err_serious(PTR_ERR(name)));
466         }
467
468         fid = object_update_param_get(update, 1, &size);
469         if (IS_ERR(fid) || size != sizeof(*fid)) {
470                 CERROR("%s: invalid fid: rc = %ld\n",
471                        tgt_name(tsi->tsi_tgt), PTR_ERR(fid));
472                 RETURN(err_serious(PTR_ERR(fid)));
473         }
474
475         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
476                 lustre_swab_lu_fid(fid);
477
478         if (!fid_is_sane(fid)) {
479                 CERROR("%s: invalid FID "DFID": rc = %d\n",
480                        tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
481                 RETURN(err_serious(-EPROTO));
482         }
483
484         ptype = object_update_param_get(update, 2, &size);
485         if (IS_ERR(ptype) || size != sizeof(*ptype)) {
486                 CERROR("%s: invalid type for index insert: rc = %ld\n",
487                        tgt_name(tsi->tsi_tgt), PTR_ERR(ptype));
488                 RETURN(err_serious(PTR_ERR(ptype)));
489         }
490
491         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
492                 __swab32s(ptype);
493
494         rec->rec_fid = fid;
495         rec->rec_type = *ptype;
496
497         rc = out_tx_index_insert(tsi->tsi_env, obj, (const struct dt_rec *)rec,
498                                  (const struct dt_key *)name, &tti->tti_tea,
499                                  tti->tti_tea.ta_handle,
500                                  tti->tti_u.update.tti_update_reply,
501                                  tti->tti_u.update.tti_update_reply_index);
502         RETURN(rc);
503 }
504
505 static int out_index_delete(struct tgt_session_info *tsi)
506 {
507         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
508         struct object_update    *update = tti->tti_u.update.tti_update;
509         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
510         char                    *name;
511         int                      rc = 0;
512
513         if (!lu_object_exists(&obj->do_lu))
514                 RETURN(-ENOENT);
515
516         name = object_update_param_get(update, 0, NULL);
517         if (IS_ERR(name)) {
518                 CERROR("%s: empty name for index delete: rc = %ld\n",
519                        tgt_name(tsi->tsi_tgt), PTR_ERR(name));
520                 RETURN(err_serious(PTR_ERR(name)));
521         }
522
523         rc = out_tx_index_delete(tsi->tsi_env, obj, (const struct dt_key *)name,
524                                  &tti->tti_tea, tti->tti_tea.ta_handle,
525                                  tti->tti_u.update.tti_update_reply,
526                                  tti->tti_u.update.tti_update_reply_index);
527         RETURN(rc);
528 }
529
530 static int out_destroy(struct tgt_session_info *tsi)
531 {
532         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
533         struct object_update    *update = tti->tti_u.update.tti_update;
534         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
535         struct lu_fid           *fid;
536         int                      rc;
537         ENTRY;
538
539         fid = &update->ou_fid;
540         if (!fid_is_sane(fid)) {
541                 CERROR("%s: invalid FID "DFID": rc = %d\n",
542                        tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
543                 RETURN(err_serious(-EPROTO));
544         }
545
546         if (!lu_object_exists(&obj->do_lu))
547                 RETURN(-ENOENT);
548
549         rc = out_tx_destroy(tsi->tsi_env, obj, &tti->tti_tea,
550                             tti->tti_tea.ta_handle,
551                             tti->tti_u.update.tti_update_reply,
552                             tti->tti_u.update.tti_update_reply_index);
553
554         RETURN(rc);
555 }
556
557 static int out_write(struct tgt_session_info *tsi)
558 {
559         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
560         struct object_update    *update = tti->tti_u.update.tti_update;
561         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
562         struct lu_buf           *lbuf = &tti->tti_buf;
563         char                    *buf;
564         __u64                   *tmp;
565         size_t                  size = 0;
566         size_t                  buf_len = 0;
567         loff_t                  pos;
568         int                      rc;
569         ENTRY;
570
571         buf = object_update_param_get(update, 0, &buf_len);
572         if (IS_ERR(buf) || buf_len == 0) {
573                 CERROR("%s: empty buf for xattr set: rc = %ld\n",
574                        tgt_name(tsi->tsi_tgt), PTR_ERR(buf));
575                 RETURN(err_serious(PTR_ERR(buf)));
576         }
577         lbuf->lb_buf = buf;
578         lbuf->lb_len = buf_len;
579
580         tmp = object_update_param_get(update, 1, &size);
581         if (IS_ERR(tmp) || size != sizeof(*tmp)) {
582                 CERROR("%s: empty or wrong size %zu pos: rc = %ld\n",
583                        tgt_name(tsi->tsi_tgt), size, PTR_ERR(tmp));
584                 RETURN(err_serious(PTR_ERR(tmp)));
585         }
586
587         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
588                 __swab64s(tmp);
589         pos = *tmp;
590
591         rc = out_tx_write(tsi->tsi_env, obj, lbuf, pos,
592                           &tti->tti_tea, tti->tti_tea.ta_handle,
593                           tti->tti_u.update.tti_update_reply,
594                           tti->tti_u.update.tti_update_reply_index);
595         RETURN(rc);
596 }
597
598 static int out_read(struct tgt_session_info *tsi)
599 {
600         const struct lu_env     *env = tsi->tsi_env;
601         struct tgt_thread_info  *tti = tgt_th_info(env);
602         struct object_update    *update = tti->tti_u.update.tti_update;
603         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
604         struct object_update_reply *reply = tti->tti_u.update.tti_update_reply;
605         int index = tti->tti_u.update.tti_update_reply_index;
606         struct lu_rdbuf *rdbuf;
607         struct object_update_result *update_result;
608         struct out_read_reply   *orr;
609         void *tmp;
610         size_t size;
611         size_t total_size = 0;
612         __u64 pos;
613         unsigned int i;
614         unsigned int nbufs;
615         int rc = 0;
616         ENTRY;
617
618         update_result = object_update_result_get(reply, index, NULL);
619         LASSERT(update_result != NULL);
620         update_result->our_datalen = sizeof(*orr);
621
622         if (!lu_object_exists(&obj->do_lu))
623                 GOTO(out, rc = -ENOENT);
624
625         tmp = object_update_param_get(update, 0, NULL);
626         if (IS_ERR(tmp)) {
627                 CERROR("%s: empty size for read: rc = %ld\n",
628                        tgt_name(tsi->tsi_tgt), PTR_ERR(tmp));
629                 GOTO(out, rc = err_serious(PTR_ERR(tmp)));
630         }
631         size = le64_to_cpu(*(size_t *)(tmp));
632
633         tmp = object_update_param_get(update, 1, NULL);
634         if (IS_ERR(tmp)) {
635                 CERROR("%s: empty pos for read: rc = %ld\n",
636                        tgt_name(tsi->tsi_tgt), PTR_ERR(tmp));
637                 GOTO(out, rc = err_serious(PTR_ERR(tmp)));
638         }
639         pos = le64_to_cpu(*(__u64 *)(tmp));
640
641         /* Put the offset into the begining of the buffer in reply */
642         orr = (struct out_read_reply *)update_result->our_data;
643
644         nbufs = (size + OUT_BULK_BUFFER_SIZE - 1) / OUT_BULK_BUFFER_SIZE;
645         OBD_ALLOC(rdbuf, sizeof(struct lu_rdbuf) +
646                          nbufs * sizeof(rdbuf->rb_bufs[0]));
647         if (rdbuf == NULL)
648                 GOTO(out, rc = -ENOMEM);
649
650         rdbuf->rb_nbufs = 0;
651         total_size = 0;
652         for (i = 0; i < nbufs; i++) {
653                 __u32 read_size;
654
655                 read_size = size > OUT_BULK_BUFFER_SIZE ?
656                             OUT_BULK_BUFFER_SIZE : size;
657                 OBD_ALLOC(rdbuf->rb_bufs[i].lb_buf, read_size);
658                 if (rdbuf->rb_bufs[i].lb_buf == NULL)
659                         GOTO(out_free, rc = -ENOMEM);
660
661                 rdbuf->rb_bufs[i].lb_len = read_size;
662                 dt_read_lock(env, obj, MOR_TGT_CHILD);
663                 rc = dt_read(env, obj, &rdbuf->rb_bufs[i], &pos);
664                 dt_read_unlock(env, obj);
665
666                 total_size += rc < 0 ? 0 : rc;
667                 if (rc <= 0)
668                         break;
669
670                 rdbuf->rb_nbufs++;
671                 size -= read_size;
672         }
673
674         /* send pages to client */
675         rc = tgt_send_buffer(tsi, rdbuf);
676         if (rc < 0)
677                 GOTO(out_free, rc);
678
679         orr->orr_size = total_size;
680         orr->orr_offset = pos;
681
682         orr_cpu_to_le(orr, orr);
683         update_result->our_datalen += orr->orr_size;
684 out_free:
685         for (i = 0; i < nbufs; i++) {
686                 if (rdbuf->rb_bufs[i].lb_buf != NULL) {
687                         OBD_FREE(rdbuf->rb_bufs[i].lb_buf,
688                                  rdbuf->rb_bufs[i].lb_len);
689                 }
690         }
691         OBD_FREE(rdbuf, sizeof(struct lu_rdbuf) +
692                         nbufs * sizeof(rdbuf->rb_bufs[0]));
693 out:
694         /* Insert read buffer */
695         update_result->our_rc = ptlrpc_status_hton(rc);
696         reply->ourp_lens[index] = cfs_size_round(update_result->our_datalen +
697                                                  sizeof(*update_result));
698         RETURN(rc);
699 }
700
/*
 * Handler for OUT_NOOP: does nothing and reports success.
 *
 * \retval 0    always
 */
static int out_noop(struct tgt_session_info *tsi)
{
        (void)tsi;      /* intentionally unused */

        return 0;
}
705
/*
 * Build one out_update_ops[] entry. The entry is placed at array index
 * (opc - OUT_CREATE), so the table can be indexed directly by opcode.
 * Arguments are parenthesized so that expression arguments expand safely.
 *
 * \param opc   update opcode (also stored in th_opc)
 * \param name  handler name string
 * \param flags handler flags
 * \param fn    handler function
 */
#define DEF_OUT_HNDL(opc, name, flags, fn)     \
[(opc) - OUT_CREATE] = {                                \
        .th_name    = (name),                           \
        .th_fail_id = 0,                                \
        .th_opc     = (opc),                            \
        .th_flags   = (flags),                          \
        .th_act     = (fn),                             \
        .th_fmt     = NULL,                             \
        .th_version = 0,                                \
}
716
717 static struct tgt_handler out_update_ops[] = {
718         DEF_OUT_HNDL(OUT_CREATE, "out_create", MUTABOR | HABEO_REFERO,
719                      out_create),
720         DEF_OUT_HNDL(OUT_DESTROY, "out_create", MUTABOR | HABEO_REFERO,
721                      out_destroy),
722         DEF_OUT_HNDL(OUT_REF_ADD, "out_ref_add", MUTABOR | HABEO_REFERO,
723                      out_ref_add),
724         DEF_OUT_HNDL(OUT_REF_DEL, "out_ref_del", MUTABOR | HABEO_REFERO,
725                      out_ref_del),
726         DEF_OUT_HNDL(OUT_ATTR_SET, "out_attr_set",  MUTABOR | HABEO_REFERO,
727                      out_attr_set),
728         DEF_OUT_HNDL(OUT_ATTR_GET, "out_attr_get",  HABEO_REFERO,
729                      out_attr_get),
730         DEF_OUT_HNDL(OUT_XATTR_SET, "out_xattr_set", MUTABOR | HABEO_REFERO,
731                      out_xattr_set),
732         DEF_OUT_HNDL(OUT_XATTR_DEL, "out_xattr_del", MUTABOR | HABEO_REFERO,
733                      out_xattr_del),
734         DEF_OUT_HNDL(OUT_XATTR_GET, "out_xattr_get", HABEO_REFERO,
735                      out_xattr_get),
736         DEF_OUT_HNDL(OUT_INDEX_LOOKUP, "out_index_lookup", HABEO_REFERO,
737                      out_index_lookup),
738         DEF_OUT_HNDL(OUT_INDEX_INSERT, "out_index_insert",
739                      MUTABOR | HABEO_REFERO, out_index_insert),
740         DEF_OUT_HNDL(OUT_INDEX_DELETE, "out_index_delete",
741                      MUTABOR | HABEO_REFERO, out_index_delete),
742         DEF_OUT_HNDL(OUT_WRITE, "out_write", MUTABOR | HABEO_REFERO, out_write),
743         DEF_OUT_HNDL(OUT_READ, "out_read", HABEO_REFERO, out_read),
744         DEF_OUT_HNDL(OUT_NOOP, "out_noop", HABEO_REFERO, out_noop),
745 };
746
747 static struct tgt_handler *out_handler_find(__u32 opc)
748 {
749         struct tgt_handler *h;
750
751         h = NULL;
752         if (OUT_CREATE <= opc && opc < OUT_LAST) {
753                 h = &out_update_ops[opc - OUT_CREATE];
754                 LASSERTF(h->th_opc == opc, "opcode mismatch %d != %d\n",
755                          h->th_opc, opc);
756         } else {
757                 h = NULL; /* unsupported opc */
758         }
759         return h;
760 }
761
762 static int out_tx_start(const struct lu_env *env, struct dt_device *dt,
763                         struct thandle_exec_args *ta, struct obd_export *exp)
764 {
765         ta->ta_argno = 0;
766         ta->ta_handle = dt_trans_create(env, dt);
767         if (IS_ERR(ta->ta_handle)) {
768                 int rc;
769
770                 rc = PTR_ERR(ta->ta_handle);
771                 ta->ta_handle = NULL;
772                 CERROR("%s: start handle error: rc = %d\n", dt_obd_name(dt),
773                        rc);
774                 return rc;
775         }
776         if (exp->exp_need_sync)
777                 ta->ta_handle->th_sync = 1;
778
779         return 0;
780 }
781
782 static int out_trans_start(const struct lu_env *env,
783                            struct thandle_exec_args *ta)
784 {
785         return dt_trans_start(env, ta->ta_handle->th_dev, ta->ta_handle);
786 }
787
788 static int out_trans_stop(const struct lu_env *env,
789                           struct thandle_exec_args *ta, int err)
790 {
791         int i;
792         int rc;
793
794         ta->ta_handle->th_result = err;
795         rc = dt_trans_stop(env, ta->ta_handle->th_dev, ta->ta_handle);
796         for (i = 0; i < ta->ta_argno; i++) {
797                 if (ta->ta_args[i]->object != NULL) {
798                         struct dt_object *obj = ta->ta_args[i]->object;
799
800                         /* If the object is being created during this
801                          * transaction, we need to remove them from the
802                          * cache immediately, because a few layers are
803                          * missing in OUT handler, i.e. the object might
804                          * not be initialized in all layers */
805                         if (ta->ta_args[i]->exec_fn == out_tx_create_exec)
806                                 set_bit(LU_OBJECT_HEARD_BANSHEE,
807                                         &obj->do_lu.lo_header->loh_flags);
808                         lu_object_put(env, &ta->ta_args[i]->object->do_lu);
809                         ta->ta_args[i]->object = NULL;
810                 }
811         }
812         ta->ta_handle = NULL;
813         ta->ta_argno = 0;
814
815         return rc;
816 }
817
818 static int out_tx_end(const struct lu_env *env, struct thandle_exec_args *ta,
819                       int declare_ret)
820 {
821         struct tgt_session_info *tsi = tgt_ses_info(env);
822         int                     i;
823         int                     rc;
824         int                     rc1;
825         ENTRY;
826
827         if (ta->ta_handle == NULL)
828                 RETURN(0);
829
830         if (declare_ret != 0 || ta->ta_argno == 0)
831                 GOTO(stop, rc = declare_ret);
832
833         LASSERT(ta->ta_handle->th_dev != NULL);
834         rc = out_trans_start(env, ta);
835         if (unlikely(rc != 0))
836                 GOTO(stop, rc);
837
838         for (i = 0; i < ta->ta_argno; i++) {
839                 rc = ta->ta_args[i]->exec_fn(env, ta->ta_handle,
840                                              ta->ta_args[i]);
841                 if (unlikely(rc != 0)) {
842                         CDEBUG(D_INFO, "error during execution of #%u from"
843                                " %s:%d: rc = %d\n", i, ta->ta_args[i]->file,
844                                ta->ta_args[i]->line, rc);
845                         while (--i >= 0) {
846                                 if (ta->ta_args[i]->undo_fn != NULL)
847                                         ta->ta_args[i]->undo_fn(env,
848                                                                ta->ta_handle,
849                                                                ta->ta_args[i]);
850                                 else
851                                         CERROR("%s: undo for %s:%d: rc = %d\n",
852                                              dt_obd_name(ta->ta_handle->th_dev),
853                                                ta->ta_args[i]->file,
854                                                ta->ta_args[i]->line, -ENOTSUPP);
855                         }
856                         break;
857                 }
858                 CDEBUG(D_INFO, "%s: executed %u/%u: rc = %d\n",
859                        dt_obd_name(ta->ta_handle->th_dev), i, ta->ta_argno, rc);
860         }
861
862         /* Only fail for real updates, XXX right now llog updates will be
863         * ignore, whose updates count is usually 1, so failover test
864         * case will spot this FAIL_UPDATE_NET_REP precisely, and it will
865         * be removed after async update patch is landed. */
866         if (ta->ta_argno > 1)
867                 tsi->tsi_reply_fail_id = OBD_FAIL_OUT_UPDATE_NET_REP;
868
869 stop:
870         rc1 = out_trans_stop(env, ta, rc);
871         if (rc == 0)
872                 rc = rc1;
873
874         ta->ta_handle = NULL;
875         ta->ta_argno = 0;
876
877         RETURN(rc);
878 }
879
880 /**
881  * Object updates between Targets. Because all the updates has been
882  * dis-assemblied into object updates at sender side, so OUT will
883  * call OSD API directly to execute these updates.
884  *
885  * In DNE phase I all of the updates in the request need to be executed
886  * in one transaction, and the transaction has to be synchronously.
887  *
888  * Please refer to lustre/include/lustre/lustre_idl.h for req/reply
889  * format.
890  */
891 int out_handle(struct tgt_session_info *tsi)
892 {
893         const struct lu_env             *env = tsi->tsi_env;
894         struct tgt_thread_info          *tti = tgt_th_info(env);
895         struct thandle_exec_args        *ta = &tti->tti_tea;
896         struct req_capsule              *pill = tsi->tsi_pill;
897         struct dt_device                *dt = tsi->tsi_tgt->lut_bottom;
898         struct out_update_header        *ouh;
899         struct out_update_buffer        *oub = NULL;
900         struct object_update            *update;
901         struct object_update_reply      *reply;
902         struct ptlrpc_bulk_desc         *desc = NULL;
903         struct l_wait_info              lwi;
904         void                            **update_bufs;
905         int                             current_batchid = -1;
906         __u32                           update_buf_count;
907         unsigned int                    i;
908         unsigned int                    reply_index = 0;
909         int                             rc = 0;
910         int                             rc1 = 0;
911         int                             ouh_size, reply_size;
912         int                             updates;
913         ENTRY;
914
915         req_capsule_set(pill, &RQF_OUT_UPDATE);
916         ouh_size = req_capsule_get_size(pill, &RMF_OUT_UPDATE_HEADER,
917                                         RCL_CLIENT);
918         if (ouh_size <= 0)
919                 RETURN(err_serious(-EPROTO));
920
921         ouh = req_capsule_client_get(pill, &RMF_OUT_UPDATE_HEADER);
922         if (ouh == NULL)
923                 RETURN(err_serious(-EPROTO));
924
925         if (ouh->ouh_magic != OUT_UPDATE_HEADER_MAGIC) {
926                 CERROR("%s: invalid update buffer magic %x expect %x: "
927                        "rc = %d\n", tgt_name(tsi->tsi_tgt), ouh->ouh_magic,
928                        UPDATE_REQUEST_MAGIC, -EPROTO);
929                 RETURN(err_serious(-EPROTO));
930         }
931
932         update_buf_count = ouh->ouh_count;
933         if (update_buf_count == 0)
934                 RETURN(err_serious(-EPROTO));
935
936         OBD_ALLOC(update_bufs, sizeof(*update_bufs) * update_buf_count);
937         if (update_bufs == NULL)
938                 RETURN(-ENOMEM);
939
940         if (ouh->ouh_inline_length > 0) {
941                 update_bufs[0] = ouh->ouh_inline_data;
942         } else {
943                 struct out_update_buffer *tmp;
944
945                 oub = req_capsule_client_get(pill, &RMF_OUT_UPDATE_BUF);
946                 if (oub == NULL)
947                         GOTO(out_free, rc = -EPROTO);
948
949                 desc = ptlrpc_prep_bulk_exp(pill->rc_req, update_buf_count,
950                                             PTLRPC_BULK_OPS_COUNT,
951                                             PTLRPC_BULK_GET_SINK |
952                                             PTLRPC_BULK_BUF_KVEC,
953                                             MDS_BULK_PORTAL,
954                                             &ptlrpc_bulk_kvec_ops);
955                 if (desc == NULL)
956                         GOTO(out_free, rc = -ENOMEM);
957
958                 tmp = oub;
959                 for (i = 0; i < update_buf_count; i++, tmp++) {
960                         if (tmp->oub_size >= OUT_MAXREQSIZE)
961                                 GOTO(out_free, rc = -EPROTO);
962
963                         OBD_ALLOC(update_bufs[i], tmp->oub_size);
964                         if (update_bufs[i] == NULL)
965                                 GOTO(out_free, rc = -ENOMEM);
966
967                         desc->bd_frag_ops->add_iov_frag(desc, update_bufs[i],
968                                                         tmp->oub_size);
969                 }
970
971                 pill->rc_req->rq_bulk_write = 1;
972                 rc = sptlrpc_svc_prep_bulk(pill->rc_req, desc);
973                 if (rc != 0)
974                         GOTO(out_free, rc);
975
976                 rc = target_bulk_io(pill->rc_req->rq_export, desc, &lwi);
977                 if (rc < 0)
978                         GOTO(out_free, rc);
979         }
980         /* validate the request and calculate the total update count and
981          * set it to reply */
982         reply_size = 0;
983         updates = 0;
984         for (i = 0; i < update_buf_count; i++) {
985                 struct object_update_request    *our;
986                 int                              j;
987
988                 our = update_bufs[i];
989                 if (ptlrpc_req_need_swab(pill->rc_req))
990                         lustre_swab_object_update_request(our);
991
992                 if (our->ourq_magic != UPDATE_REQUEST_MAGIC) {
993                         CERROR("%s: invalid update buffer magic %x"
994                                " expect %x: rc = %d\n",
995                                tgt_name(tsi->tsi_tgt), our->ourq_magic,
996                                UPDATE_REQUEST_MAGIC, -EPROTO);
997                         GOTO(out_free, rc = -EPROTO);
998                 }
999                 updates += our->ourq_count;
1000
1001                 /* need to calculate reply size */
1002                 for (j = 0; j < our->ourq_count; j++) {
1003                         update = object_update_request_get(our, j, NULL);
1004                         if (update == NULL)
1005                                 GOTO(out, rc = -EPROTO);
1006                         if (ptlrpc_req_need_swab(pill->rc_req))
1007                                 lustre_swab_object_update(update);
1008
1009                         if (!fid_is_sane(&update->ou_fid)) {
1010                                 CERROR("%s: invalid FID "DFID": rc = %d\n",
1011                                        tgt_name(tsi->tsi_tgt),
1012                                        PFID(&update->ou_fid), -EPROTO);
1013                                 GOTO(out, rc = err_serious(-EPROTO));
1014                         }
1015
1016                         /* XXX: what ou_result_size can be considered safe? */
1017
1018                         reply_size += sizeof(reply->ourp_lens[0]);
1019                         reply_size += sizeof(struct object_update_result);
1020                         reply_size += update->ou_result_size;
1021                 }
1022         }
1023         reply_size += sizeof(*reply);
1024
1025         if (unlikely(reply_size > ouh->ouh_reply_size)) {
1026                 CERROR("%s: too small reply buf %u for %u, need %u at least\n",
1027                        tgt_name(tsi->tsi_tgt), ouh->ouh_reply_size,
1028                        updates, reply_size);
1029                 GOTO(out_free, rc = -EPROTO);
1030         }
1031
1032         req_capsule_set_size(pill, &RMF_OUT_UPDATE_REPLY, RCL_SERVER,
1033                              ouh->ouh_reply_size);
1034         rc = req_capsule_server_pack(pill);
1035         if (rc != 0) {
1036                 CERROR("%s: Can't pack response: rc = %d\n",
1037                        tgt_name(tsi->tsi_tgt), rc);
1038                 GOTO(out_free, rc = -EPROTO);
1039         }
1040
1041         /* Prepare the update reply buffer */
1042         reply = req_capsule_server_get(pill, &RMF_OUT_UPDATE_REPLY);
1043         if (reply == NULL)
1044                 GOTO(out_free, rc = err_serious(-EPROTO));
1045         reply->ourp_magic = UPDATE_REPLY_MAGIC;
1046         reply->ourp_count = updates;
1047         tti->tti_u.update.tti_update_reply = reply;
1048         tti->tti_mult_trans = !req_is_replay(tgt_ses_req(tsi));
1049  
1050         /* Walk through updates in the request to execute them */
1051         for (i = 0; i < update_buf_count; i++) {
1052                 struct tgt_handler      *h;
1053                 struct dt_object        *dt_obj;
1054                 int                     update_count;
1055                 struct object_update_request *our;
1056                 int                     j;
1057
1058                 our = update_bufs[i];
1059                 update_count = our->ourq_count;
1060                 for (j = 0; j < update_count; j++) {
1061                         update = object_update_request_get(our, j, NULL);
1062
1063                         dt_obj = dt_locate(env, dt, &update->ou_fid);
1064                         if (IS_ERR(dt_obj))
1065                                 GOTO(out, rc = PTR_ERR(dt_obj));
1066
1067                         if (dt->dd_record_fid_accessed) {
1068                                 lfsck_pack_rfa(&tti->tti_lr,
1069                                                lu_object_fid(&dt_obj->do_lu),
1070                                                LE_FID_ACCESSED,
1071                                                LFSCK_TYPE_LAYOUT);
1072                                 tgt_lfsck_in_notify(env, dt, &tti->tti_lr,
1073                                                     NULL);
1074                         }
1075
1076                         tti->tti_u.update.tti_dt_object = dt_obj;
1077                         tti->tti_u.update.tti_update = update;
1078                         tti->tti_u.update.tti_update_reply_index = reply_index;
1079
1080                         h = out_handler_find(update->ou_type);
1081                         if (unlikely(h == NULL)) {
1082                                 CERROR("%s: unsupported opc: 0x%x\n",
1083                                        tgt_name(tsi->tsi_tgt), update->ou_type);
1084                                 GOTO(next, rc = -ENOTSUPP);
1085                         }
1086
1087                         /* Check resend case only for modifying RPC */
1088                         if (h->th_flags & MUTABOR) {
1089                                 struct ptlrpc_request *req = tgt_ses_req(tsi);
1090
1091                                 if (out_check_resent(env, dt, dt_obj, req,
1092                                                      out_reconstruct, reply,
1093                                                      reply_index))
1094                                         GOTO(next, rc = 0);
1095                         }
1096
1097                         /* start transaction for modification RPC only */
1098                         if (h->th_flags & MUTABOR && current_batchid == -1) {
1099                                 current_batchid = update->ou_batchid;
1100                                 rc = out_tx_start(env, dt, ta, tsi->tsi_exp);
1101                                 if (rc != 0)
1102                                         GOTO(next, rc);
1103
1104                                 if (update->ou_flags & UPDATE_FL_SYNC)
1105                                         ta->ta_handle->th_sync = 1;
1106                         }
1107
1108                         /* Stop the current update transaction, if the update
1109                          * has different batchid, or read-only update */
1110                         if (((current_batchid != update->ou_batchid) ||
1111                              !(h->th_flags & MUTABOR)) &&
1112                              ta->ta_handle != NULL) {
1113                                 rc = out_tx_end(env, ta, rc);
1114                                 current_batchid = -1;
1115                                 if (rc != 0)
1116                                         GOTO(next, rc);
1117
1118                                 /* start a new transaction if needed */
1119                                 if (h->th_flags & MUTABOR) {
1120                                         rc = out_tx_start(env, dt, ta,
1121                                                           tsi->tsi_exp);
1122                                         if (rc != 0)
1123                                                 GOTO(next, rc);
1124                                         if (update->ou_flags & UPDATE_FL_SYNC)
1125                                                 ta->ta_handle->th_sync = 1;
1126                                         current_batchid = update->ou_batchid;
1127                                 }
1128                         }
1129
1130                         rc = h->th_act(tsi);
1131 next:
1132                         reply_index++;
1133                         lu_object_put(env, &dt_obj->do_lu);
1134                         if (rc < 0)
1135                                 GOTO(out, rc);
1136                 }
1137         }
1138 out:
1139         if (current_batchid != -1) {
1140                 rc1 = out_tx_end(env, ta, rc);
1141                 if (rc == 0)
1142                         rc = rc1;
1143         }
1144
1145 out_free:
1146         if (update_bufs != NULL) {
1147                 if (oub != NULL) {
1148                         for (i = 0; i < update_buf_count; i++, oub++) {
1149                                 if (update_bufs[i] != NULL)
1150                                         OBD_FREE(update_bufs[i], oub->oub_size);
1151                         }
1152                 }
1153
1154                 OBD_FREE(update_bufs, sizeof(*update_bufs) * update_buf_count);
1155         }
1156
1157         if (desc != NULL)
1158                 ptlrpc_free_bulk(desc);
1159
1160         RETURN(rc);
1161 }
1162
1163 struct tgt_handler tgt_out_handlers[] = {
1164 TGT_UPDATE_HDL(MUTABOR, OUT_UPDATE,     out_handle),
1165 };
1166 EXPORT_SYMBOL(tgt_out_handlers);
1167