Whamcloud - gitweb
LU-7039 llog: update llog header and size
[fs/lustre-release.git] / lustre / target / out_handler.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, 2015, Intel Corporation.
24  *
25  * lustre/target/out_handler.c
26  *
27  * Object update handler between targets.
28  *
29  * Author: di.wang <di.wang@intel.com>
30  */
31
32 #define DEBUG_SUBSYSTEM S_CLASS
33
34 #include <llog_swab.h>
35 #include <lustre_swab.h>
36 #include <lustre_update.h>
37 #include <md_object.h>
38 #include <obd_class.h>
39 #include "tgt_internal.h"
40
41 static void out_reconstruct(const struct lu_env *env, struct dt_device *dt,
42                             struct dt_object *obj,
43                             struct object_update_reply *reply,
44                             int index)
45 {
46         CDEBUG(D_INFO, "%s: fork reply reply %p index %d: rc = %d\n",
47                dt_obd_name(dt), reply, index, 0);
48
49         object_update_result_insert(reply, NULL, 0, index, 0);
50         return;
51 }
52
53 typedef void (*out_reconstruct_t)(const struct lu_env *env,
54                                   struct dt_device *dt,
55                                   struct dt_object *obj,
56                                   struct object_update_reply *reply,
57                                   int index);
58
59 static inline int out_check_resent(const struct lu_env *env,
60                                    struct dt_device *dt,
61                                    struct dt_object *obj,
62                                    struct ptlrpc_request *req,
63                                    out_reconstruct_t reconstruct,
64                                    struct object_update_reply *reply,
65                                    int index)
66 {
67         if (likely(!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)))
68                 return 0;
69
70         if (req_xid_is_last(req)) {
71                 struct lsd_client_data *lcd;
72
73                 /* XXX this does not support mulitple transactions yet, i.e.
74                  * only 1 update RPC each time betwee MDTs */
75                 lcd = req->rq_export->exp_target_data.ted_lcd;
76
77                 req->rq_transno = lcd->lcd_last_transno;
78                 req->rq_status = lcd->lcd_last_result;
79                 if (req->rq_status != 0)
80                         req->rq_transno = 0;
81                 lustre_msg_set_transno(req->rq_repmsg, req->rq_transno);
82                 lustre_msg_set_status(req->rq_repmsg, req->rq_status);
83
84                 DEBUG_REQ(D_RPCTRACE, req, "restoring transno "LPD64"status %d",
85                           req->rq_transno, req->rq_status);
86
87                 reconstruct(env, dt, obj, reply, index);
88                 return 1;
89         }
90         DEBUG_REQ(D_HA, req, "no reply for RESENT req (have "LPD64")",
91                  req->rq_export->exp_target_data.ted_lcd->lcd_last_xid);
92         return 0;
93 }
94
95 static int out_create(struct tgt_session_info *tsi)
96 {
97         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
98         struct object_update    *update = tti->tti_u.update.tti_update;
99         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
100         struct dt_object_format *dof = &tti->tti_u.update.tti_update_dof;
101         struct obdo             *lobdo = &tti->tti_u.update.tti_obdo;
102         struct lu_attr          *attr = &tti->tti_attr;
103         struct lu_fid           *fid = NULL;
104         struct obdo             *wobdo;
105         size_t                  size;
106         int                     rc;
107
108         ENTRY;
109
110         wobdo = object_update_param_get(update, 0, &size);
111         if (IS_ERR(wobdo) || size != sizeof(*wobdo)) {
112                 CERROR("%s: obdo is NULL, invalid RPC: rc = %ld\n",
113                        tgt_name(tsi->tsi_tgt), PTR_ERR(wobdo));
114                 RETURN(err_serious(PTR_ERR(wobdo)));
115         }
116
117         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
118                 lustre_swab_obdo(wobdo);
119         lustre_get_wire_obdo(NULL, lobdo, wobdo);
120         la_from_obdo(attr, lobdo, lobdo->o_valid);
121
122         dof->dof_type = dt_mode_to_dft(attr->la_mode);
123         if (update->ou_params_count > 1) {
124                 fid = object_update_param_get(update, 1, &size);
125                 if (IS_ERR(fid) || size != sizeof(*fid)) {
126                         CERROR("%s: invalid fid: rc = %ld\n",
127                                tgt_name(tsi->tsi_tgt), PTR_ERR(fid));
128                         RETURN(err_serious(PTR_ERR(fid)));
129                 }
130                 if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
131                         lustre_swab_lu_fid(fid);
132                 if (!fid_is_sane(fid)) {
133                         CERROR("%s: invalid fid "DFID": rc = %d\n",
134                                tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
135                         RETURN(err_serious(-EPROTO));
136                 }
137         }
138
139         if (lu_object_exists(&obj->do_lu))
140                 RETURN(-EEXIST);
141
142         rc = out_tx_create(tsi->tsi_env, obj, attr, fid, dof,
143                            &tti->tti_tea, tti->tti_tea.ta_handle,
144                            tti->tti_u.update.tti_update_reply,
145                            tti->tti_u.update.tti_update_reply_index);
146
147         RETURN(rc);
148 }
149
150 static int out_attr_set(struct tgt_session_info *tsi)
151 {
152         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
153         struct object_update    *update = tti->tti_u.update.tti_update;
154         struct lu_attr          *attr = &tti->tti_attr;
155         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
156         struct obdo             *lobdo = &tti->tti_u.update.tti_obdo;
157         struct obdo             *wobdo;
158         size_t                   size;
159         int                      rc;
160
161         ENTRY;
162
163         wobdo = object_update_param_get(update, 0, &size);
164         if (IS_ERR(wobdo) || size != sizeof(*wobdo)) {
165                 CERROR("%s: empty obdo in the update: rc = %ld\n",
166                        tgt_name(tsi->tsi_tgt), PTR_ERR(wobdo));
167                 RETURN(err_serious(PTR_ERR(wobdo)));
168         }
169
170         attr->la_valid = 0;
171         attr->la_valid = 0;
172
173         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
174                 lustre_swab_obdo(wobdo);
175         lustre_get_wire_obdo(NULL, lobdo, wobdo);
176         la_from_obdo(attr, lobdo, lobdo->o_valid);
177
178         rc = out_tx_attr_set(tsi->tsi_env, obj, attr, &tti->tti_tea,
179                              tti->tti_tea.ta_handle,
180                              tti->tti_u.update.tti_update_reply,
181                              tti->tti_u.update.tti_update_reply_index);
182
183         RETURN(rc);
184 }
185
186 static int out_attr_get(struct tgt_session_info *tsi)
187 {
188         const struct lu_env     *env = tsi->tsi_env;
189         struct tgt_thread_info  *tti = tgt_th_info(env);
190         struct object_update    *update = tti->tti_u.update.tti_update;
191         struct obdo             *obdo = &tti->tti_u.update.tti_obdo;
192         struct lu_attr          *la = &tti->tti_attr;
193         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
194         int                     idx = tti->tti_u.update.tti_update_reply_index;
195         int                     rc;
196
197         ENTRY;
198
199         if (unlikely(update->ou_result_size < sizeof(*obdo)))
200                 return -EPROTO;
201
202         if (!lu_object_exists(&obj->do_lu)) {
203                 /* Usually, this will be called when the master MDT try
204                  * to init a remote object(see osp_object_init), so if
205                  * the object does not exist on slave, we need set BANSHEE flag,
206                  * so the object can be removed from the cache immediately */
207                 set_bit(LU_OBJECT_HEARD_BANSHEE,
208                         &obj->do_lu.lo_header->loh_flags);
209                 RETURN(-ENOENT);
210         }
211
212         dt_read_lock(env, obj, MOR_TGT_CHILD);
213         rc = dt_attr_get(env, obj, la);
214         if (rc)
215                 GOTO(out_unlock, rc);
216
217         obdo->o_valid = 0;
218         obdo_from_la(obdo, la, la->la_valid);
219         lustre_set_wire_obdo(NULL, obdo, obdo);
220
221 out_unlock:
222         dt_read_unlock(env, obj);
223
224         CDEBUG(D_INFO, "%s: insert attr get reply %p index %d: rc = %d\n",
225                tgt_name(tsi->tsi_tgt), tti->tti_u.update.tti_update_reply,
226                0, rc);
227
228         object_update_result_insert(tti->tti_u.update.tti_update_reply, obdo,
229                                     sizeof(*obdo), idx, rc);
230
231         RETURN(rc);
232 }
233
234 static int out_xattr_get(struct tgt_session_info *tsi)
235 {
236         const struct lu_env        *env = tsi->tsi_env;
237         struct tgt_thread_info     *tti = tgt_th_info(env);
238         struct object_update       *update = tti->tti_u.update.tti_update;
239         struct lu_buf              *lbuf = &tti->tti_buf;
240         struct object_update_reply *reply = tti->tti_u.update.tti_update_reply;
241         struct dt_object           *obj = tti->tti_u.update.tti_dt_object;
242         char                       *name;
243         struct object_update_result *update_result;
244         int                     idx = tti->tti_u.update.tti_update_reply_index;
245         int                        rc;
246
247         ENTRY;
248
249         if (!lu_object_exists(&obj->do_lu)) {
250                 set_bit(LU_OBJECT_HEARD_BANSHEE,
251                         &obj->do_lu.lo_header->loh_flags);
252                 RETURN(-ENOENT);
253         }
254
255         name = object_update_param_get(update, 0, NULL);
256         if (IS_ERR(name)) {
257                 CERROR("%s: empty name for xattr get: rc = %ld\n",
258                        tgt_name(tsi->tsi_tgt), PTR_ERR(name));
259                 RETURN(err_serious(PTR_ERR(name)));
260         }
261
262         update_result = object_update_result_get(reply, 0, NULL);
263         if (update_result == NULL) {
264                 CERROR("%s: empty name for xattr get: rc = %d\n",
265                        tgt_name(tsi->tsi_tgt), -EPROTO);
266                 RETURN(err_serious(-EPROTO));
267         }
268
269         lbuf->lb_len = (int)tti->tti_u.update.tti_update->ou_result_size;
270         lbuf->lb_buf = update_result->our_data;
271         if (lbuf->lb_len == 0)
272                 lbuf->lb_buf = 0;
273         dt_read_lock(env, obj, MOR_TGT_CHILD);
274         rc = dt_xattr_get(env, obj, lbuf, name);
275         dt_read_unlock(env, obj);
276         if (rc < 0)
277                 lbuf->lb_len = 0;
278         CDEBUG(D_INFO, "%s: "DFID" get xattr %s len %d\n",
279                tgt_name(tsi->tsi_tgt), PFID(lu_object_fid(&obj->do_lu)),
280                name, (int)lbuf->lb_len);
281
282         GOTO(out, rc);
283
284 out:
285         object_update_result_insert(reply, lbuf->lb_buf, lbuf->lb_len, idx, rc);
286         RETURN(0);
287 }
288
289 static int out_index_lookup(struct tgt_session_info *tsi)
290 {
291         const struct lu_env     *env = tsi->tsi_env;
292         struct tgt_thread_info  *tti = tgt_th_info(env);
293         struct object_update    *update = tti->tti_u.update.tti_update;
294         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
295         char                    *name;
296         int                      rc;
297
298         ENTRY;
299
300         if (unlikely(update->ou_result_size < sizeof(tti->tti_fid1)))
301                 return -EPROTO;
302
303         if (!lu_object_exists(&obj->do_lu))
304                 RETURN(-ENOENT);
305
306         name = object_update_param_get(update, 0, NULL);
307         if (IS_ERR(name)) {
308                 CERROR("%s: empty name for lookup: rc = %ld\n",
309                        tgt_name(tsi->tsi_tgt), PTR_ERR(name));
310                 RETURN(err_serious(PTR_ERR(name)));
311         }
312
313         dt_read_lock(env, obj, MOR_TGT_CHILD);
314         if (!dt_try_as_dir(env, obj))
315                 GOTO(out_unlock, rc = -ENOTDIR);
316
317         rc = dt_lookup(env, obj, (struct dt_rec *)&tti->tti_fid1,
318                        (struct dt_key *)name);
319
320         if (rc < 0)
321                 GOTO(out_unlock, rc);
322
323         if (rc == 0)
324                 rc += 1;
325
326 out_unlock:
327         dt_read_unlock(env, obj);
328
329         CDEBUG(D_INFO, "lookup "DFID" %s get "DFID" rc %d\n",
330                PFID(lu_object_fid(&obj->do_lu)), name,
331                PFID(&tti->tti_fid1), rc);
332
333         CDEBUG(D_INFO, "%s: insert lookup reply %p index %d: rc = %d\n",
334                tgt_name(tsi->tsi_tgt), tti->tti_u.update.tti_update_reply,
335                0, rc);
336
337         object_update_result_insert(tti->tti_u.update.tti_update_reply,
338                             &tti->tti_fid1, sizeof(tti->tti_fid1),
339                             tti->tti_u.update.tti_update_reply_index, rc);
340         RETURN(rc);
341 }
342
343 static int out_xattr_set(struct tgt_session_info *tsi)
344 {
345         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
346         struct object_update    *update = tti->tti_u.update.tti_update;
347         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
348         struct lu_buf           *lbuf = &tti->tti_buf;
349         char                    *name;
350         char                    *buf;
351         __u32                   *tmp;
352         size_t                   buf_len = 0;
353         int                      flag;
354         size_t                   size = 0;
355         int                      rc;
356         ENTRY;
357
358         name = object_update_param_get(update, 0, NULL);
359         if (IS_ERR(name)) {
360                 CERROR("%s: empty name for xattr set: rc = %ld\n",
361                        tgt_name(tsi->tsi_tgt), PTR_ERR(name));
362                 RETURN(err_serious(PTR_ERR(name)));
363         }
364
365         /* If buffer == NULL (-ENODATA), then it might mean delete xattr */
366         buf = object_update_param_get(update, 1, &buf_len);
367         if (IS_ERR(buf) && PTR_ERR(buf) != -ENODATA)
368                 RETURN(err_serious(PTR_ERR(buf)));
369
370         lbuf->lb_buf = buf;
371         lbuf->lb_len = buf_len;
372
373         tmp = object_update_param_get(update, 2, &size);
374         if (IS_ERR(tmp) || size != sizeof(*tmp)) {
375                 CERROR("%s: emptry or wrong size %zu flag: rc = %ld\n",
376                        tgt_name(tsi->tsi_tgt), size, PTR_ERR(tmp));
377                 RETURN(err_serious(PTR_ERR(tmp)));
378         }
379
380         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
381                 __swab32s(tmp);
382         flag = *tmp;
383
384         rc = out_tx_xattr_set(tsi->tsi_env, obj, lbuf, name, flag,
385                               &tti->tti_tea, tti->tti_tea.ta_handle,
386                               tti->tti_u.update.tti_update_reply,
387                               tti->tti_u.update.tti_update_reply_index);
388         RETURN(rc);
389 }
390
391 static int out_xattr_del(struct tgt_session_info *tsi)
392 {
393         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
394         struct object_update    *update = tti->tti_u.update.tti_update;
395         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
396         char                    *name;
397         int                      rc;
398         ENTRY;
399
400         name = object_update_param_get(update, 0, NULL);
401         if (IS_ERR(name)) {
402                 CERROR("%s: empty name for xattr set: rc = %ld\n",
403                        tgt_name(tsi->tsi_tgt), PTR_ERR(name));
404                 RETURN(err_serious(PTR_ERR(name)));
405         }
406
407         rc = out_tx_xattr_del(tsi->tsi_env, obj, name, &tti->tti_tea,
408                               tti->tti_tea.ta_handle,
409                               tti->tti_u.update.tti_update_reply,
410                               tti->tti_u.update.tti_update_reply_index);
411         RETURN(rc);
412 }
413
414 /**
415  * increase ref of the object
416  **/
417 static int out_ref_add(struct tgt_session_info *tsi)
418 {
419         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
420         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
421         int                      rc;
422
423         ENTRY;
424
425         rc = out_tx_ref_add(tsi->tsi_env, obj, &tti->tti_tea,
426                             tti->tti_tea.ta_handle,
427                             tti->tti_u.update.tti_update_reply,
428                             tti->tti_u.update.tti_update_reply_index);
429         RETURN(rc);
430 }
431
432 static int out_ref_del(struct tgt_session_info *tsi)
433 {
434         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
435         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
436         int                      rc;
437
438         ENTRY;
439
440         if (!lu_object_exists(&obj->do_lu))
441                 RETURN(-ENOENT);
442
443         rc = out_tx_ref_del(tsi->tsi_env, obj, &tti->tti_tea,
444                             tti->tti_tea.ta_handle,
445                             tti->tti_u.update.tti_update_reply,
446                             tti->tti_u.update.tti_update_reply_index);
447         RETURN(rc);
448 }
449
450 static int out_index_insert(struct tgt_session_info *tsi)
451 {
452         struct tgt_thread_info  *tti    = tgt_th_info(tsi->tsi_env);
453         struct object_update    *update = tti->tti_u.update.tti_update;
454         struct dt_object        *obj    = tti->tti_u.update.tti_dt_object;
455         struct dt_insert_rec    *rec    = &tti->tti_rec;
456         struct lu_fid           *fid;
457         char                    *name;
458         __u32                   *ptype;
459         int                      rc     = 0;
460         size_t                   size;
461         ENTRY;
462
463         name = object_update_param_get(update, 0, NULL);
464         if (IS_ERR(name)) {
465                 CERROR("%s: empty name for index insert: rc = %ld\n",
466                        tgt_name(tsi->tsi_tgt), PTR_ERR(name));
467                 RETURN(err_serious(PTR_ERR(name)));
468         }
469
470         fid = object_update_param_get(update, 1, &size);
471         if (IS_ERR(fid) || size != sizeof(*fid)) {
472                 CERROR("%s: invalid fid: rc = %ld\n",
473                        tgt_name(tsi->tsi_tgt), PTR_ERR(fid));
474                 RETURN(err_serious(PTR_ERR(fid)));
475         }
476
477         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
478                 lustre_swab_lu_fid(fid);
479
480         if (!fid_is_sane(fid)) {
481                 CERROR("%s: invalid FID "DFID": rc = %d\n",
482                        tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
483                 RETURN(err_serious(-EPROTO));
484         }
485
486         ptype = object_update_param_get(update, 2, &size);
487         if (IS_ERR(ptype) || size != sizeof(*ptype)) {
488                 CERROR("%s: invalid type for index insert: rc = %ld\n",
489                        tgt_name(tsi->tsi_tgt), PTR_ERR(ptype));
490                 RETURN(err_serious(PTR_ERR(ptype)));
491         }
492
493         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
494                 __swab32s(ptype);
495
496         rec->rec_fid = fid;
497         rec->rec_type = *ptype;
498
499         rc = out_tx_index_insert(tsi->tsi_env, obj, (const struct dt_rec *)rec,
500                                  (const struct dt_key *)name, &tti->tti_tea,
501                                  tti->tti_tea.ta_handle,
502                                  tti->tti_u.update.tti_update_reply,
503                                  tti->tti_u.update.tti_update_reply_index);
504         RETURN(rc);
505 }
506
507 static int out_index_delete(struct tgt_session_info *tsi)
508 {
509         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
510         struct object_update    *update = tti->tti_u.update.tti_update;
511         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
512         char                    *name;
513         int                      rc = 0;
514
515         if (!lu_object_exists(&obj->do_lu))
516                 RETURN(-ENOENT);
517
518         name = object_update_param_get(update, 0, NULL);
519         if (IS_ERR(name)) {
520                 CERROR("%s: empty name for index delete: rc = %ld\n",
521                        tgt_name(tsi->tsi_tgt), PTR_ERR(name));
522                 RETURN(err_serious(PTR_ERR(name)));
523         }
524
525         rc = out_tx_index_delete(tsi->tsi_env, obj, (const struct dt_key *)name,
526                                  &tti->tti_tea, tti->tti_tea.ta_handle,
527                                  tti->tti_u.update.tti_update_reply,
528                                  tti->tti_u.update.tti_update_reply_index);
529         RETURN(rc);
530 }
531
532 static int out_destroy(struct tgt_session_info *tsi)
533 {
534         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
535         struct object_update    *update = tti->tti_u.update.tti_update;
536         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
537         struct lu_fid           *fid;
538         int                      rc;
539         ENTRY;
540
541         fid = &update->ou_fid;
542         if (!fid_is_sane(fid)) {
543                 CERROR("%s: invalid FID "DFID": rc = %d\n",
544                        tgt_name(tsi->tsi_tgt), PFID(fid), -EPROTO);
545                 RETURN(err_serious(-EPROTO));
546         }
547
548         if (!lu_object_exists(&obj->do_lu))
549                 RETURN(-ENOENT);
550
551         rc = out_tx_destroy(tsi->tsi_env, obj, &tti->tti_tea,
552                             tti->tti_tea.ta_handle,
553                             tti->tti_u.update.tti_update_reply,
554                             tti->tti_u.update.tti_update_reply_index);
555
556         RETURN(rc);
557 }
558
559 static int out_write(struct tgt_session_info *tsi)
560 {
561         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
562         struct object_update    *update = tti->tti_u.update.tti_update;
563         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
564         struct lu_buf           *lbuf = &tti->tti_buf;
565         char                    *buf;
566         __u64                   *tmp;
567         size_t                  size = 0;
568         size_t                  buf_len = 0;
569         loff_t                  pos;
570         int                      rc;
571         ENTRY;
572
573         buf = object_update_param_get(update, 0, &buf_len);
574         if (IS_ERR(buf) || buf_len == 0) {
575                 CERROR("%s: empty buf for xattr set: rc = %ld\n",
576                        tgt_name(tsi->tsi_tgt), PTR_ERR(buf));
577                 RETURN(err_serious(PTR_ERR(buf)));
578         }
579         lbuf->lb_buf = buf;
580         lbuf->lb_len = buf_len;
581
582         tmp = object_update_param_get(update, 1, &size);
583         if (IS_ERR(tmp) || size != sizeof(*tmp)) {
584                 CERROR("%s: empty or wrong size %zu pos: rc = %ld\n",
585                        tgt_name(tsi->tsi_tgt), size, PTR_ERR(tmp));
586                 RETURN(err_serious(PTR_ERR(tmp)));
587         }
588
589         if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
590                 __swab64s(tmp);
591         pos = *tmp;
592
593         rc = out_tx_write(tsi->tsi_env, obj, lbuf, pos,
594                           &tti->tti_tea, tti->tti_tea.ta_handle,
595                           tti->tti_u.update.tti_update_reply,
596                           tti->tti_u.update.tti_update_reply_index);
597         RETURN(rc);
598 }
599
600 static int out_read(struct tgt_session_info *tsi)
601 {
602         const struct lu_env     *env = tsi->tsi_env;
603         struct tgt_thread_info  *tti = tgt_th_info(env);
604         struct object_update    *update = tti->tti_u.update.tti_update;
605         struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
606         struct object_update_reply *reply = tti->tti_u.update.tti_update_reply;
607         int index = tti->tti_u.update.tti_update_reply_index;
608         struct lu_rdbuf *rdbuf;
609         struct object_update_result *update_result;
610         struct out_read_reply   *orr;
611         void *tmp;
612         size_t size;
613         size_t total_size = 0;
614         __u64 pos;
615         unsigned int i;
616         unsigned int nbufs;
617         int rc = 0;
618         ENTRY;
619
620         update_result = object_update_result_get(reply, index, NULL);
621         LASSERT(update_result != NULL);
622         update_result->our_datalen = sizeof(*orr);
623
624         if (!lu_object_exists(&obj->do_lu))
625                 GOTO(out, rc = -ENOENT);
626
627         tmp = object_update_param_get(update, 0, NULL);
628         if (IS_ERR(tmp)) {
629                 CERROR("%s: empty size for read: rc = %ld\n",
630                        tgt_name(tsi->tsi_tgt), PTR_ERR(tmp));
631                 GOTO(out, rc = err_serious(PTR_ERR(tmp)));
632         }
633         size = le64_to_cpu(*(size_t *)(tmp));
634
635         tmp = object_update_param_get(update, 1, NULL);
636         if (IS_ERR(tmp)) {
637                 CERROR("%s: empty pos for read: rc = %ld\n",
638                        tgt_name(tsi->tsi_tgt), PTR_ERR(tmp));
639                 GOTO(out, rc = err_serious(PTR_ERR(tmp)));
640         }
641         pos = le64_to_cpu(*(__u64 *)(tmp));
642
643         /* Put the offset into the begining of the buffer in reply */
644         orr = (struct out_read_reply *)update_result->our_data;
645
646         nbufs = (size + OUT_BULK_BUFFER_SIZE - 1) / OUT_BULK_BUFFER_SIZE;
647         OBD_ALLOC(rdbuf, sizeof(struct lu_rdbuf) +
648                          nbufs * sizeof(rdbuf->rb_bufs[0]));
649         if (rdbuf == NULL)
650                 GOTO(out, rc = -ENOMEM);
651
652         rdbuf->rb_nbufs = 0;
653         total_size = 0;
654         for (i = 0; i < nbufs; i++) {
655                 __u32 read_size;
656
657                 read_size = size > OUT_BULK_BUFFER_SIZE ?
658                             OUT_BULK_BUFFER_SIZE : size;
659                 OBD_ALLOC(rdbuf->rb_bufs[i].lb_buf, read_size);
660                 if (rdbuf->rb_bufs[i].lb_buf == NULL)
661                         GOTO(out_free, rc = -ENOMEM);
662
663                 rdbuf->rb_bufs[i].lb_len = read_size;
664                 dt_read_lock(env, obj, MOR_TGT_CHILD);
665                 rc = dt_read(env, obj, &rdbuf->rb_bufs[i], &pos);
666                 dt_read_unlock(env, obj);
667
668                 total_size += rc < 0 ? 0 : rc;
669                 if (rc <= 0)
670                         break;
671
672                 rdbuf->rb_nbufs++;
673                 size -= read_size;
674         }
675
676         /* send pages to client */
677         rc = tgt_send_buffer(tsi, rdbuf);
678         if (rc < 0)
679                 GOTO(out_free, rc);
680
681         orr->orr_size = total_size;
682         orr->orr_offset = pos;
683
684         orr_cpu_to_le(orr, orr);
685         update_result->our_datalen += orr->orr_size;
686 out_free:
687         for (i = 0; i < nbufs; i++) {
688                 if (rdbuf->rb_bufs[i].lb_buf != NULL) {
689                         OBD_FREE(rdbuf->rb_bufs[i].lb_buf,
690                                  rdbuf->rb_bufs[i].lb_len);
691                 }
692         }
693         OBD_FREE(rdbuf, sizeof(struct lu_rdbuf) +
694                         nbufs * sizeof(rdbuf->rb_bufs[0]));
695 out:
696         /* Insert read buffer */
697         update_result->our_rc = ptlrpc_status_hton(rc);
698         reply->ourp_lens[index] = cfs_size_round(update_result->our_datalen +
699                                                  sizeof(*update_result));
700         RETURN(rc);
701 }
702
703 static int out_noop(struct tgt_session_info *tsi)
704 {
705         return 0;
706 }
707
708 #define DEF_OUT_HNDL(opc, name, flags, fn)     \
709 [opc - OUT_CREATE] = {                                  \
710         .th_name    = name,                             \
711         .th_fail_id = 0,                                \
712         .th_opc     = opc,                              \
713         .th_flags   = flags,                            \
714         .th_act     = fn,                               \
715         .th_fmt     = NULL,                             \
716         .th_version = 0,                                \
717 }
718
719 static struct tgt_handler out_update_ops[] = {
720         DEF_OUT_HNDL(OUT_CREATE, "out_create", MUTABOR | HABEO_REFERO,
721                      out_create),
722         DEF_OUT_HNDL(OUT_DESTROY, "out_create", MUTABOR | HABEO_REFERO,
723                      out_destroy),
724         DEF_OUT_HNDL(OUT_REF_ADD, "out_ref_add", MUTABOR | HABEO_REFERO,
725                      out_ref_add),
726         DEF_OUT_HNDL(OUT_REF_DEL, "out_ref_del", MUTABOR | HABEO_REFERO,
727                      out_ref_del),
728         DEF_OUT_HNDL(OUT_ATTR_SET, "out_attr_set",  MUTABOR | HABEO_REFERO,
729                      out_attr_set),
730         DEF_OUT_HNDL(OUT_ATTR_GET, "out_attr_get",  HABEO_REFERO,
731                      out_attr_get),
732         DEF_OUT_HNDL(OUT_XATTR_SET, "out_xattr_set", MUTABOR | HABEO_REFERO,
733                      out_xattr_set),
734         DEF_OUT_HNDL(OUT_XATTR_DEL, "out_xattr_del", MUTABOR | HABEO_REFERO,
735                      out_xattr_del),
736         DEF_OUT_HNDL(OUT_XATTR_GET, "out_xattr_get", HABEO_REFERO,
737                      out_xattr_get),
738         DEF_OUT_HNDL(OUT_INDEX_LOOKUP, "out_index_lookup", HABEO_REFERO,
739                      out_index_lookup),
740         DEF_OUT_HNDL(OUT_INDEX_INSERT, "out_index_insert",
741                      MUTABOR | HABEO_REFERO, out_index_insert),
742         DEF_OUT_HNDL(OUT_INDEX_DELETE, "out_index_delete",
743                      MUTABOR | HABEO_REFERO, out_index_delete),
744         DEF_OUT_HNDL(OUT_WRITE, "out_write", MUTABOR | HABEO_REFERO, out_write),
745         DEF_OUT_HNDL(OUT_READ, "out_read", HABEO_REFERO, out_read),
746         DEF_OUT_HNDL(OUT_NOOP, "out_noop", HABEO_REFERO, out_noop),
747 };
748
749 static struct tgt_handler *out_handler_find(__u32 opc)
750 {
751         struct tgt_handler *h;
752
753         h = NULL;
754         if (OUT_CREATE <= opc && opc < OUT_LAST) {
755                 h = &out_update_ops[opc - OUT_CREATE];
756                 LASSERTF(h->th_opc == opc, "opcode mismatch %d != %d\n",
757                          h->th_opc, opc);
758         } else {
759                 h = NULL; /* unsupported opc */
760         }
761         return h;
762 }
763
764 static int out_tx_start(const struct lu_env *env, struct dt_device *dt,
765                         struct thandle_exec_args *ta, struct obd_export *exp)
766 {
767         ta->ta_argno = 0;
768         ta->ta_handle = dt_trans_create(env, dt);
769         if (IS_ERR(ta->ta_handle)) {
770                 int rc;
771
772                 rc = PTR_ERR(ta->ta_handle);
773                 ta->ta_handle = NULL;
774                 CERROR("%s: start handle error: rc = %d\n", dt_obd_name(dt),
775                        rc);
776                 return rc;
777         }
778         if (exp->exp_need_sync)
779                 ta->ta_handle->th_sync = 1;
780
781         return 0;
782 }
783
784 static int out_trans_start(const struct lu_env *env,
785                            struct thandle_exec_args *ta)
786 {
787         return dt_trans_start(env, ta->ta_handle->th_dev, ta->ta_handle);
788 }
789
790 static int out_trans_stop(const struct lu_env *env,
791                           struct thandle_exec_args *ta, int err)
792 {
793         int i;
794         int rc;
795
796         ta->ta_handle->th_result = err;
797         rc = dt_trans_stop(env, ta->ta_handle->th_dev, ta->ta_handle);
798         for (i = 0; i < ta->ta_argno; i++) {
799                 if (ta->ta_args[i]->object != NULL) {
800                         struct dt_object *obj = ta->ta_args[i]->object;
801
802                         /* If the object is being created during this
803                          * transaction, we need to remove them from the
804                          * cache immediately, because a few layers are
805                          * missing in OUT handler, i.e. the object might
806                          * not be initialized in all layers */
807                         if (ta->ta_args[i]->exec_fn == out_tx_create_exec)
808                                 set_bit(LU_OBJECT_HEARD_BANSHEE,
809                                         &obj->do_lu.lo_header->loh_flags);
810                         lu_object_put(env, &ta->ta_args[i]->object->do_lu);
811                         ta->ta_args[i]->object = NULL;
812                 }
813         }
814         ta->ta_handle = NULL;
815         ta->ta_argno = 0;
816
817         return rc;
818 }
819
820 static int out_tx_end(const struct lu_env *env, struct thandle_exec_args *ta,
821                       int declare_ret)
822 {
823         struct tgt_session_info *tsi = tgt_ses_info(env);
824         int                     i;
825         int                     rc;
826         int                     rc1;
827         ENTRY;
828
829         if (ta->ta_handle == NULL)
830                 RETURN(0);
831
832         if (declare_ret != 0 || ta->ta_argno == 0)
833                 GOTO(stop, rc = declare_ret);
834
835         LASSERT(ta->ta_handle->th_dev != NULL);
836         rc = out_trans_start(env, ta);
837         if (unlikely(rc != 0))
838                 GOTO(stop, rc);
839
840         for (i = 0; i < ta->ta_argno; i++) {
841                 rc = ta->ta_args[i]->exec_fn(env, ta->ta_handle,
842                                              ta->ta_args[i]);
843                 if (unlikely(rc != 0)) {
844                         CDEBUG(D_INFO, "error during execution of #%u from"
845                                " %s:%d: rc = %d\n", i, ta->ta_args[i]->file,
846                                ta->ta_args[i]->line, rc);
847                         while (--i >= 0) {
848                                 if (ta->ta_args[i]->undo_fn != NULL)
849                                         ta->ta_args[i]->undo_fn(env,
850                                                                ta->ta_handle,
851                                                                ta->ta_args[i]);
852                                 else
853                                         CERROR("%s: undo for %s:%d: rc = %d\n",
854                                              dt_obd_name(ta->ta_handle->th_dev),
855                                                ta->ta_args[i]->file,
856                                                ta->ta_args[i]->line, -ENOTSUPP);
857                         }
858                         break;
859                 }
860                 CDEBUG(D_INFO, "%s: executed %u/%u: rc = %d\n",
861                        dt_obd_name(ta->ta_handle->th_dev), i, ta->ta_argno, rc);
862         }
863
864         /* Only fail for real updates, XXX right now llog updates will be
865         * ignore, whose updates count is usually 1, so failover test
866         * case will spot this FAIL_UPDATE_NET_REP precisely, and it will
867         * be removed after async update patch is landed. */
868         if (ta->ta_argno > 1)
869                 tsi->tsi_reply_fail_id = OBD_FAIL_OUT_UPDATE_NET_REP;
870
871 stop:
872         rc1 = out_trans_stop(env, ta, rc);
873         if (rc == 0)
874                 rc = rc1;
875
876         ta->ta_handle = NULL;
877         ta->ta_argno = 0;
878
879         RETURN(rc);
880 }
881
882 /**
883  * Object updates between Targets. Because all the updates has been
884  * dis-assemblied into object updates at sender side, so OUT will
885  * call OSD API directly to execute these updates.
886  *
887  * In DNE phase I all of the updates in the request need to be executed
888  * in one transaction, and the transaction has to be synchronously.
889  *
890  * Please refer to lustre/include/lustre/lustre_idl.h for req/reply
891  * format.
892  */
893 int out_handle(struct tgt_session_info *tsi)
894 {
895         const struct lu_env             *env = tsi->tsi_env;
896         struct tgt_thread_info          *tti = tgt_th_info(env);
897         struct thandle_exec_args        *ta = &tti->tti_tea;
898         struct req_capsule              *pill = tsi->tsi_pill;
899         struct dt_device                *dt = tsi->tsi_tgt->lut_bottom;
900         struct out_update_header        *ouh;
901         struct out_update_buffer        *oub = NULL;
902         struct object_update            *update;
903         struct object_update_reply      *reply;
904         struct ptlrpc_bulk_desc         *desc = NULL;
905         struct l_wait_info              lwi;
906         void                            **update_bufs;
907         int                             current_batchid = -1;
908         __u32                           update_buf_count;
909         unsigned int                    i;
910         unsigned int                    reply_index = 0;
911         int                             rc = 0;
912         int                             rc1 = 0;
913         int                             ouh_size, reply_size;
914         int                             updates;
915         ENTRY;
916
917         req_capsule_set(pill, &RQF_OUT_UPDATE);
918         ouh_size = req_capsule_get_size(pill, &RMF_OUT_UPDATE_HEADER,
919                                         RCL_CLIENT);
920         if (ouh_size <= 0)
921                 RETURN(err_serious(-EPROTO));
922
923         ouh = req_capsule_client_get(pill, &RMF_OUT_UPDATE_HEADER);
924         if (ouh == NULL)
925                 RETURN(err_serious(-EPROTO));
926
927         if (ouh->ouh_magic != OUT_UPDATE_HEADER_MAGIC) {
928                 CERROR("%s: invalid update buffer magic %x expect %x: "
929                        "rc = %d\n", tgt_name(tsi->tsi_tgt), ouh->ouh_magic,
930                        UPDATE_REQUEST_MAGIC, -EPROTO);
931                 RETURN(err_serious(-EPROTO));
932         }
933
934         update_buf_count = ouh->ouh_count;
935         if (update_buf_count == 0)
936                 RETURN(err_serious(-EPROTO));
937
938         OBD_ALLOC(update_bufs, sizeof(*update_bufs) * update_buf_count);
939         if (update_bufs == NULL)
940                 RETURN(-ENOMEM);
941
942         if (ouh->ouh_inline_length > 0) {
943                 update_bufs[0] = ouh->ouh_inline_data;
944         } else {
945                 struct out_update_buffer *tmp;
946
947                 oub = req_capsule_client_get(pill, &RMF_OUT_UPDATE_BUF);
948                 if (oub == NULL)
949                         GOTO(out_free, rc = err_serious(-EPROTO));
950
951                 desc = ptlrpc_prep_bulk_exp(pill->rc_req, update_buf_count,
952                                             PTLRPC_BULK_OPS_COUNT,
953                                             PTLRPC_BULK_GET_SINK |
954                                             PTLRPC_BULK_BUF_KVEC,
955                                             MDS_BULK_PORTAL,
956                                             &ptlrpc_bulk_kvec_ops);
957                 if (desc == NULL)
958                         GOTO(out_free, rc = -ENOMEM);
959
960                 tmp = oub;
961                 for (i = 0; i < update_buf_count; i++, tmp++) {
962                         if (tmp->oub_size >= OUT_MAXREQSIZE)
963                                 GOTO(out_free, rc = err_serious(-EPROTO));
964
965                         OBD_ALLOC(update_bufs[i], tmp->oub_size);
966                         if (update_bufs[i] == NULL)
967                                 GOTO(out_free, rc = -ENOMEM);
968
969                         desc->bd_frag_ops->add_iov_frag(desc, update_bufs[i],
970                                                         tmp->oub_size);
971                 }
972
973                 pill->rc_req->rq_bulk_write = 1;
974                 rc = sptlrpc_svc_prep_bulk(pill->rc_req, desc);
975                 if (rc != 0)
976                         GOTO(out_free, err_serious(rc));
977
978                 rc = target_bulk_io(pill->rc_req->rq_export, desc, &lwi);
979                 if (rc < 0)
980                         GOTO(out_free, err_serious(rc));
981         }
982         /* validate the request and calculate the total update count and
983          * set it to reply */
984         reply_size = 0;
985         updates = 0;
986         for (i = 0; i < update_buf_count; i++) {
987                 struct object_update_request    *our;
988                 int                              j;
989
990                 our = update_bufs[i];
991                 if (ptlrpc_req_need_swab(pill->rc_req))
992                         lustre_swab_object_update_request(our);
993
994                 if (our->ourq_magic != UPDATE_REQUEST_MAGIC) {
995                         CERROR("%s: invalid update buffer magic %x"
996                                " expect %x: rc = %d\n",
997                                tgt_name(tsi->tsi_tgt), our->ourq_magic,
998                                UPDATE_REQUEST_MAGIC, -EPROTO);
999                         GOTO(out_free, rc = -EPROTO);
1000                 }
1001                 updates += our->ourq_count;
1002
1003                 /* need to calculate reply size */
1004                 for (j = 0; j < our->ourq_count; j++) {
1005                         update = object_update_request_get(our, j, NULL);
1006                         if (update == NULL)
1007                                 GOTO(out, rc = -EPROTO);
1008                         if (ptlrpc_req_need_swab(pill->rc_req))
1009                                 lustre_swab_object_update(update);
1010
1011                         if (!fid_is_sane(&update->ou_fid)) {
1012                                 CERROR("%s: invalid FID "DFID": rc = %d\n",
1013                                        tgt_name(tsi->tsi_tgt),
1014                                        PFID(&update->ou_fid), -EPROTO);
1015                                 GOTO(out, rc = err_serious(-EPROTO));
1016                         }
1017
1018                         /* XXX: what ou_result_size can be considered safe? */
1019
1020                         reply_size += sizeof(reply->ourp_lens[0]);
1021                         reply_size += sizeof(struct object_update_result);
1022                         reply_size += update->ou_result_size;
1023                 }
1024         }
1025         reply_size += sizeof(*reply);
1026
1027         if (unlikely(reply_size > ouh->ouh_reply_size)) {
1028                 CERROR("%s: too small reply buf %u for %u, need %u at least\n",
1029                        tgt_name(tsi->tsi_tgt), ouh->ouh_reply_size,
1030                        updates, reply_size);
1031                 GOTO(out_free, rc = -EPROTO);
1032         }
1033
1034         req_capsule_set_size(pill, &RMF_OUT_UPDATE_REPLY, RCL_SERVER,
1035                              ouh->ouh_reply_size);
1036         rc = req_capsule_server_pack(pill);
1037         if (rc != 0) {
1038                 CERROR("%s: Can't pack response: rc = %d\n",
1039                        tgt_name(tsi->tsi_tgt), rc);
1040                 GOTO(out_free, rc = -EPROTO);
1041         }
1042
1043         /* Prepare the update reply buffer */
1044         reply = req_capsule_server_get(pill, &RMF_OUT_UPDATE_REPLY);
1045         if (reply == NULL)
1046                 GOTO(out_free, rc = err_serious(-EPROTO));
1047         reply->ourp_magic = UPDATE_REPLY_MAGIC;
1048         reply->ourp_count = updates;
1049         tti->tti_u.update.tti_update_reply = reply;
1050         tti->tti_mult_trans = !req_is_replay(tgt_ses_req(tsi));
1051  
1052         /* Walk through updates in the request to execute them */
1053         for (i = 0; i < update_buf_count; i++) {
1054                 struct tgt_handler      *h;
1055                 struct dt_object        *dt_obj;
1056                 int                     update_count;
1057                 struct object_update_request *our;
1058                 int                     j;
1059
1060                 our = update_bufs[i];
1061                 update_count = our->ourq_count;
1062                 for (j = 0; j < update_count; j++) {
1063                         update = object_update_request_get(our, j, NULL);
1064
1065                         dt_obj = dt_locate(env, dt, &update->ou_fid);
1066                         if (IS_ERR(dt_obj))
1067                                 GOTO(out, rc = PTR_ERR(dt_obj));
1068
1069                         if (dt->dd_record_fid_accessed) {
1070                                 lfsck_pack_rfa(&tti->tti_lr,
1071                                                lu_object_fid(&dt_obj->do_lu),
1072                                                LE_FID_ACCESSED,
1073                                                LFSCK_TYPE_LAYOUT);
1074                                 tgt_lfsck_in_notify(env, dt, &tti->tti_lr,
1075                                                     NULL);
1076                         }
1077
1078                         tti->tti_u.update.tti_dt_object = dt_obj;
1079                         tti->tti_u.update.tti_update = update;
1080                         tti->tti_u.update.tti_update_reply_index = reply_index;
1081
1082                         h = out_handler_find(update->ou_type);
1083                         if (unlikely(h == NULL)) {
1084                                 CERROR("%s: unsupported opc: 0x%x\n",
1085                                        tgt_name(tsi->tsi_tgt), update->ou_type);
1086                                 GOTO(next, rc = -ENOTSUPP);
1087                         }
1088
1089                         /* Check resend case only for modifying RPC */
1090                         if (h->th_flags & MUTABOR) {
1091                                 struct ptlrpc_request *req = tgt_ses_req(tsi);
1092
1093                                 if (out_check_resent(env, dt, dt_obj, req,
1094                                                      out_reconstruct, reply,
1095                                                      reply_index))
1096                                         GOTO(next, rc = 0);
1097                         }
1098
1099                         /* start transaction for modification RPC only */
1100                         if (h->th_flags & MUTABOR && current_batchid == -1) {
1101                                 current_batchid = update->ou_batchid;
1102                                 rc = out_tx_start(env, dt, ta, tsi->tsi_exp);
1103                                 if (rc != 0)
1104                                         GOTO(next, rc);
1105
1106                                 if (update->ou_flags & UPDATE_FL_SYNC)
1107                                         ta->ta_handle->th_sync = 1;
1108                         }
1109
1110                         /* Stop the current update transaction, if the update
1111                          * has different batchid, or read-only update */
1112                         if (((current_batchid != update->ou_batchid) ||
1113                              !(h->th_flags & MUTABOR)) &&
1114                              ta->ta_handle != NULL) {
1115                                 rc = out_tx_end(env, ta, rc);
1116                                 current_batchid = -1;
1117                                 if (rc != 0)
1118                                         GOTO(next, rc);
1119
1120                                 /* start a new transaction if needed */
1121                                 if (h->th_flags & MUTABOR) {
1122                                         rc = out_tx_start(env, dt, ta,
1123                                                           tsi->tsi_exp);
1124                                         if (rc != 0)
1125                                                 GOTO(next, rc);
1126                                         if (update->ou_flags & UPDATE_FL_SYNC)
1127                                                 ta->ta_handle->th_sync = 1;
1128                                         current_batchid = update->ou_batchid;
1129                                 }
1130                         }
1131
1132                         rc = h->th_act(tsi);
1133 next:
1134                         reply_index++;
1135                         lu_object_put(env, &dt_obj->do_lu);
1136                         if (rc < 0)
1137                                 GOTO(out, rc);
1138                 }
1139         }
1140 out:
1141         if (current_batchid != -1) {
1142                 rc1 = out_tx_end(env, ta, rc);
1143                 if (rc == 0)
1144                         rc = rc1;
1145         }
1146
1147 out_free:
1148         if (update_bufs != NULL) {
1149                 if (oub != NULL) {
1150                         for (i = 0; i < update_buf_count; i++, oub++) {
1151                                 if (update_bufs[i] != NULL)
1152                                         OBD_FREE(update_bufs[i], oub->oub_size);
1153                         }
1154                 }
1155
1156                 OBD_FREE(update_bufs, sizeof(*update_bufs) * update_buf_count);
1157         }
1158
1159         if (desc != NULL)
1160                 ptlrpc_free_bulk(desc);
1161
1162         RETURN(rc);
1163 }
1164
1165 struct tgt_handler tgt_out_handlers[] = {
1166 TGT_UPDATE_HDL(MUTABOR, OUT_UPDATE,     out_handle),
1167 };
1168 EXPORT_SYMBOL(tgt_out_handlers);
1169