Whamcloud - gitweb
5a837a6e89c166bb094920fdcc0d46f62a1302eb
[fs/lustre-release.git] / lustre / target / tgt_handler.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2011, 2012, Intel Corporation.
25  */
26 /*
27  * lustre/target/tgt_handler.c
28  *
29  * Lustre Unified Target request handler code
30  *
31  * Author: Brian Behlendorf <behlendorf1@llnl.gov>
32  * Author: Mikhail Pershin <mike.pershin@intel.com>
33  */
34
35 #define DEBUG_SUBSYSTEM S_CLASS
36
37 #include <obd.h>
38 #include <obd_class.h>
39 #include <obd_cksum.h>
40 #include <md_object.h>
41
42 #include "tgt_internal.h"
43
44 char *tgt_name(struct lu_target *tgt)
45 {
46         LASSERT(tgt->lut_obd != NULL);
47         return tgt->lut_obd->obd_name;
48 }
49 EXPORT_SYMBOL(tgt_name);
50
/*
 * Generic code handling requests that have struct mdt_body passed in:
 *
 *  - extract mdt_body from request and save it in @tsi, if present;
 *
 *  - create lu_object, corresponding to the fid in mdt_body, and save it in
 *  @tsi;
 *
 *  - if HABEO_CORPUS flag is set for this request type check whether object
 *  actually exists on storage (lu_object_exists()).
 *
 * Returns 0 on success (including the "no FID supplied" cases), -EFAULT if
 * the body is missing, -EINVAL for an insane FID, -ENOENT when HABEO_CORPUS
 * requires an existing object and it is absent, or the error from
 * lu_object_find().
 */
static int tgt_mdt_body_unpack(struct tgt_session_info *tsi, __u32 flags)
{
	const struct mdt_body	*body;
	struct lu_object	*obj;
	struct req_capsule	*pill = tsi->tsi_pill;
	int			 rc;

	ENTRY;

	body = req_capsule_client_get(pill, &RMF_MDT_BODY);
	if (body == NULL)
		RETURN(-EFAULT);

	tsi->tsi_mdt_body = body;

	/* Client did not send a FID at all: nothing more to unpack. */
	if (!(body->valid & OBD_MD_FLID))
		RETURN(0);

	/* mdc_pack_body() doesn't check if fid is zero and set OBD_ML_FID
	 * in any case in pre-2.5 clients. Fix that here if needed */
	if (unlikely(fid_is_zero(&body->fid1)))
		RETURN(0);

	if (!fid_is_sane(&body->fid1)) {
		CERROR("%s: invalid FID: "DFID"\n", tgt_name(tsi->tsi_tgt),
		       PFID(&body->fid1));
		RETURN(-EINVAL);
	}

	/* Look the object up on the bottom (OSD) device of the stack. */
	obj = lu_object_find(tsi->tsi_env,
			     &tsi->tsi_tgt->lut_bottom->dd_lu_dev,
			     &body->fid1, NULL);
	if (!IS_ERR(obj)) {
		if ((flags & HABEO_CORPUS) && !lu_object_exists(obj)) {
			/* Object reference is dropped here; tsi_corpus is
			 * only set when the object may actually be used. */
			lu_object_put(tsi->tsi_env, obj);
			/* for capability renew ENOENT will be handled in
			 * mdt_renew_capa */
			if (body->valid & OBD_MD_FLOSSCAPA)
				rc = 0;
			else
				rc = -ENOENT;
		} else {
			/* Cache the object for the handler; released by
			 * tgt_request_handle() on the way out. */
			tsi->tsi_corpus = obj;
			rc = 0;
		}
	} else {
		rc = PTR_ERR(obj);
	}

	/* FID is recorded even on failure so callers can log/identify it. */
	tsi->tsi_fid = body->fid1;

	RETURN(rc);
}
116
/**
 * Validate oa from client.
 * If the request comes from 2.0 clients, currently only RSVD seq and IDIF
 * req are valid.
 *    a. objects in Single MDT FS  seq = FID_SEQ_OST_MDT0, oi_id != 0
 *    b. Echo objects(seq = 2), old echo client still use oi_id/oi_seq to
 *       pack ost_id. Because non-zero oi_seq will make it diffcult to tell
 *       whether this is oi_fid or real ostid. So it will check
 *       OBD_CONNECT_FID, then convert the ostid to FID for old client.
 *    c. Old FID-disable osc will send IDIF.
 *    d. new FID-enable osc/osp will send normal FID.
 *
 * And also oi_id/f_oid should always start from 1. oi_id/f_oid = 0 will
 * be used for LAST_ID file, and only being accessed inside OST now.
 *
 * \param[in]     tsi	session info (export, target, scratch fid tsi_fid2)
 * \param[in,out] oa	obdo from the client; its ost_id is normalized to a
 *			FID in place on success
 *
 * \retval 0		oa->o_oi now holds a valid FID
 * \retval -EPROTO	malformed object id or unsupported sequence
 */
int tgt_validate_obdo(struct tgt_session_info *tsi, struct obdo *oa)
{
	struct ost_id	*oi	= &oa->o_oi;
	obd_seq		 seq	= ostid_seq(oi);
	obd_id		 id	= ostid_id(oi);
	int		 rc;
	ENTRY;

	/* Old (non-FID-aware) echo client special case: rewrite in place. */
	if (unlikely(!(exp_connect_flags(tsi->tsi_exp) & OBD_CONNECT_FID) &&
		     fid_seq_is_echo(seq))) {
		/* Sigh 2.[123] client still sends echo req with oi_id = 0
		 * during create, and we will reset this to 1, since this
		 * oi_id is basically useless in the following create process,
		 * but oi_id == 0 will make it difficult to tell whether it is
		 * real FID or ost_id. */
		oi->oi_fid.f_seq = FID_SEQ_ECHO;
		oi->oi_fid.f_oid = id ?: 1;
		oi->oi_fid.f_ver = 0;
	} else {
		struct lu_fid *fid = &tsi->tsi_fid2;

		/* id 0 is reserved for the LAST_ID file (see above). */
		if (unlikely((oa->o_valid & OBD_MD_FLID) && id == 0))
			GOTO(out, rc = -EPROTO);

		/* Note: this check might be forced in 2.5 or 2.6, i.e.
		 * all of the requests are required to setup FLGROUP */
		if (unlikely(!(oa->o_valid & OBD_MD_FLGROUP))) {
			ostid_set_seq_mdt0(oi);
			oa->o_valid |= OBD_MD_FLGROUP;
			seq = ostid_seq(oi);
		}

		/* Only IDIF/MDT0/normal/echo sequences are acceptable here. */
		if (unlikely(!(fid_seq_is_idif(seq) || fid_seq_is_mdt0(seq) ||
			       fid_seq_is_norm(seq) || fid_seq_is_echo(seq))))
			GOTO(out, rc = -EPROTO);

		rc = ostid_to_fid(fid, oi, tsi->tsi_tgt->lut_lsd.lsd_osd_index);
		if (unlikely(rc != 0))
			GOTO(out, rc);

		/* Store the converted FID back into the obdo. */
		oi->oi_fid = *fid;
	}

	RETURN(0);

out:
	CERROR("%s: client %s sent bad object "DOSTID": rc = %d\n",
	       tgt_name(tsi->tsi_tgt), obd_export_nid2str(tsi->tsi_exp),
	       seq, id, rc);
	return rc;
}
EXPORT_SYMBOL(tgt_validate_obdo);
184
/*
 * Unpack and sanity-check the bulk I/O descriptors (obd_ioobj + remote
 * niobufs) of a read/write request. Exactly one ioobj with a non-zero,
 * bounded buffer count is accepted; its oid is overwritten with \a oi.
 *
 * Returns 0 on success, -EPROTO for any malformed descriptor.
 */
static int tgt_io_data_unpack(struct tgt_session_info *tsi, struct ost_id *oi)
{
	unsigned		 max_brw;
	struct niobuf_remote	*rnb;
	struct obd_ioobj	*ioo;
	int			 obj_count;

	ENTRY;

	ioo = req_capsule_client_get(tsi->tsi_pill, &RMF_OBD_IOOBJ);
	if (ioo == NULL)
		RETURN(-EPROTO);

	/* Niobufs are fetched only to verify the field is present. */
	rnb = req_capsule_client_get(tsi->tsi_pill, &RMF_NIOBUF_REMOTE);
	if (rnb == NULL)
		RETURN(-EPROTO);

	/* max_brw must be a power of two. NOTE(review): max_brw == 0 also
	 * passes this test (0 & -1 == 0) — presumably ruled out elsewhere;
	 * confirm against ioobj_max_brw_get() semantics. */
	max_brw = ioobj_max_brw_get(ioo);
	if (unlikely((max_brw & (max_brw - 1)) != 0)) {
		CERROR("%s: client %s sent bad ioobj max %u for "DOSTID
		       ": rc = %d\n", tgt_name(tsi->tsi_tgt),
		       obd_export_nid2str(tsi->tsi_exp), max_brw,
		       POSTID(oi), -EPROTO);
		RETURN(-EPROTO);
	}
	/* Trust the (already validated) ost_id over the wire copy. */
	ioo->ioo_oid = *oi;

	obj_count = req_capsule_get_size(tsi->tsi_pill, &RMF_OBD_IOOBJ,
					RCL_CLIENT) / sizeof(*ioo);
	if (obj_count == 0) {
		CERROR("%s: short ioobj\n", tgt_name(tsi->tsi_tgt));
		RETURN(-EPROTO);
	} else if (obj_count > 1) {
		CERROR("%s: too many ioobjs (%d)\n", tgt_name(tsi->tsi_tgt),
		       obj_count);
		RETURN(-EPROTO);
	}

	if (ioo->ioo_bufcnt == 0) {
		CERROR("%s: ioo has zero bufcnt\n", tgt_name(tsi->tsi_tgt));
		RETURN(-EPROTO);
	}

	/* Cap a single bulk transfer at PTLRPC_MAX_BRW_PAGES pages. */
	if (ioo->ioo_bufcnt > PTLRPC_MAX_BRW_PAGES) {
		DEBUG_REQ(D_RPCTRACE, tgt_ses_req(tsi),
			  "bulk has too many pages (%d)",
			  ioo->ioo_bufcnt);
		RETURN(-EPROTO);
	}

	RETURN(0);
}
237
/*
 * Unpack and validate the ost_body of an OST request: validate the obdo,
 * check capability presence if advertised, unpack bulk I/O descriptors
 * when the format carries them, and build the lock resource id from the
 * object FID.
 *
 * Returns 0 on success, -EFAULT for missing body/capability, or a negative
 * error from the obdo/IO validation helpers.
 */
static int tgt_ost_body_unpack(struct tgt_session_info *tsi, __u32 flags)
{
	struct ost_body		*body;
	struct req_capsule	*pill = tsi->tsi_pill;
	struct lustre_capa	*capa;
	int			 rc;

	ENTRY;

	body = req_capsule_client_get(pill, &RMF_OST_BODY);
	if (body == NULL)
		RETURN(-EFAULT);

	rc = tgt_validate_obdo(tsi, &body->oa);
	if (rc)
		RETURN(rc);

	/* If the client claims a capability, it must actually be present. */
	if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
		capa = req_capsule_client_get(pill, &RMF_CAPA1);
		if (capa == NULL) {
			CERROR("%s: OSSCAPA flag is set without capability\n",
			       tgt_name(tsi->tsi_tgt));
			RETURN(-EFAULT);
		}
	}

	tsi->tsi_ost_body = body;
	/* tgt_validate_obdo() has already normalized o_oi to a FID. */
	tsi->tsi_fid = body->oa.o_oi.oi_fid;

	if (req_capsule_has_field(pill, &RMF_OBD_IOOBJ, RCL_CLIENT)) {
		rc = tgt_io_data_unpack(tsi, &body->oa.o_oi);
		if (rc < 0)
			RETURN(rc);
	}

	if (!(body->oa.o_valid & OBD_MD_FLID)) {
		if (flags & HABEO_CORPUS) {
			CERROR("%s: OBD_MD_FLID flag is not set in ost_body "
			       "but OID/FID is mandatory with HABEO_CORPUS\n",
			       tgt_name(tsi->tsi_tgt));
			RETURN(-EPROTO);
		} else {
			RETURN(0);
		}
	}

	ost_fid_build_resid(&tsi->tsi_fid, &tsi->tsi_resid);

	/*
	 * OST doesn't get object in advance for further use to prevent
	 * situations with nested object_find which is potential deadlock.
	 */
	tsi->tsi_corpus = NULL;
	RETURN(rc);
}
293
/*
 * Do necessary preprocessing according to handler ->th_flags.
 *
 * Sets up the request capsule from the handler's format, unpacks the
 * MDT or OST body if present, enforces read-only mode for mutating ops
 * (MUTABOR), and validates/stashes the LDLM request for lock-taking
 * handlers (HABEO_CLAVIS). Idempotent: a second call on the same session
 * is a no-op via tsi_preprocessed.
 */
static int tgt_request_preprocess(struct tgt_session_info *tsi,
				  struct tgt_handler *h,
				  struct ptlrpc_request *req)
{
	struct req_capsule	*pill = tsi->tsi_pill;
	__u32			 flags = h->th_flags;
	int			 rc = 0;

	ENTRY;

	if (tsi->tsi_preprocessed)
		RETURN(0);

	LASSERT(h->th_act != NULL);
	LASSERT(h->th_opc == lustre_msg_get_opc(req->rq_reqmsg));
	LASSERT(current->journal_info == NULL);

	/* Any handler needing a body or fixed reply must declare a format. */
	LASSERT(ergo(flags & (HABEO_CORPUS | HABEO_REFERO),
		     h->th_fmt != NULL));
	if (h->th_fmt != NULL) {
		req_capsule_set(pill, h->th_fmt);
		if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_CLIENT)) {
			rc = tgt_mdt_body_unpack(tsi, flags);
			if (rc < 0)
				RETURN(rc);
		} else if (req_capsule_has_field(pill, &RMF_OST_BODY,
						 RCL_CLIENT)) {
			rc = tgt_ost_body_unpack(tsi, flags);
			if (rc < 0)
				RETURN(rc);
		}
	}

	/* Mutating operations are refused on read-only connections. */
	if (flags & MUTABOR && tgt_conn_flags(tsi) & OBD_CONNECT_RDONLY)
		RETURN(-EROFS);

	if (flags & HABEO_CLAVIS) {
		struct ldlm_request *dlm_req;

		LASSERT(h->th_fmt != NULL);

		dlm_req = req_capsule_client_get(pill, &RMF_DLM_REQ);
		if (dlm_req != NULL) {
			if (unlikely(dlm_req->lock_desc.l_resource.lr_type ==
				     LDLM_IBITS &&
				     dlm_req->lock_desc.l_policy_data.\
				     l_inodebits.bits == 0)) {
				/*
				 * Lock without inodebits makes no sense and
				 * will oops later in ldlm. If client miss to
				 * set such bits, do not trigger ASSERTION.
				 *
				 * For liblustre flock case, it maybe zero.
				 */
				rc = -EPROTO;
			} else {
				tsi->tsi_dlm_req = dlm_req;
			}
		} else {
			rc = -EFAULT;
		}
	}
	tsi->tsi_preprocessed = 1;
	RETURN(rc);
}
362
/*
 * Invoke handler for this request opc. Also do necessary preprocessing
 * (according to handler ->th_flags), and post-processing (setting of
 * ->last_{xid,committed}).
 *
 * Errors are split into "serious" ones (pack/unpack and other failures
 * outside the operation) which are returned via rc to ptlrpc, and
 * operation errors which only go into req->rq_status. Always returns 0
 * here: the reply (or simulated network drop) is sent from this function.
 */
static int tgt_handle_request0(struct tgt_session_info *tsi,
			       struct tgt_handler *h,
			       struct ptlrpc_request *req)
{
	int	 serious = 0;
	int	 rc;

	ENTRY;

	/*
	 * Checking for various OBD_FAIL_$PREF_$OPC_NET codes. _Do_ not try
	 * to put same checks into handlers like mdt_close(), mdt_reint(),
	 * etc., without talking to mdt authors first. Checking same thing
	 * there again is useless and returning 0 error without packing reply
	 * is buggy! Handlers either pack reply or return error.
	 *
	 * We return 0 here and do not send any reply in order to emulate
	 * network failure. Do not send any reply in case any of NET related
	 * fail_id has occured.
	 */
	if (OBD_FAIL_CHECK_ORSET(h->th_fail_id, OBD_FAIL_ONCE))
		RETURN(0);

	rc = tgt_request_preprocess(tsi, h, req);
	/* pack reply if reply format is fixed */
	if (rc == 0 && h->th_flags & HABEO_REFERO) {
		/* Pack reply */
		if (req_capsule_has_field(tsi->tsi_pill, &RMF_MDT_MD,
					  RCL_SERVER))
			req_capsule_set_size(tsi->tsi_pill, &RMF_MDT_MD,
					     RCL_SERVER,
					     tsi->tsi_mdt_body->eadatasize);
		if (req_capsule_has_field(tsi->tsi_pill, &RMF_LOGCOOKIES,
					  RCL_SERVER))
			req_capsule_set_size(tsi->tsi_pill, &RMF_LOGCOOKIES,
					     RCL_SERVER, 0);

		rc = req_capsule_server_pack(tsi->tsi_pill);
	}

	if (likely(rc == 0)) {
		/*
		 * Process request, there can be two types of rc:
		 * 1) errors with msg unpack/pack, other failures outside the
		 * operation itself. This is counted as serious errors;
		 * 2) errors during fs operation, should be placed in rq_status
		 * only
		 */
		rc = h->th_act(tsi);
		/* A handler returning 0 without packing a reply is a bug:
		 * catch it loudly rather than silently dropping the reply. */
		if (!is_serious(rc) &&
		    !req->rq_no_reply && req->rq_reply_state == NULL) {
			DEBUG_REQ(D_ERROR, req, "%s \"handler\" %s did not "
				  "pack reply and returned 0 error\n",
				  tgt_name(tsi->tsi_tgt), h->th_name);
			LBUG();
		}
		serious = is_serious(rc);
		rc = clear_serious(rc);
	} else {
		serious = 1;
	}

	req->rq_status = rc;

	/*
	 * ELDLM_* codes which > 0 should be in rq_status only as well as
	 * all non-serious errors.
	 */
	if (rc > 0 || !serious)
		rc = 0;

	LASSERT(current->journal_info == NULL);

	/* Update last-committed info in the reply for a connected client. */
	if (likely(rc == 0 && req->rq_export))
		target_committed_to_req(req);

	target_send_reply(req, rc, tsi->tsi_reply_fail_id);
	RETURN(0);
}
447
/*
 * Decide how a request received while the target is recovering is handled.
 *
 * On return *process is: 1 to process immediately (disconnects and index
 * reads are always allowed); the value from target_queue_recovery_request()
 * for replayable operations (queue-or-process semantics defined by that
 * helper); or -EAGAIN for opcodes not permitted during recovery. The
 * function itself always returns 0.
 */
static int tgt_filter_recovery_request(struct ptlrpc_request *req,
				       struct obd_device *obd, int *process)
{
	switch (lustre_msg_get_opc(req->rq_reqmsg)) {
	case MDS_DISCONNECT:
	case OST_DISCONNECT:
	case OBD_IDX_READ:
		*process = 1;
		RETURN(0);
	case MDS_CLOSE:
	case MDS_DONE_WRITING:
	case MDS_SYNC: /* used in unmounting */
	case OBD_PING:
	case MDS_REINT:
	case UPDATE_OBJ:
	case SEQ_QUERY:
	case FLD_QUERY:
	case LDLM_ENQUEUE:
	case OST_CREATE:
	case OST_DESTROY:
	case OST_PUNCH:
	case OST_SETATTR:
	case OST_SYNC:
	case OST_WRITE:
		*process = target_queue_recovery_request(req, obd);
		RETURN(0);

	default:
		DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
		*process = -EAGAIN;
		RETURN(0);
	}
}
481
/*
 * Handle recovery. Return:
 *        +1: continue request processing;
 *       -ve: abort immediately with the given error code;
 *         0: send reply with error code in req->rq_status;
 */
int tgt_handle_recovery(struct ptlrpc_request *req, int reply_fail_id)
{
	ENTRY;

	/* Connect and security-context RPCs bypass recovery handling. */
	switch (lustre_msg_get_opc(req->rq_reqmsg)) {
	case MDS_CONNECT:
	case OST_CONNECT:
	case MGS_CONNECT:
	case SEC_CTX_INIT:
	case SEC_CTX_INIT_CONT:
	case SEC_CTX_FINI:
		RETURN(+1);
	}

	/* Non-replayable targets never queue recovery requests. */
	if (!req->rq_export->exp_obd->obd_replayable)
		RETURN(+1);

	/* sanity check: if the xid matches, the request must be marked as a
	 * resent or replayed */
	if (req_xid_is_last(req)) {
		if (!(lustre_msg_get_flags(req->rq_reqmsg) &
		      (MSG_RESENT | MSG_REPLAY))) {
			DEBUG_REQ(D_WARNING, req, "rq_xid "LPU64" matches "
				  "last_xid, expected REPLAY or RESENT flag "
				  "(%x)", req->rq_xid,
				  lustre_msg_get_flags(req->rq_reqmsg));
			req->rq_status = -ENOTCONN;
			RETURN(-ENOTCONN);
		}
	}
	/* else: note the opposite is not always true; a RESENT req after a
	 * failover will usually not match the last_xid, since it was likely
	 * never committed. A REPLAYed request will almost never match the
	 * last xid, however it could for a committed, but still retained,
	 * open. */

	/* Check for aborted recovery... */
	if (unlikely(req->rq_export->exp_obd->obd_recovering)) {
		int rc;
		int should_process;

		DEBUG_REQ(D_INFO, req, "Got new replay");
		rc = tgt_filter_recovery_request(req, req->rq_export->exp_obd,
						 &should_process);
		/* should_process == 0 means the request was queued for
		 * recovery; nothing more to do here. */
		if (rc != 0 || !should_process)
			RETURN(rc);
		else if (should_process < 0) {
			/* Opcode not permitted during recovery: reply with
			 * the error right away. */
			req->rq_status = should_process;
			rc = ptlrpc_error(req);
			RETURN(rc);
		}
	}
	RETURN(+1);
}
542
/* Initial check for request, it is validation mostly.
 *
 * Locates the tgt_handler for the request's opcode by scanning the
 * target's opcode slices. Returns the handler, or ERR_PTR(-ENOTSUPP)
 * when no slice covers the opcode or the slot is unpopulated.
 *
 * NOTE(review): class_exp2tgt() result is dereferenced without a NULL
 * check — presumably the export is guaranteed to be attached to a target
 * by the time this runs; confirm against callers. */
static struct tgt_handler *tgt_handler_find_check(struct ptlrpc_request *req)
{
	struct tgt_handler	*h;
	struct tgt_opc_slice	*s;
	struct lu_target	*tgt;
	__u32			 opc = lustre_msg_get_opc(req->rq_reqmsg);

	ENTRY;

	tgt = class_exp2tgt(req->rq_export);

	/* Find the slice whose [start, end) opcode range covers opc. */
	for (s = tgt->lut_slice; s->tos_hs != NULL; s++)
		if (s->tos_opc_start <= opc && opc < s->tos_opc_end)
			break;

	/* opcode was not found in slice */
	if (unlikely(s->tos_hs == NULL)) {
		CERROR("%s: no handlers for opcode 0x%x\n", tgt_name(tgt),
		       opc);
		RETURN(ERR_PTR(-ENOTSUPP));
	}

	LASSERT(opc >= s->tos_opc_start && opc < s->tos_opc_end);
	h = s->tos_hs + (opc - s->tos_opc_start);
	/* th_opc == 0 marks an unused slot within the slice. */
	if (unlikely(h->th_opc == 0)) {
		CERROR("%s: unsupported opcode 0x%x\n", tgt_name(tgt), opc);
		RETURN(ERR_PTR(-ENOTSUPP));
	}

	RETURN(h);
}
575
/*
 * Main entry point for unified target request handling: set up the
 * session/capsule, handle connect specially, validate the export,
 * locate the handler, check protocol version, run recovery filtering,
 * and finally dispatch to tgt_handle_request0(). Releases the cached
 * corpus object (if any) on all exit paths.
 */
int tgt_request_handle(struct ptlrpc_request *req)
{
	struct tgt_session_info *tsi = tgt_ses_info(req->rq_svc_thread->t_env);

	struct lustre_msg	*msg = req->rq_reqmsg;
	struct tgt_handler	*h;
	struct lu_target	*tgt;
	int			 request_fail_id = 0;
	__u32			 opc = lustre_msg_get_opc(msg);
	int			 rc;

	ENTRY;

	/* Refill the context, to make sure all thread keys are allocated */
	lu_env_refill(req->rq_svc_thread->t_env);

	req_capsule_init(&req->rq_pill, req, RCL_SERVER);
	tsi->tsi_pill = &req->rq_pill;
	tsi->tsi_env = req->rq_svc_thread->t_env;

	/* if request has export then get handlers slice from corresponding
	 * target, otherwise that should be connect operation */
	if (opc == MDS_CONNECT || opc == OST_CONNECT ||
	    opc == MGS_CONNECT) {
		req_capsule_set(&req->rq_pill, &RQF_CONNECT);
		rc = target_handle_connect(req);
		if (rc != 0) {
			rc = ptlrpc_error(req);
			GOTO(out, rc);
		}
		/* recovery-small test 18c asks to drop connect reply */
		if (unlikely(opc == OST_CONNECT &&
			     OBD_FAIL_CHECK(OBD_FAIL_OST_CONNECT_NET2)))
			GOTO(out, rc = 0);
	}

	/* Non-connect requests must arrive on a connected export. */
	if (unlikely(!class_connected_export(req->rq_export))) {
		CDEBUG(D_HA, "operation %d on unconnected OST from %s\n",
		       opc, libcfs_id2str(req->rq_peer));
		req->rq_status = -ENOTCONN;
		rc = ptlrpc_error(req);
		GOTO(out, rc);
	}

	tsi->tsi_tgt = tgt = class_exp2tgt(req->rq_export);
	tsi->tsi_exp = req->rq_export;
	if (exp_connect_flags(req->rq_export) & OBD_CONNECT_JOBSTATS)
		tsi->tsi_jobid = lustre_msg_get_jobid(req->rq_reqmsg);
	else
		tsi->tsi_jobid = NULL;

	request_fail_id = tgt->lut_request_fail_id;
	tsi->tsi_reply_fail_id = tgt->lut_reply_fail_id;

	h = tgt_handler_find_check(req);
	if (IS_ERR(h)) {
		req->rq_status = PTR_ERR(h);
		rc = ptlrpc_error(req);
		GOTO(out, rc);
	}

	/* Fault-injection point: silently drop the request once. */
	if (CFS_FAIL_CHECK_ORSET(request_fail_id, CFS_FAIL_ONCE))
		GOTO(out, rc = 0);

	rc = lustre_msg_check_version(msg, h->th_version);
	if (unlikely(rc)) {
		DEBUG_REQ(D_ERROR, req, "%s: drop mal-formed request, version"
			  " %08x, expecting %08x\n", tgt_name(tgt),
			  lustre_msg_get_version(msg), h->th_version);
		req->rq_status = -EINVAL;
		rc = ptlrpc_error(req);
		GOTO(out, rc);
	}

	/* +1 from tgt_handle_recovery() means "process now". */
	rc = tgt_handle_recovery(req, tsi->tsi_reply_fail_id);
	if (likely(rc == 1)) {
		LASSERTF(h->th_opc == opc, "opcode mismatch %d != %d\n",
			 h->th_opc, opc);
		rc = tgt_handle_request0(tsi, h, req);
		if (rc)
			GOTO(out, rc);
	}
	EXIT;
out:
	req_capsule_fini(tsi->tsi_pill);
	/* Drop the object reference taken by tgt_mdt_body_unpack(). */
	if (tsi->tsi_corpus != NULL) {
		lu_object_put(tsi->tsi_env, tsi->tsi_corpus);
		tsi->tsi_corpus = NULL;
	}
	return rc;
}
EXPORT_SYMBOL(tgt_request_handle);
668
669 /** Assign high priority operations to the request if needed. */
670 int tgt_hpreq_handler(struct ptlrpc_request *req)
671 {
672         struct tgt_session_info *tsi = tgt_ses_info(req->rq_svc_thread->t_env);
673         struct tgt_handler      *h;
674         int                      rc;
675
676         ENTRY;
677
678         if (req->rq_export == NULL)
679                 RETURN(0);
680
681         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
682         tsi->tsi_pill = &req->rq_pill;
683         tsi->tsi_env = req->rq_svc_thread->t_env;
684         tsi->tsi_tgt = class_exp2tgt(req->rq_export);
685         tsi->tsi_exp = req->rq_export;
686
687         h = tgt_handler_find_check(req);
688         if (IS_ERR(h)) {
689                 rc = PTR_ERR(h);
690                 RETURN(rc);
691         }
692
693         rc = tgt_request_preprocess(tsi, h, req);
694         if (unlikely(rc != 0))
695                 RETURN(rc);
696
697         if (h->th_hp != NULL)
698                 h->th_hp(tsi);
699         RETURN(0);
700 }
701 EXPORT_SYMBOL(tgt_hpreq_handler);
702
703 void tgt_counter_incr(struct obd_export *exp, int opcode)
704 {
705         lprocfs_counter_incr(exp->exp_obd->obd_stats, opcode);
706         if (exp->exp_nid_stats && exp->exp_nid_stats->nid_stats != NULL)
707                 lprocfs_counter_incr(exp->exp_nid_stats->nid_stats, opcode);
708 }
709 EXPORT_SYMBOL(tgt_counter_incr);
710
711 /*
712  * Unified target generic handlers.
713  */
714
715 /*
716  * Security functions
717  */
718 static inline void tgt_init_sec_none(struct obd_connect_data *reply)
719 {
720         reply->ocd_connect_flags &= ~(OBD_CONNECT_RMT_CLIENT |
721                                       OBD_CONNECT_RMT_CLIENT_FORCE |
722                                       OBD_CONNECT_MDS_CAPA |
723                                       OBD_CONNECT_OSS_CAPA);
724 }
725
/*
 * Negotiate the security level of a connecting client against the
 * target's configured lut_sec_level. Decides whether the client counts
 * as "remote", rejects combinations the target cannot support, and
 * adjusts the capability/remote-client flags in the connect reply.
 *
 * Returns 0 on success, -EFAULT for missing connect data, -EACCES when
 * the client cannot satisfy the configured level, -EINVAL for an
 * unknown level.
 */
static int tgt_init_sec_level(struct ptlrpc_request *req)
{
	struct lu_target	*tgt = class_exp2tgt(req->rq_export);
	char			*client = libcfs_nid2str(req->rq_peer.nid);
	struct obd_connect_data *data, *reply;
	int			 rc = 0;
	bool			 remote;

	ENTRY;

	data = req_capsule_client_get(&req->rq_pill, &RMF_CONNECT_DATA);
	reply = req_capsule_server_get(&req->rq_pill, &RMF_CONNECT_DATA);
	if (data == NULL || reply == NULL)
		RETURN(-EFAULT);

	/* connection from MDT is always trusted */
	if (req->rq_auth_usr_mdt) {
		tgt_init_sec_none(reply);
		RETURN(0);
	}

	/* no GSS support case */
	if (!req->rq_auth_gss) {
		if (tgt->lut_sec_level > LUSTRE_SEC_NONE) {
			CWARN("client %s -> target %s does not use GSS, "
			      "can not run under security level %d.\n",
			      client, tgt_name(tgt), tgt->lut_sec_level);
			RETURN(-EACCES);
		} else {
			tgt_init_sec_none(reply);
			RETURN(0);
		}
	}

	/* old version case: client lacks remote-client/capability support */
	if (unlikely(!(data->ocd_connect_flags & OBD_CONNECT_RMT_CLIENT) ||
		     !(data->ocd_connect_flags & OBD_CONNECT_MDS_CAPA) ||
		     !(data->ocd_connect_flags & OBD_CONNECT_OSS_CAPA))) {
		if (tgt->lut_sec_level > LUSTRE_SEC_NONE) {
			CWARN("client %s -> target %s uses old version, "
			      "can not run under security level %d.\n",
			      client, tgt_name(tgt), tgt->lut_sec_level);
			RETURN(-EACCES);
		} else {
			CWARN("client %s -> target %s uses old version, "
			      "run under security level %d.\n",
			      client, tgt_name(tgt), tgt->lut_sec_level);
			tgt_init_sec_none(reply);
			RETURN(0);
		}
	}

	/* Client may ask to be remote, or the GSS realm may force it. */
	remote = data->ocd_connect_flags & OBD_CONNECT_RMT_CLIENT_FORCE;
	if (remote) {
		if (!req->rq_auth_remote)
			CDEBUG(D_SEC, "client (local realm) %s -> target %s "
			       "asked to be remote.\n", client, tgt_name(tgt));
	} else if (req->rq_auth_remote) {
		remote = true;
		CDEBUG(D_SEC, "client (remote realm) %s -> target %s is set "
		       "as remote by default.\n", client, tgt_name(tgt));
	}

	if (remote) {
		/* Remote clients require OSS capabilities to be enabled. */
		if (!tgt->lut_oss_capa) {
			CDEBUG(D_SEC,
			       "client %s -> target %s is set as remote,"
			       " but OSS capabilities are not enabled: %d.\n",
			       client, tgt_name(tgt), tgt->lut_oss_capa);
			RETURN(-EACCES);
		}
	} else {
		/* Local clients must have an authenticated user. */
		if (req->rq_auth_uid == INVALID_UID) {
			CDEBUG(D_SEC, "client %s -> target %s: user is not "
			       "authenticated!\n", client, tgt_name(tgt));
			RETURN(-EACCES);
		}
	}


	switch (tgt->lut_sec_level) {
	case LUSTRE_SEC_NONE:
		if (remote) {
			CDEBUG(D_SEC,
			       "client %s -> target %s is set as remote, "
			       "can not run under security level %d.\n",
			       client, tgt_name(tgt), tgt->lut_sec_level);
			RETURN(-EACCES);
		}
		tgt_init_sec_none(reply);
		break;
	case LUSTRE_SEC_REMOTE:
		/* Only remote clients keep the security flags. */
		if (!remote)
			tgt_init_sec_none(reply);
		break;
	case LUSTRE_SEC_ALL:
		if (remote)
			break;
		/* Local client under SEC_ALL: drop remote-client flags and
		 * any capabilities the target has disabled. */
		reply->ocd_connect_flags &= ~(OBD_CONNECT_RMT_CLIENT |
					      OBD_CONNECT_RMT_CLIENT_FORCE);
		if (!tgt->lut_oss_capa)
			reply->ocd_connect_flags &= ~OBD_CONNECT_OSS_CAPA;
		if (!tgt->lut_mds_capa)
			reply->ocd_connect_flags &= ~OBD_CONNECT_MDS_CAPA;
		break;
	default:
		RETURN(-EINVAL);
	}

	RETURN(rc);
}
837
/*
 * Validate the sptlrpc (security) flavor of an incoming connect RPC.
 *
 * ECHO client exports are always accepted (flavor forced to
 * SPTLRPC_FLVR_ANY).  On first connect (export flavor still
 * SPTLRPC_FLVR_INVALID) the expected flavor for this peer is chosen from
 * the target's rule set and recorded in the export under exp_lock; a
 * mismatch with the flavor actually used by the request is rejected.  On
 * reconnect the RPC source partition must match what the export recorded.
 *
 * \param req  incoming connect request
 * \param exp  export being connected or reconnected
 * \retval 0        flavor accepted
 * \retval -EACCES  flavor or RPC source mismatch
 * \retval other    negative errno from sptlrpc_target_export_check()
 */
int tgt_connect_check_sptlrpc(struct ptlrpc_request *req, struct obd_export *exp)
{
        struct lu_target        *tgt = class_exp2tgt(exp);
        struct sptlrpc_flavor    flvr;
        int                      rc = 0;

        LASSERT(tgt);
        LASSERT(tgt->lut_obd);
        LASSERT(tgt->lut_slice);

        /* always allow ECHO client */
        if (unlikely(strcmp(exp->exp_obd->obd_type->typ_name,
                            LUSTRE_ECHO_NAME) == 0)) {
                exp->exp_flvr.sf_rpc = SPTLRPC_FLVR_ANY;
                return 0;
        }

        if (exp->exp_flvr.sf_rpc == SPTLRPC_FLVR_INVALID) {
                /* first connect: pick the flavor this peer should be using
                 * from the target's current rule set (read lock: rules may
                 * be swapped concurrently by tgt_adapt_sptlrpc_conf()) */
                read_lock(&tgt->lut_sptlrpc_lock);
                sptlrpc_target_choose_flavor(&tgt->lut_sptlrpc_rset,
                                             req->rq_sp_from,
                                             req->rq_peer.nid,
                                             &flvr);
                read_unlock(&tgt->lut_sptlrpc_lock);

                /* record peer type and chosen flavor in the export, then
                 * verify the request actually used that flavor */
                spin_lock(&exp->exp_lock);
                exp->exp_sp_peer = req->rq_sp_from;
                exp->exp_flvr = flvr;
                if (exp->exp_flvr.sf_rpc != SPTLRPC_FLVR_ANY &&
                    exp->exp_flvr.sf_rpc != req->rq_flvr.sf_rpc) {
                        CERROR("%s: unauthorized rpc flavor %x from %s, "
                               "expect %x\n", tgt_name(tgt),
                               req->rq_flvr.sf_rpc,
                               libcfs_nid2str(req->rq_peer.nid),
                               exp->exp_flvr.sf_rpc);
                        rc = -EACCES;
                }
                spin_unlock(&exp->exp_lock);
        } else {
                /* reconnect: the RPC must come from the same source
                 * partition the export was originally created for */
                if (exp->exp_sp_peer != req->rq_sp_from) {
                        CERROR("%s: RPC source %s doesn't match %s\n",
                               tgt_name(tgt),
                               sptlrpc_part2name(req->rq_sp_from),
                               sptlrpc_part2name(exp->exp_sp_peer));
                        rc = -EACCES;
                } else {
                        rc = sptlrpc_target_export_check(exp, req);
                }
        }

        return rc;
}
890
891 int tgt_adapt_sptlrpc_conf(struct lu_target *tgt, int initial)
892 {
893         struct sptlrpc_rule_set  tmp_rset;
894         int                      rc;
895
896         sptlrpc_rule_set_init(&tmp_rset);
897         rc = sptlrpc_conf_target_get_rules(tgt->lut_obd, &tmp_rset, initial);
898         if (rc) {
899                 CERROR("%s: failed get sptlrpc rules: rc = %d\n",
900                        tgt_name(tgt), rc);
901                 return rc;
902         }
903
904         sptlrpc_target_update_exp_flavor(tgt->lut_obd, &tmp_rset);
905
906         write_lock(&tgt->lut_sptlrpc_lock);
907         sptlrpc_rule_set_free(&tgt->lut_sptlrpc_rset);
908         tgt->lut_sptlrpc_rset = tmp_rset;
909         write_unlock(&tgt->lut_sptlrpc_lock);
910
911         return 0;
912 }
913 EXPORT_SYMBOL(tgt_adapt_sptlrpc_conf);
914
/*
 * Generic connect handler for unified targets.
 *
 * Checks the connection security level and sptlrpc flavor, then publishes
 * the negotiated connect flags from the staged reply into the export.
 * On any failure the (already created) export is disconnected so the
 * client sees a clean rejection.
 *
 * \retval 0 on success, negative errno on failure
 */
int tgt_connect(struct tgt_session_info *tsi)
{
        struct ptlrpc_request   *req = tgt_ses_req(tsi);
        struct obd_connect_data *reply;
        int                      rc;

        ENTRY;

        rc = tgt_init_sec_level(req);
        if (rc != 0)
                GOTO(out, rc);

        /* XXX: better to call this check right after getting new export but
         * before last_rcvd slot allocation to avoid server load upon insecure
         * connects. This is to be fixed after unifiyng all targets.
         */
        rc = tgt_connect_check_sptlrpc(req, tsi->tsi_exp);
        if (rc)
                GOTO(out, rc);

        /* To avoid exposing partially initialized connection flags, changes up
         * to this point have been staged in reply->ocd_connect_flags. Now that
         * connection handling has completed successfully, atomically update
         * the connect flags in the shared export data structure. LU-1623 */
        reply = req_capsule_server_get(tsi->tsi_pill, &RMF_CONNECT_DATA);
        spin_lock(&tsi->tsi_exp->exp_lock);
        *exp_connect_flags_ptr(tsi->tsi_exp) = reply->ocd_connect_flags;
        tsi->tsi_exp->exp_connect_data.ocd_brw_size = reply->ocd_brw_size;
        spin_unlock(&tsi->tsi_exp->exp_lock);

        RETURN(0);
out:
        /* class_export_get() balances the put done inside obd_disconnect() */
        obd_disconnect(class_export_get(tsi->tsi_exp));
        return rc;
}
EXPORT_SYMBOL(tgt_connect);
951
952 int tgt_disconnect(struct tgt_session_info *tsi)
953 {
954         int rc;
955
956         ENTRY;
957
958         rc = target_handle_disconnect(tgt_ses_req(tsi));
959         if (rc)
960                 RETURN(err_serious(rc));
961
962         RETURN(rc);
963 }
964 EXPORT_SYMBOL(tgt_disconnect);
965
966 /*
967  * Unified target OBD handlers
968  */
969 int tgt_obd_ping(struct tgt_session_info *tsi)
970 {
971         int rc;
972
973         ENTRY;
974
975         rc = target_handle_ping(tgt_ses_req(tsi));
976         if (rc)
977                 RETURN(err_serious(rc));
978
979         RETURN(rc);
980 }
981 EXPORT_SYMBOL(tgt_obd_ping);
982
/* OBD_LOG_CANCEL is not supported by the unified target. */
int tgt_obd_log_cancel(struct tgt_session_info *tsi)
{
        return err_serious(-EOPNOTSUPP);
}
EXPORT_SYMBOL(tgt_obd_log_cancel);
988
/* OBD_QC_CALLBACK (quota check callback) is not supported here. */
int tgt_obd_qc_callback(struct tgt_session_info *tsi)
{
        return err_serious(-EOPNOTSUPP);
}
EXPORT_SYMBOL(tgt_obd_qc_callback);
994
995 int tgt_sendpage(struct tgt_session_info *tsi, struct lu_rdpg *rdpg, int nob)
996 {
997         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
998         struct ptlrpc_request   *req = tgt_ses_req(tsi);
999         struct obd_export       *exp = req->rq_export;
1000         struct ptlrpc_bulk_desc *desc;
1001         struct l_wait_info      *lwi = &tti->tti_u.rdpg.tti_wait_info;
1002         int                      tmpcount;
1003         int                      tmpsize;
1004         int                      i;
1005         int                      rc;
1006
1007         ENTRY;
1008
1009         desc = ptlrpc_prep_bulk_exp(req, rdpg->rp_npages, 1, BULK_PUT_SOURCE,
1010                                     MDS_BULK_PORTAL);
1011         if (desc == NULL)
1012                 RETURN(-ENOMEM);
1013
1014         if (!(exp_connect_flags(exp) & OBD_CONNECT_BRW_SIZE))
1015                 /* old client requires reply size in it's PAGE_CACHE_SIZE,
1016                  * which is rdpg->rp_count */
1017                 nob = rdpg->rp_count;
1018
1019         for (i = 0, tmpcount = nob; i < rdpg->rp_npages && tmpcount > 0;
1020              i++, tmpcount -= tmpsize) {
1021                 tmpsize = min_t(int, tmpcount, PAGE_CACHE_SIZE);
1022                 ptlrpc_prep_bulk_page_pin(desc, rdpg->rp_pages[i], 0, tmpsize);
1023         }
1024
1025         LASSERT(desc->bd_nob == nob);
1026         rc = target_bulk_io(exp, desc, lwi);
1027         ptlrpc_free_bulk_pin(desc);
1028         RETURN(rc);
1029 }
1030 EXPORT_SYMBOL(tgt_sendpage);
1031
1032 /*
1033  * OBD_IDX_READ handler
1034  */
/*
 * OBD_IDX_READ handler: read key/record pairs from an on-disk index and
 * ship them to the client as pages via tgt_sendpage().
 *
 * The request carries a struct idx_info describing the index fid, the
 * hash to start from, the requested attributes and the number of 4KB
 * containers wanted; the reply mirrors it back with result metadata.
 *
 * \retval 0 on success, negative errno (possibly err_serious()-wrapped
 *         for protocol-level failures) otherwise
 */
int tgt_obd_idx_read(struct tgt_session_info *tsi)
{
        struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
        struct lu_rdpg          *rdpg = &tti->tti_u.rdpg.tti_rdpg;
        struct idx_info         *req_ii, *rep_ii;
        int                      rc, i;

        ENTRY;

        memset(rdpg, 0, sizeof(*rdpg));
        req_capsule_set(tsi->tsi_pill, &RQF_OBD_IDX_READ);

        /* extract idx_info buffer from request & reply */
        req_ii = req_capsule_client_get(tsi->tsi_pill, &RMF_IDX_INFO);
        if (req_ii == NULL || req_ii->ii_magic != IDX_INFO_MAGIC)
                RETURN(err_serious(-EPROTO));

        rc = req_capsule_server_pack(tsi->tsi_pill);
        if (rc)
                RETURN(err_serious(rc));

        rep_ii = req_capsule_server_get(tsi->tsi_pill, &RMF_IDX_INFO);
        if (rep_ii == NULL)
                RETURN(err_serious(-EFAULT));
        rep_ii->ii_magic = IDX_INFO_MAGIC;

        /* extract hash to start with */
        rdpg->rp_hash = req_ii->ii_hash_start;

        /* extract requested attributes */
        rdpg->rp_attrs = req_ii->ii_attrs;

        /* check that fid packed in request is valid and supported */
        if (!fid_is_sane(&req_ii->ii_fid))
                RETURN(-EINVAL);
        rep_ii->ii_fid = req_ii->ii_fid;

        /* copy flags */
        rep_ii->ii_flags = req_ii->ii_flags;

        /* compute number of pages to allocate, ii_count is the number of 4KB
         * containers */
        if (req_ii->ii_count <= 0)
                GOTO(out, rc = -EFAULT);
        /* cap the transfer at the export's maximum bulk I/O size */
        rdpg->rp_count = min_t(unsigned int, req_ii->ii_count << LU_PAGE_SHIFT,
                               exp_max_brw_size(tsi->tsi_exp));
        rdpg->rp_npages = (rdpg->rp_count + PAGE_CACHE_SIZE -1) >> PAGE_CACHE_SHIFT;

        /* allocate pages to store the containers */
        OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof(rdpg->rp_pages[0]));
        if (rdpg->rp_pages == NULL)
                GOTO(out, rc = -ENOMEM);
        for (i = 0; i < rdpg->rp_npages; i++) {
                rdpg->rp_pages[i] = alloc_page(GFP_IOFS);
                if (rdpg->rp_pages[i] == NULL)
                        GOTO(out, rc = -ENOMEM);
        }

        /* populate pages with key/record pairs; on success the return
         * value is the number of bytes actually filled in */
        rc = dt_index_read(tsi->tsi_env, tsi->tsi_tgt->lut_bottom, rep_ii, rdpg);
        if (rc < 0)
                GOTO(out, rc);

        LASSERTF(rc <= rdpg->rp_count, "dt_index_read() returned more than "
                 "asked %d > %d\n", rc, rdpg->rp_count);

        /* send pages to client */
        rc = tgt_sendpage(tsi, rdpg, rc);
        if (rc)
                GOTO(out, rc);
        EXIT;
out:
        /* cleanup handles a partially-allocated page array as well */
        if (rdpg->rp_pages) {
                for (i = 0; i < rdpg->rp_npages; i++)
                        if (rdpg->rp_pages[i])
                                __free_page(rdpg->rp_pages[i]);
                OBD_FREE(rdpg->rp_pages,
                         rdpg->rp_npages * sizeof(rdpg->rp_pages[0]));
        }
        return rc;
}
EXPORT_SYMBOL(tgt_obd_idx_read);
1117
/* Dispatch table for generic OBD-level requests handled by all targets. */
struct tgt_handler tgt_obd_handlers[] = {
TGT_OBD_HDL    (0,      OBD_PING,               tgt_obd_ping),
TGT_OBD_HDL_VAR(0,      OBD_LOG_CANCEL,         tgt_obd_log_cancel),
TGT_OBD_HDL_VAR(0,      OBD_QC_CALLBACK,        tgt_obd_qc_callback),
TGT_OBD_HDL    (0,      OBD_IDX_READ,           tgt_obd_idx_read)
};
EXPORT_SYMBOL(tgt_obd_handlers);
1125
1126 int tgt_sync(const struct lu_env *env, struct lu_target *tgt,
1127              struct dt_object *obj)
1128 {
1129         int rc = 0;
1130
1131         ENTRY;
1132
1133         /* if no objid is specified, it means "sync whole filesystem" */
1134         if (obj == NULL) {
1135                 rc = dt_sync(env, tgt->lut_bottom);
1136         } else if (dt_version_get(env, obj) >
1137                    tgt->lut_obd->obd_last_committed) {
1138                 rc = dt_object_sync(env, obj);
1139         }
1140
1141         RETURN(rc);
1142 }
1143 EXPORT_SYMBOL(tgt_sync);
1144 /*
1145  * Unified target DLM handlers.
1146  */
1147
1148 /* Ensure that data and metadata are synced to the disk when lock is cancelled
1149  * (if requested) */
/* Ensure that data and metadata are synced to the disk when lock is cancelled
 * (if requested) */
int tgt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                     void *data, int flag)
{
        struct lu_env            env;
        struct lu_target        *tgt;
        struct dt_object        *obj;
        struct lu_fid            fid;
        int                      rc = 0;

        ENTRY;

        /* NOTE(review): tgt is dereferenced below without a NULL check --
         * presumably l_export is always valid here; confirm with callers */
        tgt = class_exp2tgt(lock->l_export);

        /* sync on cancel of a write/group lock when the target policy is
         * ALWAYS_SYNC_ON_CANCEL, or BLOCKING_SYNC_ON_CANCEL with a
         * blocking callback pending on the lock */
        if (flag == LDLM_CB_CANCELING &&
            (lock->l_granted_mode & (LCK_PW | LCK_GROUP)) &&
            (tgt->lut_sync_lock_cancel == ALWAYS_SYNC_ON_CANCEL ||
             (tgt->lut_sync_lock_cancel == BLOCKING_SYNC_ON_CANCEL &&
              lock->l_flags & LDLM_FL_CBPENDING))) {
                rc = lu_env_init(&env, LCT_DT_THREAD);
                if (unlikely(rc != 0))
                        RETURN(rc);

                /* derive the object fid from the lock's resource name */
                ost_fid_from_resid(&fid, &lock->l_resource->lr_name,
                                   tgt->lut_lsd.lsd_osd_index);
                obj = dt_locate(&env, tgt->lut_bottom, &fid);
                if (IS_ERR(obj))
                        GOTO(err_env, rc = PTR_ERR(obj));

                if (!dt_object_exists(obj))
                        GOTO(err_put, rc = -ENOENT);

                /* sync failure is logged but does not block the cancel */
                rc = tgt_sync(&env, tgt, obj);
                if (rc < 0) {
                        CERROR("%s: syncing "DFID" ("LPU64"-"LPU64") on lock "
                               "cancel: rc = %d\n",
                               tgt_name(tgt), PFID(&fid),
                               lock->l_policy_data.l_extent.start,
                               lock->l_policy_data.l_extent.end, rc);
                }
err_put:
                lu_object_put(&env, &obj->do_lu);
err_env:
                lu_env_fini(&env);
        }

        /* continue with the standard server-side blocking AST */
        rc = ldlm_server_blocking_ast(lock, desc, data, flag);
        RETURN(rc);
}
1198
/* DLM callback suite used for locks granted by the unified target;
 * only the blocking AST is overridden (to sync on cancel). */
struct ldlm_callback_suite tgt_dlm_cbs = {
        .lcs_completion = ldlm_server_completion_ast,
        .lcs_blocking   = tgt_blocking_ast,
        .lcs_glimpse    = ldlm_server_glimpse_ast
};
1204
1205 int tgt_enqueue(struct tgt_session_info *tsi)
1206 {
1207         struct ptlrpc_request *req = tgt_ses_req(tsi);
1208         int rc;
1209
1210         ENTRY;
1211         /*
1212          * tsi->tsi_dlm_req was already swapped and (if necessary) converted,
1213          * tsi->tsi_dlm_cbs was set by the *_req_handle() function.
1214          */
1215         LASSERT(tsi->tsi_dlm_req != NULL);
1216         rc = ldlm_handle_enqueue0(tsi->tsi_exp->exp_obd->obd_namespace, req,
1217                                   tsi->tsi_dlm_req, &tgt_dlm_cbs);
1218         if (rc)
1219                 RETURN(err_serious(rc));
1220
1221         RETURN(req->rq_status);
1222 }
1223 EXPORT_SYMBOL(tgt_enqueue);
1224
1225 int tgt_convert(struct tgt_session_info *tsi)
1226 {
1227         struct ptlrpc_request *req = tgt_ses_req(tsi);
1228         int rc;
1229
1230         ENTRY;
1231         LASSERT(tsi->tsi_dlm_req);
1232         rc = ldlm_handle_convert0(req, tsi->tsi_dlm_req);
1233         if (rc)
1234                 RETURN(err_serious(rc));
1235
1236         RETURN(req->rq_status);
1237 }
1238 EXPORT_SYMBOL(tgt_convert);
1239
/* Servers do not receive blocking callbacks; reject the request. */
int tgt_bl_callback(struct tgt_session_info *tsi)
{
        return err_serious(-EOPNOTSUPP);
}
EXPORT_SYMBOL(tgt_bl_callback);
1245
/* Servers do not receive completion callbacks; reject the request. */
int tgt_cp_callback(struct tgt_session_info *tsi)
{
        return err_serious(-EOPNOTSUPP);
}
EXPORT_SYMBOL(tgt_cp_callback);
1251
1252 /* generic LDLM target handler */
/* generic LDLM target handler */
struct tgt_handler tgt_dlm_handlers[] = {
TGT_DLM_HDL    (HABEO_CLAVIS,   LDLM_ENQUEUE,           tgt_enqueue),
TGT_DLM_HDL_VAR(HABEO_CLAVIS,   LDLM_CONVERT,           tgt_convert),
TGT_DLM_HDL_VAR(0,              LDLM_BL_CALLBACK,       tgt_bl_callback),
TGT_DLM_HDL_VAR(0,              LDLM_CP_CALLBACK,       tgt_cp_callback)
};
EXPORT_SYMBOL(tgt_dlm_handlers);
1260
1261 /*
1262  * Unified target LLOG handlers.
1263  */
1264 int tgt_llog_open(struct tgt_session_info *tsi)
1265 {
1266         int rc;
1267
1268         ENTRY;
1269
1270         rc = llog_origin_handle_open(tgt_ses_req(tsi));
1271
1272         RETURN(rc);
1273 }
1274 EXPORT_SYMBOL(tgt_llog_open);
1275
1276 int tgt_llog_close(struct tgt_session_info *tsi)
1277 {
1278         int rc;
1279
1280         ENTRY;
1281
1282         rc = llog_origin_handle_close(tgt_ses_req(tsi));
1283
1284         RETURN(rc);
1285 }
1286 EXPORT_SYMBOL(tgt_llog_close);
1287
1288
1289 int tgt_llog_destroy(struct tgt_session_info *tsi)
1290 {
1291         int rc;
1292
1293         ENTRY;
1294
1295         rc = llog_origin_handle_destroy(tgt_ses_req(tsi));
1296
1297         RETURN(rc);
1298 }
1299 EXPORT_SYMBOL(tgt_llog_destroy);
1300
1301 int tgt_llog_read_header(struct tgt_session_info *tsi)
1302 {
1303         int rc;
1304
1305         ENTRY;
1306
1307         rc = llog_origin_handle_read_header(tgt_ses_req(tsi));
1308
1309         RETURN(rc);
1310 }
1311 EXPORT_SYMBOL(tgt_llog_read_header);
1312
1313 int tgt_llog_next_block(struct tgt_session_info *tsi)
1314 {
1315         int rc;
1316
1317         ENTRY;
1318
1319         rc = llog_origin_handle_next_block(tgt_ses_req(tsi));
1320
1321         RETURN(rc);
1322 }
1323 EXPORT_SYMBOL(tgt_llog_next_block);
1324
1325 int tgt_llog_prev_block(struct tgt_session_info *tsi)
1326 {
1327         int rc;
1328
1329         ENTRY;
1330
1331         rc = llog_origin_handle_prev_block(tgt_ses_req(tsi));
1332
1333         RETURN(rc);
1334 }
1335 EXPORT_SYMBOL(tgt_llog_prev_block);
1336
1337 /* generic llog target handler */
/* generic llog target handler */
struct tgt_handler tgt_llog_handlers[] = {
TGT_LLOG_HDL    (0,     LLOG_ORIGIN_HANDLE_CREATE,      tgt_llog_open),
TGT_LLOG_HDL    (0,     LLOG_ORIGIN_HANDLE_NEXT_BLOCK,  tgt_llog_next_block),
TGT_LLOG_HDL    (0,     LLOG_ORIGIN_HANDLE_READ_HEADER, tgt_llog_read_header),
TGT_LLOG_HDL    (0,     LLOG_ORIGIN_HANDLE_PREV_BLOCK,  tgt_llog_prev_block),
TGT_LLOG_HDL    (0,     LLOG_ORIGIN_HANDLE_DESTROY,     tgt_llog_destroy),
TGT_LLOG_HDL_VAR(0,     LLOG_ORIGIN_HANDLE_CLOSE,       tgt_llog_close),
};
EXPORT_SYMBOL(tgt_llog_handlers);
1347
1348 /*
1349  * sec context handlers
1350  */
1351 /* XXX: Implement based on mdt_sec_ctx_handle()? */
/* XXX: Implement based on mdt_sec_ctx_handle()? */
/* Security context requests are currently accepted as no-ops. */
int tgt_sec_ctx_handle(struct tgt_session_info *tsi)
{
        return 0;
}
1356
/* Dispatch table for sec-context requests; all map to the no-op handler. */
struct tgt_handler tgt_sec_ctx_handlers[] = {
TGT_SEC_HDL_VAR(0,      SEC_CTX_INIT,           tgt_sec_ctx_handle),
TGT_SEC_HDL_VAR(0,      SEC_CTX_INIT_CONT,      tgt_sec_ctx_handle),
TGT_SEC_HDL_VAR(0,      SEC_CTX_FINI,           tgt_sec_ctx_handle),
};
EXPORT_SYMBOL(tgt_sec_ctx_handlers);
1363
1364 /*
1365  * initialize per-thread page pool (bug 5137).
1366  */
1367 int tgt_io_thread_init(struct ptlrpc_thread *thread)
1368 {
1369         struct tgt_thread_big_cache *tbc;
1370
1371         ENTRY;
1372
1373         LASSERT(thread != NULL);
1374         LASSERT(thread->t_data == NULL);
1375
1376         OBD_ALLOC_LARGE(tbc, sizeof(*tbc));
1377         if (tbc == NULL)
1378                 RETURN(-ENOMEM);
1379         thread->t_data = tbc;
1380         RETURN(0);
1381 }
1382 EXPORT_SYMBOL(tgt_io_thread_init);
1383
1384 /*
1385  * free per-thread pool created by tgt_thread_init().
1386  */
1387 void tgt_io_thread_done(struct ptlrpc_thread *thread)
1388 {
1389         struct tgt_thread_big_cache *tbc;
1390
1391         ENTRY;
1392
1393         LASSERT(thread != NULL);
1394
1395         /*
1396          * be prepared to handle partially-initialized pools (because this is
1397          * called from ost_io_thread_init() for cleanup.
1398          */
1399         tbc = thread->t_data;
1400         if (tbc != NULL) {
1401                 OBD_FREE_LARGE(tbc, sizeof(*tbc));
1402                 thread->t_data = NULL;
1403         }
1404         EXIT;
1405 }
1406 EXPORT_SYMBOL(tgt_io_thread_done);
1407 /**
1408  * Helper function for getting server side [start, start+count] DLM lock
1409  * if asked by client.
1410  */
/**
 * Helper function for getting server side [start, start+count] DLM lock
 * if asked by client.
 */
int tgt_extent_lock(struct ldlm_namespace *ns, struct ldlm_res_id *res_id,
                    __u64 start, __u64 end, struct lustre_handle *lh,
                    int mode, __u64 *flags)
{
        ldlm_policy_data_t       policy;
        int                      rc;

        ENTRY;

        LASSERT(lh != NULL);
        LASSERT(ns != NULL);
        LASSERT(!lustre_handle_is_used(lh));

        policy.l_extent.gid = 0;
        /* round start down to a page boundary */
        policy.l_extent.start = start & CFS_PAGE_MASK;

        /*
         * If ->o_blocks is EOF it means "lock till the end of the file".
         * Otherwise, it's size of an extent or hole being punched (in bytes).
         */
        if (end == OBD_OBJECT_EOF || end < start)
                policy.l_extent.end = OBD_OBJECT_EOF;
        else
                /* round end up to the last byte of its page */
                policy.l_extent.end = end | ~CFS_PAGE_MASK;

        /* server-local enqueue with the standard blocking/completion/glimpse
         * ASTs; any enqueue failure is flattened to -EIO for callers */
        rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_EXTENT, &policy, mode,
                                    flags, ldlm_blocking_ast,
                                    ldlm_completion_ast, ldlm_glimpse_ast,
                                    NULL, 0, LVB_T_NONE, NULL, lh);
        RETURN(rc == ELDLM_OK ? 0 : -EIO);
}
EXPORT_SYMBOL(tgt_extent_lock);
1443
/* Release an extent lock taken with tgt_extent_lock(). */
void tgt_extent_unlock(struct lustre_handle *lh, ldlm_mode_t mode)
{
        LASSERT(lustre_handle_is_used(lh));
        ldlm_lock_decref(lh, mode);
}
EXPORT_SYMBOL(tgt_extent_unlock);
1450
1451 int tgt_brw_lock(struct ldlm_namespace *ns, struct ldlm_res_id *res_id,
1452                  struct obd_ioobj *obj, struct niobuf_remote *nb,
1453                  struct lustre_handle *lh, int mode)
1454 {
1455         __u64                    flags = 0;
1456         int                      nrbufs = obj->ioo_bufcnt;
1457         int                      i;
1458
1459         ENTRY;
1460
1461         LASSERT(mode == LCK_PR || mode == LCK_PW);
1462         LASSERT(!lustre_handle_is_used(lh));
1463
1464         if (nrbufs == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
1465                 RETURN(0);
1466
1467         for (i = 1; i < nrbufs; i++)
1468                 if (!(nb[i].flags & OBD_BRW_SRVLOCK))
1469                         RETURN(-EFAULT);
1470
1471         RETURN(tgt_extent_lock(ns, res_id, nb[0].offset,
1472                                nb[nrbufs - 1].offset + nb[nrbufs - 1].len - 1,
1473                                lh, mode, &flags));
1474 }
1475 EXPORT_SYMBOL(tgt_brw_lock);
1476
1477 void tgt_brw_unlock(struct obd_ioobj *obj, struct niobuf_remote *niob,
1478                     struct lustre_handle *lh, int mode)
1479 {
1480         ENTRY;
1481
1482         LASSERT(mode == LCK_PR || mode == LCK_PW);
1483         LASSERT((obj->ioo_bufcnt > 0 && (niob[0].flags & OBD_BRW_SRVLOCK)) ==
1484                 lustre_handle_is_used(lh));
1485         if (lustre_handle_is_used(lh))
1486                 tgt_extent_unlock(lh, mode);
1487         EXIT;
1488 }
1489 EXPORT_SYMBOL(tgt_brw_unlock);
1490
1491 static __u32 tgt_checksum_bulk(struct lu_target *tgt,
1492                                struct ptlrpc_bulk_desc *desc, int opc,
1493                                cksum_type_t cksum_type)
1494 {
1495         struct cfs_crypto_hash_desc     *hdesc;
1496         unsigned int                    bufsize;
1497         int                             i, err;
1498         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1499         __u32                           cksum;
1500
1501         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1502         if (IS_ERR(hdesc)) {
1503                 CERROR("%s: unable to initialize checksum hash %s\n",
1504                        tgt_name(tgt), cfs_crypto_hash_name(cfs_alg));
1505                 return PTR_ERR(hdesc);
1506         }
1507
1508         CDEBUG(D_INFO, "Checksum for algo %s\n", cfs_crypto_hash_name(cfs_alg));
1509         for (i = 0; i < desc->bd_iov_count; i++) {
1510                 /* corrupt the data before we compute the checksum, to
1511                  * simulate a client->OST data error */
1512                 if (i == 0 && opc == OST_WRITE &&
1513                     OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_RECEIVE)) {
1514                         int off = desc->bd_iov[i].kiov_offset & ~CFS_PAGE_MASK;
1515                         int len = desc->bd_iov[i].kiov_len;
1516                         struct page *np = tgt_page_to_corrupt;
1517                         char *ptr = kmap(desc->bd_iov[i].kiov_page) + off;
1518
1519                         if (np) {
1520                                 char *ptr2 = kmap(np) + off;
1521
1522                                 memcpy(ptr2, ptr, len);
1523                                 memcpy(ptr2, "bad3", min(4, len));
1524                                 kunmap(np);
1525                                 desc->bd_iov[i].kiov_page = np;
1526                         } else {
1527                                 CERROR("%s: can't alloc page for corruption\n",
1528                                        tgt_name(tgt));
1529                         }
1530                 }
1531                 cfs_crypto_hash_update_page(hdesc, desc->bd_iov[i].kiov_page,
1532                                   desc->bd_iov[i].kiov_offset & ~CFS_PAGE_MASK,
1533                                   desc->bd_iov[i].kiov_len);
1534
1535                  /* corrupt the data after we compute the checksum, to
1536                  * simulate an OST->client data error */
1537                 if (i == 0 && opc == OST_READ &&
1538                     OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_SEND)) {
1539                         int off = desc->bd_iov[i].kiov_offset & ~CFS_PAGE_MASK;
1540                         int len = desc->bd_iov[i].kiov_len;
1541                         struct page *np = tgt_page_to_corrupt;
1542                         char *ptr = kmap(desc->bd_iov[i].kiov_page) + off;
1543
1544                         if (np) {
1545                                 char *ptr2 = kmap(np) + off;
1546
1547                                 memcpy(ptr2, ptr, len);
1548                                 memcpy(ptr2, "bad4", min(4, len));
1549                                 kunmap(np);
1550                                 desc->bd_iov[i].kiov_page = np;
1551                         } else {
1552                                 CERROR("%s: can't alloc page for corruption\n",
1553                                        tgt_name(tgt));
1554                         }
1555                 }
1556         }
1557
1558         bufsize = 4;
1559         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1560         if (err)
1561                 cfs_crypto_hash_final(hdesc, NULL, NULL);
1562
1563         return cksum;
1564 }
1565
1566 int tgt_brw_read(struct tgt_session_info *tsi)
1567 {
1568         struct ptlrpc_request   *req = tgt_ses_req(tsi);
1569         struct ptlrpc_bulk_desc *desc = NULL;
1570         struct obd_export       *exp = tsi->tsi_exp;
1571         struct niobuf_remote    *remote_nb;
1572         struct niobuf_local     *local_nb;
1573         struct obd_ioobj        *ioo;
1574         struct ost_body         *body, *repbody;
1575         struct l_wait_info       lwi;
1576         struct lustre_handle     lockh = { 0 };
1577         int                      niocount, npages, nob = 0, rc, i;
1578         int                      no_reply = 0;
1579         struct tgt_thread_big_cache *tbc = req->rq_svc_thread->t_data;
1580
1581         ENTRY;
1582
1583         if (ptlrpc_req2svc(req)->srv_req_portal != OST_IO_PORTAL) {
1584                 CERROR("%s: deny read request from %s to portal %u\n",
1585                        tgt_name(tsi->tsi_tgt),
1586                        obd_export_nid2str(req->rq_export),
1587                        ptlrpc_req2svc(req)->srv_req_portal);
1588                 RETURN(-EPROTO);
1589         }
1590
1591         req->rq_bulk_read = 1;
1592
1593         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
1594                 RETURN(-EIO);
1595
1596         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, cfs_fail_val > 0 ?
1597                          cfs_fail_val : (obd_timeout + 1) / 4);
1598
1599         /* Check if there is eviction in progress, and if so, wait for it to
1600          * finish */
1601         if (unlikely(atomic_read(&exp->exp_obd->obd_evict_inprogress))) {
1602                 /* We do not care how long it takes */
1603                 lwi = LWI_INTR(NULL, NULL);
1604                 rc = l_wait_event(exp->exp_obd->obd_evict_inprogress_waitq,
1605                          !atomic_read(&exp->exp_obd->obd_evict_inprogress),
1606                          &lwi);
1607         }
1608
1609         /* There must be big cache in current thread to process this request
1610          * if it is NULL then something went wrong and it wasn't allocated,
1611          * report -ENOMEM in that case */
1612         if (tbc == NULL)
1613                 RETURN(-ENOMEM);
1614
1615         body = tsi->tsi_ost_body;
1616         LASSERT(body != NULL);
1617
1618         ioo = req_capsule_client_get(tsi->tsi_pill, &RMF_OBD_IOOBJ);
1619         LASSERT(ioo != NULL); /* must exists after tgt_ost_body_unpack */
1620
1621         niocount = ioo->ioo_bufcnt;
1622         remote_nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
1623         LASSERT(remote_nb != NULL); /* must exists after tgt_ost_body_unpack */
1624
1625         local_nb = tbc->local;
1626
1627         rc = tgt_brw_lock(exp->exp_obd->obd_namespace, &tsi->tsi_resid, ioo,
1628                           remote_nb, &lockh, LCK_PR);
1629         if (rc != 0)
1630                 RETURN(rc);
1631
1632         /*
1633          * If getting the lock took more time than
1634          * client was willing to wait, drop it. b=11330
1635          */
1636         if (cfs_time_current_sec() > req->rq_deadline ||
1637             OBD_FAIL_CHECK(OBD_FAIL_OST_DROP_REQ)) {
1638                 no_reply = 1;
1639                 CERROR("Dropping timed-out read from %s because locking"
1640                        "object "DOSTID" took %ld seconds (limit was %ld).\n",
1641                        libcfs_id2str(req->rq_peer), POSTID(&ioo->ioo_oid),
1642                        cfs_time_current_sec() - req->rq_arrival_time.tv_sec,
1643                        req->rq_deadline - req->rq_arrival_time.tv_sec);
1644                 GOTO(out_lock, rc = -ETIMEDOUT);
1645         }
1646
1647         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1648         repbody->oa = body->oa;
1649
1650         npages = PTLRPC_MAX_BRW_PAGES;
1651         rc = obd_preprw(tsi->tsi_env, OBD_BRW_READ, exp, &repbody->oa, 1,
1652                         ioo, remote_nb, &npages, local_nb, NULL, BYPASS_CAPA);
1653         if (rc != 0)
1654                 GOTO(out_lock, rc);
1655
1656         desc = ptlrpc_prep_bulk_exp(req, npages, ioobj_max_brw_get(ioo),
1657                                     BULK_PUT_SOURCE, OST_BULK_PORTAL);
1658         if (desc == NULL)
1659                 GOTO(out_commitrw, rc = -ENOMEM);
1660
1661         nob = 0;
1662         for (i = 0; i < npages; i++) {
1663                 int page_rc = local_nb[i].rc;
1664
1665                 if (page_rc < 0) {
1666                         rc = page_rc;
1667                         break;
1668                 }
1669
1670                 nob += page_rc;
1671                 if (page_rc != 0) { /* some data! */
1672                         LASSERT(local_nb[i].page != NULL);
1673                         ptlrpc_prep_bulk_page_nopin(desc, local_nb[i].page,
1674                                                     local_nb[i].lnb_page_offset,
1675                                                     page_rc);
1676                 }
1677
1678                 if (page_rc != local_nb[i].len) { /* short read */
1679                         /* All subsequent pages should be 0 */
1680                         while (++i < npages)
1681                                 LASSERT(local_nb[i].rc == 0);
1682                         break;
1683                 }
1684         }
1685
1686         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1687                 cksum_type_t cksum_type =
1688                         cksum_type_unpack(body->oa.o_valid & OBD_MD_FLFLAGS ?
1689                                           body->oa.o_flags : 0);
1690                 repbody->oa.o_flags = cksum_type_pack(cksum_type);
1691                 repbody->oa.o_valid = OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1692                 repbody->oa.o_cksum = tgt_checksum_bulk(tsi->tsi_tgt, desc,
1693                                                         OST_READ, cksum_type);
1694                 CDEBUG(D_PAGE, "checksum at read origin: %x\n",
1695                        repbody->oa.o_cksum);
1696         } else {
1697                 repbody->oa.o_valid = 0;
1698         }
1699         /* We're finishing using body->oa as an input variable */
1700
1701         /* Check if client was evicted while we were doing i/o before touching
1702          * network */
1703         if (likely(rc == 0 &&
1704                    !CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2))) {
1705                 rc = target_bulk_io(exp, desc, &lwi);
1706                 no_reply = rc != 0;
1707         }
1708
1709 out_commitrw:
1710         /* Must commit after prep above in all cases */
1711         rc = obd_commitrw(tsi->tsi_env, OBD_BRW_READ, exp,
1712                           &repbody->oa, 1, ioo, remote_nb, npages, local_nb,
1713                           NULL, rc);
1714         if (rc == 0)
1715                 tgt_drop_id(exp, &repbody->oa);
1716 out_lock:
1717         tgt_brw_unlock(ioo, remote_nb, &lockh, LCK_PR);
1718
1719         if (desc && !CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2))
1720                 ptlrpc_free_bulk_nopin(desc);
1721
1722         LASSERT(rc <= 0);
1723         if (rc == 0) {
1724                 rc = nob;
1725                 ptlrpc_lprocfs_brw(req, nob);
1726         } else if (no_reply) {
1727                 req->rq_no_reply = 1;
1728                 /* reply out callback would free */
1729                 ptlrpc_req_drop_rs(req);
1730                 LCONSOLE_WARN("%s: Bulk IO read error with %s (at %s), "
1731                               "client will retry: rc %d\n",
1732                               exp->exp_obd->obd_name,
1733                               obd_uuid2str(&exp->exp_client_uuid),
1734                               obd_export_nid2str(exp), rc);
1735         }
1736         /* send a bulk after reply to simulate a network delay or reordering
1737          * by a router */
1738         if (unlikely(CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2))) {
1739                 wait_queue_head_t        waitq;
1740                 struct l_wait_info       lwi1;
1741
1742                 CDEBUG(D_INFO, "reorder BULK\n");
1743                 init_waitqueue_head(&waitq);
1744
1745                 lwi1 = LWI_TIMEOUT_INTR(cfs_time_seconds(3), NULL, NULL, NULL);
1746                 l_wait_event(waitq, 0, &lwi1);
1747                 target_bulk_io(exp, desc, &lwi);
1748                 ptlrpc_free_bulk_nopin(desc);
1749         }
1750
1751         RETURN(rc);
1752 }
1753 EXPORT_SYMBOL(tgt_brw_read);
1754
1755 static void tgt_warn_on_cksum(struct ptlrpc_request *req,
1756                               struct ptlrpc_bulk_desc *desc,
1757                               struct niobuf_local *local_nb, int npages,
1758                               obd_count client_cksum, obd_count server_cksum,
1759                               bool mmap)
1760 {
1761         struct obd_export *exp = req->rq_export;
1762         struct ost_body *body;
1763         char *router;
1764         char *via;
1765
1766         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1767         LASSERT(body != NULL);
1768
1769         if (req->rq_peer.nid == desc->bd_sender) {
1770                 via = router = "";
1771         } else {
1772                 via = " via ";
1773                 router = libcfs_nid2str(desc->bd_sender);
1774         }
1775
1776         if (mmap) {
1777                 CDEBUG_LIMIT(D_INFO, "client csum %x, server csum %x\n",
1778                              client_cksum, server_cksum);
1779                 return;
1780         }
1781
1782         LCONSOLE_ERROR_MSG(0x168, "BAD WRITE CHECKSUM: %s from %s%s%s inode "
1783                            DFID" object "DOSTID" extent ["LPU64"-"LPU64
1784                            "]: client csum %x, server csum %x\n",
1785                            exp->exp_obd->obd_name, libcfs_id2str(req->rq_peer),
1786                            via, router,
1787                            body->oa.o_valid & OBD_MD_FLFID ?
1788                            body->oa.o_parent_seq : (__u64)0,
1789                            body->oa.o_valid & OBD_MD_FLFID ?
1790                            body->oa.o_parent_oid : 0,
1791                            body->oa.o_valid & OBD_MD_FLFID ?
1792                            body->oa.o_parent_ver : 0,
1793                            POSTID(&body->oa.o_oi),
1794                            local_nb[0].lnb_file_offset,
1795                            local_nb[npages-1].lnb_file_offset +
1796                            local_nb[npages-1].len - 1,
1797                            client_cksum, server_cksum);
1798 }
1799
/*
 * Handler for OST_WRITE RPCs with bulk data transfer.
 *
 * Validates the request, takes a PW extent lock over the I/O range,
 * prepares local pages with obd_preprw(), pulls the client's data over a
 * bulk GET, optionally recomputes and returns the data checksum, and
 * finally commits the write with obd_commitrw().  On success the reply
 * carries one return code per remote niobuf (RMF_RCS).
 *
 * Returns 0 on success or a negative errno; malformed requests are wrapped
 * with err_serious().  When the client has been evicted or timed out the
 * reply is suppressed (no_reply) so the client will resend.
 */
int tgt_brw_write(struct tgt_session_info *tsi)
{
        struct ptlrpc_request   *req = tgt_ses_req(tsi);
        struct ptlrpc_bulk_desc *desc = NULL;
        struct obd_export       *exp = req->rq_export;
        struct niobuf_remote    *remote_nb;
        struct niobuf_local     *local_nb;
        struct obd_ioobj        *ioo;
        struct ost_body         *body, *repbody;
        struct l_wait_info       lwi;
        struct lustre_handle     lockh = {0};
        __u32                   *rcs;
        int                      objcount, niocount, npages;
        int                      rc, i, j;
        cksum_type_t             cksum_type = OBD_CKSUM_CRC32;
        bool                     no_reply = false, mmap;
        struct tgt_thread_big_cache *tbc = req->rq_svc_thread->t_data;

        ENTRY;

        /* writes are only accepted on the dedicated I/O portal; anything
         * else is a protocol violation */
        if (ptlrpc_req2svc(req)->srv_req_portal != OST_IO_PORTAL) {
                CERROR("%s: deny write request from %s to portal %u\n",
                       tgt_name(tsi->tsi_tgt),
                       obd_export_nid2str(req->rq_export),
                       ptlrpc_req2svc(req)->srv_req_portal);
                RETURN(err_serious(-EPROTO));
        }

        /* fault-injection points for exercising client error handling */
        if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOSPC))
                RETURN(err_serious(-ENOSPC));
        if (OBD_FAIL_TIMEOUT(OBD_FAIL_OST_EROFS, 1))
                RETURN(err_serious(-EROFS));

        /* flag this request as a bulk write for the request-handling code */
        req->rq_bulk_write = 1;

        if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
                RETURN(err_serious(-EIO));
        if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK2))
                RETURN(err_serious(-EFAULT));

        /* pause before transaction has been started */
        CFS_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, cfs_fail_val > 0 ?
                         cfs_fail_val : (obd_timeout + 1) / 4);

        /* There must be big cache in current thread to process this request
         * if it is NULL then something went wrong and it wasn't allocated,
         * report -ENOMEM in that case */
        if (tbc == NULL)
                RETURN(-ENOMEM);

        body = tsi->tsi_ost_body;
        LASSERT(body != NULL);

        ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
        LASSERT(ioo != NULL); /* must exists after tgt_ost_body_unpack */

        objcount = req_capsule_get_size(&req->rq_pill, &RMF_OBD_IOOBJ,
                                        RCL_CLIENT) / sizeof(*ioo);

        /* total remote niobufs claimed across all I/O objects */
        for (niocount = i = 0; i < objcount; i++)
                niocount += ioo[i].ioo_bufcnt;

        remote_nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
        LASSERT(remote_nb != NULL); /* must exists after tgt_ost_body_unpack */
        /* the claimed niobuf count must match the size of the buffer the
         * client actually shipped, otherwise the request is malformed */
        if (niocount != req_capsule_get_size(&req->rq_pill,
                                             &RMF_NIOBUF_REMOTE, RCL_CLIENT) /
                        sizeof(*remote_nb))
                RETURN(err_serious(-EPROTO));

        /* NOTE(review): a MEMALLOC write arriving over a self-connection
         * presumably belongs to local memory reclaim, so mark memory
         * pressure to let allocations use reserves -- confirm */
        if ((remote_nb[0].flags & OBD_BRW_MEMALLOC) &&
            (exp->exp_connection->c_peer.nid == exp->exp_connection->c_self))
                memory_pressure_set();

        /* reply carries one return code per remote niobuf */
        req_capsule_set_size(&req->rq_pill, &RMF_RCS, RCL_SERVER,
                             niocount * sizeof(*rcs));
        rc = req_capsule_server_pack(&req->rq_pill);
        if (rc != 0)
                GOTO(out, rc = err_serious(rc));

        CFS_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_PACK, cfs_fail_val);
        rcs = req_capsule_server_get(&req->rq_pill, &RMF_RCS);

        local_nb = tbc->local;

        rc = tgt_brw_lock(exp->exp_obd->obd_namespace, &tsi->tsi_resid, ioo,
                          remote_nb, &lockh, LCK_PW);
        if (rc != 0)
                GOTO(out, rc);

        /*
         * If getting the lock took more time than
         * client was willing to wait, drop it. b=11330
         */
        if (cfs_time_current_sec() > req->rq_deadline ||
            OBD_FAIL_CHECK(OBD_FAIL_OST_DROP_REQ)) {
                no_reply = true;
                CERROR("%s: Dropping timed-out write from %s because locking "
                       "object "DOSTID" took %ld seconds (limit was %ld).\n",
                       tgt_name(tsi->tsi_tgt), libcfs_id2str(req->rq_peer),
                       POSTID(&ioo->ioo_oid),
                       cfs_time_current_sec() - req->rq_arrival_time.tv_sec,
                       req->rq_deadline - req->rq_arrival_time.tv_sec);
                GOTO(out_lock, rc = -ETIMEDOUT);
        }

        /* Because we already sync grant info with client when reconnect,
         * grant info will be cleared for resent req, then fed_grant and
         * total_grant will not be modified in following preprw_write */
        if (lustre_msg_get_flags(req->rq_reqmsg) & (MSG_RESENT | MSG_REPLAY)) {
                DEBUG_REQ(D_CACHE, req, "clear resent/replay req grant info");
                body->oa.o_valid &= ~OBD_MD_FLGRANT;
        }

        repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (repbody == NULL)
                GOTO(out_lock, rc = -ENOMEM);
        repbody->oa = body->oa;

        /* prepare local pages to receive the bulk data */
        npages = PTLRPC_MAX_BRW_PAGES;
        rc = obd_preprw(tsi->tsi_env, OBD_BRW_WRITE, exp, &repbody->oa,
                        objcount, ioo, remote_nb, &npages, local_nb, NULL,
                        BYPASS_CAPA);
        if (rc < 0)
                GOTO(out_lock, rc);

        desc = ptlrpc_prep_bulk_exp(req, npages, ioobj_max_brw_get(ioo),
                                    BULK_GET_SINK, OST_BULK_PORTAL);
        if (desc == NULL)
                GOTO(skip_transfer, rc = -ENOMEM);

        /* NB Having prepped, we must commit... */
        for (i = 0; i < npages; i++)
                ptlrpc_prep_bulk_page_nopin(desc, local_nb[i].page,
                                            local_nb[i].lnb_page_offset,
                                            local_nb[i].len);

        rc = sptlrpc_svc_prep_bulk(req, desc);
        if (rc != 0)
                GOTO(skip_transfer, rc);

        /* pull the data from the client over a bulk GET */
        rc = target_bulk_io(exp, desc, &lwi);
        no_reply = rc != 0;

skip_transfer:
        /* if the client supplied a checksum, recompute it over the received
         * pages; on mismatch warn, but still commit below -- the server
         * checksum is returned so the client can detect the difference */
        if (body->oa.o_valid & OBD_MD_FLCKSUM && rc == 0) {
                static int cksum_counter;

                if (body->oa.o_valid & OBD_MD_FLFLAGS)
                        cksum_type = cksum_type_unpack(body->oa.o_flags);

                repbody->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                repbody->oa.o_flags &= ~OBD_FL_CKSUM_ALL;
                repbody->oa.o_flags |= cksum_type_pack(cksum_type);
                repbody->oa.o_cksum = tgt_checksum_bulk(tsi->tsi_tgt, desc,
                                                        OST_WRITE, cksum_type);
                cksum_counter++;

                if (unlikely(body->oa.o_cksum != repbody->oa.o_cksum)) {
                        /* mmap'ed pages may change under I/O, so a mismatch
                         * there only gets a debug message */
                        mmap = (body->oa.o_valid & OBD_MD_FLFLAGS &&
                                body->oa.o_flags & OBD_FL_MMAP);

                        tgt_warn_on_cksum(req, desc, local_nb, npages,
                                          body->oa.o_cksum,
                                          repbody->oa.o_cksum, mmap);
                        cksum_counter = 0;
                } else if ((cksum_counter & (-cksum_counter)) ==
                           cksum_counter) {
                        /* (c & -c) == c: log OK checksums only when the
                         * counter is zero or a power of two */
                        CDEBUG(D_INFO, "Checksum %u from %s OK: %x\n",
                               cksum_counter, libcfs_id2str(req->rq_peer),
                               repbody->oa.o_cksum);
                }
        }

        /* Must commit after prep above in all cases */
        rc = obd_commitrw(tsi->tsi_env, OBD_BRW_WRITE, exp, &repbody->oa,
                          objcount, ioo, remote_nb, npages, local_nb, NULL,
                          rc);
        if (rc == -ENOTCONN)
                /* quota acquire process has been given up because
                 * either the client has been evicted or the client
                 * has timed out the request already */
                no_reply = true;

        /*
         * Disable sending mtime back to the client. If the client locked the
         * whole object, then it has already updated the mtime on its side,
         * otherwise it will have to glimpse anyway (see bug 21489, comment 32)
         */
        repbody->oa.o_valid &= ~(OBD_MD_FLMTIME | OBD_MD_FLATIME);

        if (rc == 0) {
                int nob = 0;

                /* set per-requested niobuf return codes */
                for (i = j = 0; i < niocount; i++) {
                        int len = remote_nb[i].len;

                        nob += len;
                        rcs[i] = 0;
                        /* walk the local pages covering this remote niobuf;
                         * any page error becomes the niobuf's return code */
                        do {
                                LASSERT(j < npages);
                                if (local_nb[j].rc < 0)
                                        rcs[i] = local_nb[j].rc;
                                len -= local_nb[j].len;
                                j++;
                        } while (len > 0);
                        LASSERT(len == 0);
                }
                LASSERT(j == npages);
                ptlrpc_lprocfs_brw(req, nob);

                tgt_drop_id(exp, &repbody->oa);
        }
out_lock:
        tgt_brw_unlock(ioo, remote_nb, &lockh, LCK_PW);
        if (desc)
                ptlrpc_free_bulk_nopin(desc);
out:
        /* suppress the reply when the client is gone; it will reconnect
         * and resend the request */
        if (no_reply) {
                req->rq_no_reply = 1;
                /* reply out callback would free */
                ptlrpc_req_drop_rs(req);
                LCONSOLE_WARN("%s: Bulk IO write error with %s (at %s), "
                              "client will retry: rc %d\n",
                              exp->exp_obd->obd_name,
                              obd_uuid2str(&exp->exp_client_uuid),
                              obd_export_nid2str(exp), rc);
        }
        memory_pressure_clr();
        RETURN(rc);
}
2031 EXPORT_SYMBOL(tgt_brw_write);