Whamcloud - gitweb
c5a9fde3b3416926409d91e83ba0c959e0087e4c
[fs/lustre-release.git] / lustre / target / tgt_handler.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2011, 2012, Intel Corporation.
25  */
26 /*
27  * lustre/target/tgt_handler.c
28  *
29  * Lustre Unified Target request handler code
30  *
31  * Author: Brian Behlendorf <behlendorf1@llnl.gov>
32  * Author: Mikhail Pershin <mike.pershin@intel.com>
33  */
34
35 #define DEBUG_SUBSYSTEM S_CLASS
36
37 #include <obd.h>
38 #include <obd_class.h>
39
40 #include "tgt_internal.h"
41
42 char *tgt_name(struct lu_target *tgt)
43 {
44         LASSERT(tgt->lut_obd != NULL);
45         return tgt->lut_obd->obd_name;
46 }
47 EXPORT_SYMBOL(tgt_name);
48
49 static int tgt_unpack_req_pack_rep(struct tgt_session_info *tsi, __u32 flags)
50 {
51         struct req_capsule      *pill = tsi->tsi_pill;
52         const struct mdt_body   *body = NULL;
53         int                      rc = 0;
54
55         ENTRY;
56
57         if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_CLIENT)) {
58                 body = req_capsule_client_get(pill, &RMF_MDT_BODY);
59                 if (body == NULL)
60                         RETURN(-EFAULT);
61         }
62
63         if (flags & HABEO_REFERO) {
64                 /* Pack reply */
65                 if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
66                         req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
67                                              body ? body->eadatasize : 0);
68                 if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
69                         req_capsule_set_size(pill, &RMF_LOGCOOKIES,
70                                              RCL_SERVER, 0);
71
72                 rc = req_capsule_server_pack(pill);
73         }
74         RETURN(rc);
75 }
76
77 /*
78  * Invoke handler for this request opc. Also do necessary preprocessing
79  * (according to handler ->th_flags), and post-processing (setting of
80  * ->last_{xid,committed}).
81  */
82 static int tgt_handle_request0(struct tgt_session_info *tsi,
83                                struct tgt_handler *h,
84                                struct ptlrpc_request *req)
85 {
86         int      serious = 0;
87         int      rc;
88         __u32    flags;
89
90         ENTRY;
91
92         LASSERT(h->th_act != NULL);
93         LASSERT(h->th_opc == lustre_msg_get_opc(req->rq_reqmsg));
94         LASSERT(current->journal_info == NULL);
95
96         rc = 0;
97         flags = h->th_flags;
98         LASSERT(ergo(flags & (HABEO_CORPUS | HABEO_REFERO),
99                      h->th_fmt != NULL));
100         if (h->th_fmt != NULL) {
101                 req_capsule_set(tsi->tsi_pill, h->th_fmt);
102                 rc = tgt_unpack_req_pack_rep(tsi, flags);
103         }
104
105         if (rc == 0 && flags & MUTABOR &&
106             tgt_conn_flags(tsi) & OBD_CONNECT_RDONLY)
107                 rc = -EROFS;
108
109         if (rc == 0 && flags & HABEO_CLAVIS) {
110                 struct ldlm_request *dlm_req;
111
112                 LASSERT(h->th_fmt != NULL);
113
114                 dlm_req = req_capsule_client_get(tsi->tsi_pill, &RMF_DLM_REQ);
115                 if (dlm_req != NULL) {
116                         if (unlikely(dlm_req->lock_desc.l_resource.lr_type ==
117                                      LDLM_IBITS &&
118                                      dlm_req->lock_desc.l_policy_data.\
119                                      l_inodebits.bits == 0)) {
120                                 /*
121                                  * Lock without inodebits makes no sense and
122                                  * will oops later in ldlm. If client miss to
123                                  * set such bits, do not trigger ASSERTION.
124                                  *
125                                  * For liblustre flock case, it maybe zero.
126                                  */
127                                 rc = -EPROTO;
128                         } else {
129                                 tsi->tsi_dlm_req = dlm_req;
130                         }
131                 } else {
132                         rc = -EFAULT;
133                 }
134         }
135
136         if (likely(rc == 0)) {
137                 /*
138                  * Process request, there can be two types of rc:
139                  * 1) errors with msg unpack/pack, other failures outside the
140                  * operation itself. This is counted as serious errors;
141                  * 2) errors during fs operation, should be placed in rq_status
142                  * only
143                  */
144                 rc = h->th_act(tsi);
145                 if (!is_serious(rc) &&
146                     !req->rq_no_reply && req->rq_reply_state == NULL) {
147                         DEBUG_REQ(D_ERROR, req, "%s \"handler\" %s did not "
148                                   "pack reply and returned 0 error\n",
149                                   tgt_name(tsi->tsi_tgt), h->th_name);
150                         LBUG();
151                 }
152                 serious = is_serious(rc);
153                 rc = clear_serious(rc);
154         } else {
155                 serious = 1;
156         }
157
158         req->rq_status = rc;
159
160         /*
161          * ELDLM_* codes which > 0 should be in rq_status only as well as
162          * all non-serious errors.
163          */
164         if (rc > 0 || !serious)
165                 rc = 0;
166
167         LASSERT(current->journal_info == NULL);
168
169         /*
170          * If we're DISCONNECTing, the export_data is already freed
171          *
172          * WAS if (likely(... && h->mh_opc != MDS_DISCONNECT))
173          */
174         if (likely(rc == 0 && req->rq_export))
175                 target_committed_to_req(req);
176
177         target_send_reply(req, rc, tsi->tsi_reply_fail_id);
178         RETURN(0);
179 }
180
181 static int tgt_filter_recovery_request(struct ptlrpc_request *req,
182                                        struct obd_device *obd, int *process)
183 {
184         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
185         case MDS_DISCONNECT:
186         case OST_DISCONNECT:
187         case OBD_IDX_READ:
188                 *process = 1;
189                 RETURN(0);
190         case MDS_CLOSE:
191         case MDS_DONE_WRITING:
192         case MDS_SYNC: /* used in unmounting */
193         case OBD_PING:
194         case MDS_REINT:
195         case UPDATE_OBJ:
196         case SEQ_QUERY:
197         case FLD_QUERY:
198         case LDLM_ENQUEUE:
199                 *process = target_queue_recovery_request(req, obd);
200                 RETURN(0);
201
202         default:
203                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
204                 *process = -EAGAIN;
205                 RETURN(0);
206         }
207 }
208
209 /*
210  * Handle recovery. Return:
211  *        +1: continue request processing;
212  *       -ve: abort immediately with the given error code;
213  *         0: send reply with error code in req->rq_status;
214  */
215 int tgt_handle_recovery(struct ptlrpc_request *req, int reply_fail_id)
216 {
217         ENTRY;
218
219         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
220         case MDS_CONNECT:
221         case OST_CONNECT:
222         case MGS_CONNECT:
223         case SEC_CTX_INIT:
224         case SEC_CTX_INIT_CONT:
225         case SEC_CTX_FINI:
226                 RETURN(+1);
227         }
228
229         if (!req->rq_export->exp_obd->obd_replayable)
230                 RETURN(+1);
231
232         /* sanity check: if the xid matches, the request must be marked as a
233          * resent or replayed */
234         if (req_xid_is_last(req)) {
235                 if (!(lustre_msg_get_flags(req->rq_reqmsg) &
236                       (MSG_RESENT | MSG_REPLAY))) {
237                         DEBUG_REQ(D_WARNING, req, "rq_xid "LPU64" matches "
238                                   "last_xid, expected REPLAY or RESENT flag "
239                                   "(%x)", req->rq_xid,
240                                   lustre_msg_get_flags(req->rq_reqmsg));
241                         req->rq_status = -ENOTCONN;
242                         RETURN(-ENOTCONN);
243                 }
244         }
245         /* else: note the opposite is not always true; a RESENT req after a
246          * failover will usually not match the last_xid, since it was likely
247          * never committed. A REPLAYed request will almost never match the
248          * last xid, however it could for a committed, but still retained,
249          * open. */
250
251         /* Check for aborted recovery... */
252         if (unlikely(req->rq_export->exp_obd->obd_recovering)) {
253                 int rc;
254                 int should_process;
255
256                 DEBUG_REQ(D_INFO, req, "Got new replay");
257                 rc = tgt_filter_recovery_request(req, req->rq_export->exp_obd,
258                                                  &should_process);
259                 if (rc != 0 || !should_process)
260                         RETURN(rc);
261                 else if (should_process < 0) {
262                         req->rq_status = should_process;
263                         rc = ptlrpc_error(req);
264                         RETURN(rc);
265                 }
266         }
267         RETURN(+1);
268 }
269
270 int tgt_request_handle(struct ptlrpc_request *req)
271 {
272         struct tgt_session_info *tsi = tgt_ses_info(req->rq_svc_thread->t_env);
273
274         struct lustre_msg       *msg = req->rq_reqmsg;
275         struct tgt_handler      *h;
276         struct tgt_opc_slice    *s;
277         struct lu_target        *tgt;
278         int                      request_fail_id = 0;
279         __u32                    opc = lustre_msg_get_opc(msg);
280         int                      rc;
281
282         ENTRY;
283
284         /* Refill(initilize) the context, in case it is
285          * not initialized yet. */
286         lu_env_refill(req->rq_svc_thread->t_env);
287
288         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
289         tsi->tsi_pill = &req->rq_pill;
290         tsi->tsi_env = req->rq_svc_thread->t_env;
291         tsi->tsi_dlm_req = NULL;
292
293         /* if request has export then get handlers slice from corresponding
294          * target, otherwise that should be connect operation */
295         if (opc == MDS_CONNECT || opc == OST_CONNECT ||
296             opc == MGS_CONNECT) {
297                 req_capsule_set(&req->rq_pill, &RQF_CONNECT);
298                 rc = target_handle_connect(req);
299                 if (rc != 0) {
300                         rc = ptlrpc_error(req);
301                         GOTO(out, rc);
302                 }
303         }
304
305         if (unlikely(!class_connected_export(req->rq_export))) {
306                 CDEBUG(D_HA, "operation %d on unconnected OST from %s\n",
307                        opc, libcfs_id2str(req->rq_peer));
308                 req->rq_status = -ENOTCONN;
309                 rc = ptlrpc_error(req);
310                 GOTO(out, rc);
311         }
312
313         tsi->tsi_tgt = tgt = class_exp2tgt(req->rq_export);
314         tsi->tsi_exp = req->rq_export;
315
316         request_fail_id = tgt->lut_request_fail_id;
317         tsi->tsi_reply_fail_id = tgt->lut_reply_fail_id;
318
319         for (s = tgt->lut_slice; s->tos_hs != NULL; s++)
320                 if (s->tos_opc_start <= opc && opc < s->tos_opc_end)
321                         break;
322
323         /* opcode was not found in slice */
324         if (unlikely(s->tos_hs == NULL)) {
325                 CERROR("%s: no handlers for opcode 0x%x\n", tgt_name(tgt), opc);
326                 req->rq_status = -ENOTSUPP;
327                 rc = ptlrpc_error(req);
328                 GOTO(out, rc);
329         }
330
331         if (CFS_FAIL_CHECK_ORSET(request_fail_id, CFS_FAIL_ONCE))
332                 GOTO(out, rc = 0);
333
334         LASSERT(current->journal_info == NULL);
335
336         LASSERT(opc >= s->tos_opc_start && opc < s->tos_opc_end);
337         h = s->tos_hs + (opc - s->tos_opc_start);
338         if (unlikely(h->th_opc == 0)) {
339                 CERROR("%s: unsupported opcode 0x%x\n", tgt_name(tgt), opc);
340                 req->rq_status = -ENOTSUPP;
341                 rc = ptlrpc_error(req);
342                 GOTO(out, rc);
343         }
344
345         rc = lustre_msg_check_version(msg, h->th_version);
346         if (unlikely(rc)) {
347                 DEBUG_REQ(D_ERROR, req, "%s: drop mal-formed request, version"
348                           " %08x, expecting %08x\n", tgt_name(tgt),
349                           lustre_msg_get_version(msg), h->th_version);
350                 req->rq_status = -EINVAL;
351                 rc = ptlrpc_error(req);
352                 GOTO(out, rc);
353         }
354
355         rc = tgt_handle_recovery(req, tsi->tsi_reply_fail_id);
356         if (likely(rc == 1)) {
357                 LASSERTF(h->th_opc == opc, "opcode mismatch %d != %d\n",
358                          h->th_opc, opc);
359                 rc = tgt_handle_request0(tsi, h, req);
360                 if (rc)
361                         GOTO(out, rc);
362         }
363         EXIT;
364 out:
365         req_capsule_fini(tsi->tsi_pill);
366         tsi->tsi_pill = NULL;
367         return rc;
368 }
369 EXPORT_SYMBOL(tgt_request_handle);
370
371 void tgt_counter_incr(struct obd_export *exp, int opcode)
372 {
373         lprocfs_counter_incr(exp->exp_obd->obd_stats, opcode);
374         if (exp->exp_nid_stats && exp->exp_nid_stats->nid_stats != NULL)
375                 lprocfs_counter_incr(exp->exp_nid_stats->nid_stats, opcode);
376 }
377 EXPORT_SYMBOL(tgt_counter_incr);
378
379 /*
380  * Unified target generic handlers.
381  */
382
383 /*
384  * Security functions
385  */
386 static inline void tgt_init_sec_none(struct obd_connect_data *reply)
387 {
388         reply->ocd_connect_flags &= ~(OBD_CONNECT_RMT_CLIENT |
389                                       OBD_CONNECT_RMT_CLIENT_FORCE |
390                                       OBD_CONNECT_MDS_CAPA |
391                                       OBD_CONNECT_OSS_CAPA);
392 }
393
394 static int tgt_init_sec_level(struct ptlrpc_request *req)
395 {
396         struct lu_target        *tgt = class_exp2tgt(req->rq_export);
397         char                    *client = libcfs_nid2str(req->rq_peer.nid);
398         struct obd_connect_data *data, *reply;
399         int                      rc = 0;
400         bool                     remote;
401
402         ENTRY;
403
404         data = req_capsule_client_get(&req->rq_pill, &RMF_CONNECT_DATA);
405         reply = req_capsule_server_get(&req->rq_pill, &RMF_CONNECT_DATA);
406         if (data == NULL || reply == NULL)
407                 RETURN(-EFAULT);
408
409         /* connection from MDT is always trusted */
410         if (req->rq_auth_usr_mdt) {
411                 tgt_init_sec_none(reply);
412                 RETURN(0);
413         }
414
415         /* no GSS support case */
416         if (!req->rq_auth_gss) {
417                 if (tgt->lut_sec_level > LUSTRE_SEC_NONE) {
418                         CWARN("client %s -> target %s does not use GSS, "
419                               "can not run under security level %d.\n",
420                               client, tgt_name(tgt), tgt->lut_sec_level);
421                         RETURN(-EACCES);
422                 } else {
423                         tgt_init_sec_none(reply);
424                         RETURN(0);
425                 }
426         }
427
428         /* old version case */
429         if (unlikely(!(data->ocd_connect_flags & OBD_CONNECT_RMT_CLIENT) ||
430                      !(data->ocd_connect_flags & OBD_CONNECT_MDS_CAPA) ||
431                      !(data->ocd_connect_flags & OBD_CONNECT_OSS_CAPA))) {
432                 if (tgt->lut_sec_level > LUSTRE_SEC_NONE) {
433                         CWARN("client %s -> target %s uses old version, "
434                               "can not run under security level %d.\n",
435                               client, tgt_name(tgt), tgt->lut_sec_level);
436                         RETURN(-EACCES);
437                 } else {
438                         CWARN("client %s -> target %s uses old version, "
439                               "run under security level %d.\n",
440                               client, tgt_name(tgt), tgt->lut_sec_level);
441                         tgt_init_sec_none(reply);
442                         RETURN(0);
443                 }
444         }
445
446         remote = data->ocd_connect_flags & OBD_CONNECT_RMT_CLIENT_FORCE;
447         if (remote) {
448                 if (!req->rq_auth_remote)
449                         CDEBUG(D_SEC, "client (local realm) %s -> target %s "
450                                "asked to be remote.\n", client, tgt_name(tgt));
451         } else if (req->rq_auth_remote) {
452                 remote = true;
453                 CDEBUG(D_SEC, "client (remote realm) %s -> target %s is set "
454                        "as remote by default.\n", client, tgt_name(tgt));
455         }
456
457         if (remote) {
458                 if (!tgt->lut_oss_capa) {
459                         CDEBUG(D_SEC,
460                                "client %s -> target %s is set as remote,"
461                                " but OSS capabilities are not enabled: %d.\n",
462                                client, tgt_name(tgt), tgt->lut_oss_capa);
463                         RETURN(-EACCES);
464                 }
465         } else {
466                 if (req->rq_auth_uid == INVALID_UID) {
467                         CDEBUG(D_SEC, "client %s -> target %s: user is not "
468                                "authenticated!\n", client, tgt_name(tgt));
469                         RETURN(-EACCES);
470                 }
471         }
472
473
474         switch (tgt->lut_sec_level) {
475         case LUSTRE_SEC_NONE:
476                 if (remote) {
477                         CDEBUG(D_SEC,
478                                "client %s -> target %s is set as remote, "
479                                "can not run under security level %d.\n",
480                                client, tgt_name(tgt), tgt->lut_sec_level);
481                         RETURN(-EACCES);
482                 }
483                 tgt_init_sec_none(reply);
484                 break;
485         case LUSTRE_SEC_REMOTE:
486                 if (!remote)
487                         tgt_init_sec_none(reply);
488                 break;
489         case LUSTRE_SEC_ALL:
490                 if (remote)
491                         break;
492                 reply->ocd_connect_flags &= ~(OBD_CONNECT_RMT_CLIENT |
493                                               OBD_CONNECT_RMT_CLIENT_FORCE);
494                 if (!tgt->lut_oss_capa)
495                         reply->ocd_connect_flags &= ~OBD_CONNECT_OSS_CAPA;
496                 if (!tgt->lut_mds_capa)
497                         reply->ocd_connect_flags &= ~OBD_CONNECT_MDS_CAPA;
498                 break;
499         default:
500                 RETURN(-EINVAL);
501         }
502
503         RETURN(rc);
504 }
505
506 int tgt_connect_check_sptlrpc(struct ptlrpc_request *req, struct obd_export *exp)
507 {
508         struct lu_target        *tgt = class_exp2tgt(exp);
509         struct sptlrpc_flavor    flvr;
510         int                      rc = 0;
511
512         LASSERT(tgt);
513         LASSERT(tgt->lut_obd);
514         LASSERT(tgt->lut_slice);
515
516         /* always allow ECHO client */
517         if (unlikely(strcmp(exp->exp_obd->obd_type->typ_name,
518                             LUSTRE_ECHO_NAME) == 0)) {
519                 exp->exp_flvr.sf_rpc = SPTLRPC_FLVR_ANY;
520                 return 0;
521         }
522
523         if (exp->exp_flvr.sf_rpc == SPTLRPC_FLVR_INVALID) {
524                 read_lock(&tgt->lut_sptlrpc_lock);
525                 sptlrpc_target_choose_flavor(&tgt->lut_sptlrpc_rset,
526                                              req->rq_sp_from,
527                                              req->rq_peer.nid,
528                                              &flvr);
529                 read_unlock(&tgt->lut_sptlrpc_lock);
530
531                 spin_lock(&exp->exp_lock);
532                 exp->exp_sp_peer = req->rq_sp_from;
533                 exp->exp_flvr = flvr;
534                 if (exp->exp_flvr.sf_rpc != SPTLRPC_FLVR_ANY &&
535                     exp->exp_flvr.sf_rpc != req->rq_flvr.sf_rpc) {
536                         CERROR("%s: unauthorized rpc flavor %x from %s, "
537                                "expect %x\n", tgt_name(tgt),
538                                req->rq_flvr.sf_rpc,
539                                libcfs_nid2str(req->rq_peer.nid),
540                                exp->exp_flvr.sf_rpc);
541                         rc = -EACCES;
542                 }
543                 spin_unlock(&exp->exp_lock);
544         } else {
545                 if (exp->exp_sp_peer != req->rq_sp_from) {
546                         CERROR("%s: RPC source %s doesn't match %s\n",
547                                tgt_name(tgt),
548                                sptlrpc_part2name(req->rq_sp_from),
549                                sptlrpc_part2name(exp->exp_sp_peer));
550                         rc = -EACCES;
551                 } else {
552                         rc = sptlrpc_target_export_check(exp, req);
553                 }
554         }
555
556         return rc;
557 }
558
559 int tgt_connect(struct tgt_session_info *tsi)
560 {
561         struct ptlrpc_request   *req = tgt_ses_req(tsi);
562         struct obd_connect_data *reply;
563         int                      rc;
564
565         ENTRY;
566
567         rc = tgt_init_sec_level(req);
568         if (rc != 0)
569                 GOTO(out, rc);
570
571         /* XXX: better to call this check right after getting new export but
572          * before last_rcvd slot allocation to avoid server load upon insecure
573          * connects. This is to be fixed after unifiyng all targets.
574          */
575         rc = tgt_connect_check_sptlrpc(req, tsi->tsi_exp);
576         if (rc)
577                 GOTO(out, rc);
578
579         /* To avoid exposing partially initialized connection flags, changes up
580          * to this point have been staged in reply->ocd_connect_flags. Now that
581          * connection handling has completed successfully, atomically update
582          * the connect flags in the shared export data structure. LU-1623 */
583         reply = req_capsule_server_get(tsi->tsi_pill, &RMF_CONNECT_DATA);
584         spin_lock(&tsi->tsi_exp->exp_lock);
585         *exp_connect_flags_ptr(tsi->tsi_exp) = reply->ocd_connect_flags;
586         spin_unlock(&tsi->tsi_exp->exp_lock);
587
588         RETURN(0);
589 out:
590         obd_disconnect(class_export_get(tsi->tsi_exp));
591         return rc;
592 }
593 EXPORT_SYMBOL(tgt_connect);
594
595 int tgt_disconnect(struct tgt_session_info *tsi)
596 {
597         int rc;
598
599         ENTRY;
600
601         rc = target_handle_disconnect(tgt_ses_req(tsi));
602         if (rc)
603                 RETURN(err_serious(rc));
604
605         RETURN(rc);
606 }
607 EXPORT_SYMBOL(tgt_disconnect);
608
609 /*
610  * Unified target OBD handlers
611  */
612 int tgt_obd_ping(struct tgt_session_info *tsi)
613 {
614         int rc;
615
616         ENTRY;
617
618         rc = target_handle_ping(tgt_ses_req(tsi));
619         if (rc)
620                 RETURN(err_serious(rc));
621
622         RETURN(rc);
623 }
624 EXPORT_SYMBOL(tgt_obd_ping);
625
626 int tgt_obd_log_cancel(struct tgt_session_info *tsi)
627 {
628         return err_serious(-EOPNOTSUPP);
629 }
630 EXPORT_SYMBOL(tgt_obd_log_cancel);
631
632 int tgt_obd_qc_callback(struct tgt_session_info *tsi)
633 {
634         return err_serious(-EOPNOTSUPP);
635 }
636 EXPORT_SYMBOL(tgt_obd_qc_callback);
637
638 static int tgt_sendpage(struct tgt_session_info *tsi, struct lu_rdpg *rdpg,
639                         int nob)
640 {
641         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
642         struct ptlrpc_request   *req = tgt_ses_req(tsi);
643         struct obd_export       *exp = req->rq_export;
644         struct ptlrpc_bulk_desc *desc;
645         struct l_wait_info      *lwi = &tti->tti_u.rdpg.tti_wait_info;
646         int                      tmpcount;
647         int                      tmpsize;
648         int                      i;
649         int                      rc;
650
651         ENTRY;
652
653         desc = ptlrpc_prep_bulk_exp(req, rdpg->rp_npages, 1, BULK_PUT_SOURCE,
654                                     MDS_BULK_PORTAL);
655         if (desc == NULL)
656                 RETURN(-ENOMEM);
657
658         if (!(exp_connect_flags(exp) & OBD_CONNECT_BRW_SIZE))
659                 /* old client requires reply size in it's PAGE_CACHE_SIZE,
660                  * which is rdpg->rp_count */
661                 nob = rdpg->rp_count;
662
663         for (i = 0, tmpcount = nob; i < rdpg->rp_npages && tmpcount > 0;
664              i++, tmpcount -= tmpsize) {
665                 tmpsize = min_t(int, tmpcount, PAGE_CACHE_SIZE);
666                 ptlrpc_prep_bulk_page_pin(desc, rdpg->rp_pages[i], 0, tmpsize);
667         }
668
669         LASSERT(desc->bd_nob == nob);
670         rc = target_bulk_io(exp, desc, lwi);
671         ptlrpc_free_bulk_pin(desc);
672         RETURN(rc);
673 }
674 EXPORT_SYMBOL(tgt_sendpage);
675
676 /*
677  * OBD_IDX_READ handler
678  */
679 int tgt_obd_idx_read(struct tgt_session_info *tsi)
680 {
681         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
682         struct lu_rdpg          *rdpg = &tti->tti_u.rdpg.tti_rdpg;
683         struct idx_info         *req_ii, *rep_ii;
684         int                      rc, i;
685
686         ENTRY;
687
688         memset(rdpg, 0, sizeof(*rdpg));
689         req_capsule_set(tsi->tsi_pill, &RQF_OBD_IDX_READ);
690
691         /* extract idx_info buffer from request & reply */
692         req_ii = req_capsule_client_get(tsi->tsi_pill, &RMF_IDX_INFO);
693         if (req_ii == NULL || req_ii->ii_magic != IDX_INFO_MAGIC)
694                 RETURN(err_serious(-EPROTO));
695
696         rc = req_capsule_server_pack(tsi->tsi_pill);
697         if (rc)
698                 RETURN(err_serious(rc));
699
700         rep_ii = req_capsule_server_get(tsi->tsi_pill, &RMF_IDX_INFO);
701         if (rep_ii == NULL)
702                 RETURN(err_serious(-EFAULT));
703         rep_ii->ii_magic = IDX_INFO_MAGIC;
704
705         /* extract hash to start with */
706         rdpg->rp_hash = req_ii->ii_hash_start;
707
708         /* extract requested attributes */
709         rdpg->rp_attrs = req_ii->ii_attrs;
710
711         /* check that fid packed in request is valid and supported */
712         if (!fid_is_sane(&req_ii->ii_fid))
713                 RETURN(-EINVAL);
714         rep_ii->ii_fid = req_ii->ii_fid;
715
716         /* copy flags */
717         rep_ii->ii_flags = req_ii->ii_flags;
718
719         /* compute number of pages to allocate, ii_count is the number of 4KB
720          * containers */
721         if (req_ii->ii_count <= 0)
722                 GOTO(out, rc = -EFAULT);
723         rdpg->rp_count = min_t(unsigned int, req_ii->ii_count << LU_PAGE_SHIFT,
724                                exp_max_brw_size(tsi->tsi_exp));
725         rdpg->rp_npages = (rdpg->rp_count + PAGE_CACHE_SIZE -1) >> PAGE_CACHE_SHIFT;
726
727         /* allocate pages to store the containers */
728         OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof(rdpg->rp_pages[0]));
729         if (rdpg->rp_pages == NULL)
730                 GOTO(out, rc = -ENOMEM);
731         for (i = 0; i < rdpg->rp_npages; i++) {
732                 rdpg->rp_pages[i] = alloc_page(GFP_IOFS);
733                 if (rdpg->rp_pages[i] == NULL)
734                         GOTO(out, rc = -ENOMEM);
735         }
736
737         /* populate pages with key/record pairs */
738         rc = dt_index_read(tsi->tsi_env, tsi->tsi_tgt->lut_bottom, rep_ii, rdpg);
739         if (rc < 0)
740                 GOTO(out, rc);
741
742         LASSERTF(rc <= rdpg->rp_count, "dt_index_read() returned more than "
743                  "asked %d > %d\n", rc, rdpg->rp_count);
744
745         /* send pages to client */
746         rc = tgt_sendpage(tsi, rdpg, rc);
747         if (rc)
748                 GOTO(out, rc);
749         EXIT;
750 out:
751         if (rdpg->rp_pages) {
752                 for (i = 0; i < rdpg->rp_npages; i++)
753                         if (rdpg->rp_pages[i])
754                                 __free_page(rdpg->rp_pages[i]);
755                 OBD_FREE(rdpg->rp_pages,
756                          rdpg->rp_npages * sizeof(rdpg->rp_pages[0]));
757         }
758         return rc;
759 }
760 EXPORT_SYMBOL(tgt_obd_idx_read);
761
762 struct tgt_handler tgt_obd_handlers[] = {
763 TGT_OBD_HDL    (0,      OBD_PING,               tgt_obd_ping),
764 TGT_OBD_HDL_VAR(0,      OBD_LOG_CANCEL,         tgt_obd_log_cancel),
765 TGT_OBD_HDL_VAR(0,      OBD_QC_CALLBACK,        tgt_obd_qc_callback),
766 TGT_OBD_HDL    (0,      OBD_IDX_READ,           tgt_obd_idx_read)
767 };
768 EXPORT_SYMBOL(tgt_obd_handlers);
769
770 /*
771  * Unified target DLM handlers.
772  */
773 struct ldlm_callback_suite tgt_dlm_cbs = {
774         .lcs_completion = ldlm_server_completion_ast,
775         .lcs_blocking   = ldlm_server_blocking_ast,
776         .lcs_glimpse    = ldlm_server_glimpse_ast
777 };
778
779 int tgt_enqueue(struct tgt_session_info *tsi)
780 {
781         struct ptlrpc_request *req = tgt_ses_req(tsi);
782         int rc;
783
784         ENTRY;
785         /*
786          * tsi->tsi_dlm_req was already swapped and (if necessary) converted,
787          * tsi->tsi_dlm_cbs was set by the *_req_handle() function.
788          */
789         LASSERT(tsi->tsi_dlm_req != NULL);
790
791         rc = ldlm_handle_enqueue0(tsi->tsi_exp->exp_obd->obd_namespace, req,
792                                   tsi->tsi_dlm_req, &tgt_dlm_cbs);
793         if (rc)
794                 RETURN(err_serious(rc));
795
796         RETURN(req->rq_status);
797 }
798 EXPORT_SYMBOL(tgt_enqueue);
799
800 int tgt_convert(struct tgt_session_info *tsi)
801 {
802         struct ptlrpc_request *req = tgt_ses_req(tsi);
803         int rc;
804
805         ENTRY;
806         LASSERT(tsi->tsi_dlm_req);
807         rc = ldlm_handle_convert0(req, tsi->tsi_dlm_req);
808         if (rc)
809                 RETURN(err_serious(rc));
810
811         RETURN(req->rq_status);
812 }
813 EXPORT_SYMBOL(tgt_convert);
814
815 int tgt_bl_callback(struct tgt_session_info *tsi)
816 {
817         return err_serious(-EOPNOTSUPP);
818 }
819 EXPORT_SYMBOL(tgt_bl_callback);
820
821 int tgt_cp_callback(struct tgt_session_info *tsi)
822 {
823         return err_serious(-EOPNOTSUPP);
824 }
825 EXPORT_SYMBOL(tgt_cp_callback);
826
827 /* generic LDLM target handler */
828 struct tgt_handler tgt_dlm_handlers[] = {
829 TGT_DLM_HDL    (HABEO_CLAVIS,   LDLM_ENQUEUE,           tgt_enqueue),
830 TGT_DLM_HDL_VAR(HABEO_CLAVIS,   LDLM_CONVERT,           tgt_convert),
831 TGT_DLM_HDL_VAR(0,              LDLM_BL_CALLBACK,       tgt_bl_callback),
832 TGT_DLM_HDL_VAR(0,              LDLM_CP_CALLBACK,       tgt_cp_callback)
833 };
834 EXPORT_SYMBOL(tgt_dlm_handlers);
835
836 /*
837  * Unified target LLOG handlers.
838  */
839 int tgt_llog_open(struct tgt_session_info *tsi)
840 {
841         int rc;
842
843         ENTRY;
844
845         rc = llog_origin_handle_open(tgt_ses_req(tsi));
846
847         RETURN(rc);
848 }
849 EXPORT_SYMBOL(tgt_llog_open);
850
851 int tgt_llog_close(struct tgt_session_info *tsi)
852 {
853         int rc;
854
855         ENTRY;
856
857         rc = llog_origin_handle_close(tgt_ses_req(tsi));
858
859         RETURN(rc);
860 }
861 EXPORT_SYMBOL(tgt_llog_close);
862
863
864 int tgt_llog_destroy(struct tgt_session_info *tsi)
865 {
866         int rc;
867
868         ENTRY;
869
870         rc = llog_origin_handle_destroy(tgt_ses_req(tsi));
871
872         RETURN(rc);
873 }
874 EXPORT_SYMBOL(tgt_llog_destroy);
875
876 int tgt_llog_read_header(struct tgt_session_info *tsi)
877 {
878         int rc;
879
880         ENTRY;
881
882         rc = llog_origin_handle_read_header(tgt_ses_req(tsi));
883
884         RETURN(rc);
885 }
886 EXPORT_SYMBOL(tgt_llog_read_header);
887
888 int tgt_llog_next_block(struct tgt_session_info *tsi)
889 {
890         int rc;
891
892         ENTRY;
893
894         rc = llog_origin_handle_next_block(tgt_ses_req(tsi));
895
896         RETURN(rc);
897 }
898 EXPORT_SYMBOL(tgt_llog_next_block);
899
900 int tgt_llog_prev_block(struct tgt_session_info *tsi)
901 {
902         int rc;
903
904         ENTRY;
905
906         rc = llog_origin_handle_prev_block(tgt_ses_req(tsi));
907
908         RETURN(rc);
909 }
910 EXPORT_SYMBOL(tgt_llog_prev_block);
911
912 /* generic llog target handler */
913 struct tgt_handler tgt_llog_handlers[] = {
914 TGT_LLOG_HDL    (0,     LLOG_ORIGIN_HANDLE_CREATE,      tgt_llog_open),
915 TGT_LLOG_HDL    (0,     LLOG_ORIGIN_HANDLE_NEXT_BLOCK,  tgt_llog_next_block),
916 TGT_LLOG_HDL    (0,     LLOG_ORIGIN_HANDLE_READ_HEADER, tgt_llog_read_header),
917 TGT_LLOG_HDL    (0,     LLOG_ORIGIN_HANDLE_PREV_BLOCK,  tgt_llog_prev_block),
918 TGT_LLOG_HDL    (0,     LLOG_ORIGIN_HANDLE_DESTROY,     tgt_llog_destroy),
919 TGT_LLOG_HDL_VAR(0,     LLOG_ORIGIN_HANDLE_CLOSE,       tgt_llog_close),
920 };
921 EXPORT_SYMBOL(tgt_llog_handlers);
922
923 /*
924  * sec context handlers
925  */
926 /* XXX: Implement based on mdt_sec_ctx_handle()? */
927 int tgt_sec_ctx_handle(struct tgt_session_info *tsi)
928 {
929         return 0;
930 }
931
932 struct tgt_handler tgt_sec_ctx_handlers[] = {
933 TGT_SEC_HDL_VAR(0,      SEC_CTX_INIT,           tgt_sec_ctx_handle),
934 TGT_SEC_HDL_VAR(0,      SEC_CTX_INIT_CONT,      tgt_sec_ctx_handle),
935 TGT_SEC_HDL_VAR(0,      SEC_CTX_FINI,           tgt_sec_ctx_handle),
936 };
937 EXPORT_SYMBOL(tgt_sec_ctx_handlers);