Whamcloud - gitweb
0ebfbd0619ddb77d36321a99fb6d4165f98df395
[fs/lustre-release.git] / lustre / target / tgt_handler.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2011, 2012, Intel Corporation.
25  */
26 /*
27  * lustre/target/tgt_handler.c
28  *
29  * Lustre Unified Target request handler code
30  *
31  * Author: Brian Behlendorf <behlendorf1@llnl.gov>
32  * Author: Mikhail Pershin <mike.pershin@intel.com>
33  */
34
35 #define DEBUG_SUBSYSTEM S_CLASS
36
37 #include <obd.h>
38 #include <obd_class.h>
39
40 #include "tgt_internal.h"
41
42 char *tgt_name(struct lu_target *tgt)
43 {
44         LASSERT(tgt->lut_obd != NULL);
45         return tgt->lut_obd->obd_name;
46 }
47 EXPORT_SYMBOL(tgt_name);
48
49 static int tgt_unpack_req_pack_rep(struct tgt_session_info *tsi, __u32 flags)
50 {
51         struct req_capsule      *pill = tsi->tsi_pill;
52         const struct mdt_body   *body = NULL;
53         int                      rc = 0;
54
55         ENTRY;
56
57         if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_CLIENT)) {
58                 body = req_capsule_client_get(pill, &RMF_MDT_BODY);
59                 if (body == NULL)
60                         RETURN(-EFAULT);
61         }
62
63         if (flags & HABEO_REFERO) {
64                 /* Pack reply */
65                 if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
66                         req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
67                                              body ? body->eadatasize : 0);
68                 if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
69                         req_capsule_set_size(pill, &RMF_LOGCOOKIES,
70                                              RCL_SERVER, 0);
71
72                 rc = req_capsule_server_pack(pill);
73         }
74         RETURN(rc);
75 }
76
77 /*
78  * Invoke handler for this request opc. Also do necessary preprocessing
79  * (according to handler ->th_flags), and post-processing (setting of
80  * ->last_{xid,committed}).
81  */
82 static int tgt_handle_request0(struct tgt_session_info *tsi,
83                                struct tgt_handler *h,
84                                struct ptlrpc_request *req)
85 {
86         int      serious = 0;
87         int      rc;
88         __u32    flags;
89
90         ENTRY;
91
92         LASSERT(h->th_act != NULL);
93         LASSERT(h->th_opc == lustre_msg_get_opc(req->rq_reqmsg));
94         LASSERT(current->journal_info == NULL);
95
96         rc = 0;
97         flags = h->th_flags;
98         LASSERT(ergo(flags & (HABEO_CORPUS | HABEO_REFERO),
99                      h->th_fmt != NULL));
100         if (h->th_fmt != NULL) {
101                 req_capsule_set(tsi->tsi_pill, h->th_fmt);
102                 rc = tgt_unpack_req_pack_rep(tsi, flags);
103         }
104
105         if (rc == 0 && flags & MUTABOR &&
106             tgt_conn_flags(tsi) & OBD_CONNECT_RDONLY)
107                 rc = -EROFS;
108
109         if (rc == 0 && flags & HABEO_CLAVIS) {
110                 struct ldlm_request *dlm_req;
111
112                 LASSERT(h->th_fmt != NULL);
113
114                 dlm_req = req_capsule_client_get(tsi->tsi_pill, &RMF_DLM_REQ);
115                 if (dlm_req != NULL) {
116                         if (unlikely(dlm_req->lock_desc.l_resource.lr_type ==
117                                      LDLM_IBITS &&
118                                      dlm_req->lock_desc.l_policy_data.\
119                                      l_inodebits.bits == 0)) {
120                                 /*
121                                  * Lock without inodebits makes no sense and
122                                  * will oops later in ldlm. If client miss to
123                                  * set such bits, do not trigger ASSERTION.
124                                  *
125                                  * For liblustre flock case, it maybe zero.
126                                  */
127                                 rc = -EPROTO;
128                         } else {
129                                 tsi->tsi_dlm_req = dlm_req;
130                         }
131                 } else {
132                         rc = -EFAULT;
133                 }
134         }
135
136         if (likely(rc == 0)) {
137                 /*
138                  * Process request, there can be two types of rc:
139                  * 1) errors with msg unpack/pack, other failures outside the
140                  * operation itself. This is counted as serious errors;
141                  * 2) errors during fs operation, should be placed in rq_status
142                  * only
143                  */
144                 rc = h->th_act(tsi);
145                 if (!is_serious(rc) &&
146                     !req->rq_no_reply && req->rq_reply_state == NULL) {
147                         DEBUG_REQ(D_ERROR, req, "%s \"handler\" %s did not "
148                                   "pack reply and returned 0 error\n",
149                                   tgt_name(tsi->tsi_tgt), h->th_name);
150                         LBUG();
151                 }
152                 serious = is_serious(rc);
153                 rc = clear_serious(rc);
154         } else {
155                 serious = 1;
156         }
157
158         req->rq_status = rc;
159
160         /*
161          * ELDLM_* codes which > 0 should be in rq_status only as well as
162          * all non-serious errors.
163          */
164         if (rc > 0 || !serious)
165                 rc = 0;
166
167         LASSERT(current->journal_info == NULL);
168
169         /*
170          * If we're DISCONNECTing, the export_data is already freed
171          *
172          * WAS if (likely(... && h->mh_opc != MDS_DISCONNECT))
173          */
174         if (likely(rc == 0 && req->rq_export))
175                 target_committed_to_req(req);
176
177         target_send_reply(req, rc, tsi->tsi_reply_fail_id);
178         RETURN(0);
179 }
180
181 static int tgt_filter_recovery_request(struct ptlrpc_request *req,
182                                        struct obd_device *obd, int *process)
183 {
184         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
185         case MDS_DISCONNECT:
186         case OST_DISCONNECT:
187         case OBD_IDX_READ:
188                 *process = 1;
189                 RETURN(0);
190         case MDS_CLOSE:
191         case MDS_DONE_WRITING:
192         case MDS_SYNC: /* used in unmounting */
193         case OBD_PING:
194         case MDS_REINT:
195         case UPDATE_OBJ:
196         case SEQ_QUERY:
197         case FLD_QUERY:
198         case LDLM_ENQUEUE:
199                 *process = target_queue_recovery_request(req, obd);
200                 RETURN(0);
201
202         default:
203                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
204                 *process = -EAGAIN;
205                 RETURN(0);
206         }
207 }
208
209 /*
210  * Handle recovery. Return:
211  *        +1: continue request processing;
212  *       -ve: abort immediately with the given error code;
213  *         0: send reply with error code in req->rq_status;
214  */
215 int tgt_handle_recovery(struct ptlrpc_request *req, int reply_fail_id)
216 {
217         ENTRY;
218
219         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
220         case MDS_CONNECT:
221         case OST_CONNECT:
222         case MGS_CONNECT:
223         case SEC_CTX_INIT:
224         case SEC_CTX_INIT_CONT:
225         case SEC_CTX_FINI:
226                 RETURN(+1);
227         }
228
229         if (!req->rq_export->exp_obd->obd_replayable)
230                 RETURN(+1);
231
232         /* sanity check: if the xid matches, the request must be marked as a
233          * resent or replayed */
234         if (req_xid_is_last(req)) {
235                 if (!(lustre_msg_get_flags(req->rq_reqmsg) &
236                       (MSG_RESENT | MSG_REPLAY))) {
237                         DEBUG_REQ(D_WARNING, req, "rq_xid "LPU64" matches "
238                                   "last_xid, expected REPLAY or RESENT flag "
239                                   "(%x)", req->rq_xid,
240                                   lustre_msg_get_flags(req->rq_reqmsg));
241                         req->rq_status = -ENOTCONN;
242                         RETURN(-ENOTCONN);
243                 }
244         }
245         /* else: note the opposite is not always true; a RESENT req after a
246          * failover will usually not match the last_xid, since it was likely
247          * never committed. A REPLAYed request will almost never match the
248          * last xid, however it could for a committed, but still retained,
249          * open. */
250
251         /* Check for aborted recovery... */
252         if (unlikely(req->rq_export->exp_obd->obd_recovering)) {
253                 int rc;
254                 int should_process;
255
256                 DEBUG_REQ(D_INFO, req, "Got new replay");
257                 rc = tgt_filter_recovery_request(req, req->rq_export->exp_obd,
258                                                  &should_process);
259                 if (rc != 0 || !should_process)
260                         RETURN(rc);
261                 else if (should_process < 0) {
262                         req->rq_status = should_process;
263                         rc = ptlrpc_error(req);
264                         RETURN(rc);
265                 }
266         }
267         RETURN(+1);
268 }
269
270 int tgt_request_handle(struct ptlrpc_request *req)
271 {
272         struct tgt_session_info *tsi = tgt_ses_info(req->rq_svc_thread->t_env);
273
274         struct lustre_msg       *msg = req->rq_reqmsg;
275         struct tgt_handler      *h;
276         struct tgt_opc_slice    *s;
277         struct lu_target        *tgt;
278         int                      request_fail_id = 0;
279         __u32                    opc = lustre_msg_get_opc(msg);
280         int                      rc;
281
282         ENTRY;
283
284         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
285         tsi->tsi_pill = &req->rq_pill;
286         tsi->tsi_env = req->rq_svc_thread->t_env;
287         tsi->tsi_dlm_req = NULL;
288
289         /* if request has export then get handlers slice from corresponding
290          * target, otherwise that should be connect operation */
291         if (opc == MDS_CONNECT || opc == OST_CONNECT ||
292             opc == MGS_CONNECT) {
293                 req_capsule_set(&req->rq_pill, &RQF_CONNECT);
294                 rc = target_handle_connect(req);
295                 if (rc != 0) {
296                         rc = ptlrpc_error(req);
297                         GOTO(out, rc);
298                 }
299         }
300
301         if (unlikely(!class_connected_export(req->rq_export))) {
302                 CDEBUG(D_HA, "operation %d on unconnected OST from %s\n",
303                        opc, libcfs_id2str(req->rq_peer));
304                 req->rq_status = -ENOTCONN;
305                 rc = ptlrpc_error(req);
306                 GOTO(out, rc);
307         }
308
309         tsi->tsi_tgt = tgt = class_exp2tgt(req->rq_export);
310         tsi->tsi_exp = req->rq_export;
311
312         request_fail_id = tgt->lut_request_fail_id;
313         tsi->tsi_reply_fail_id = tgt->lut_reply_fail_id;
314
315         for (s = tgt->lut_slice; s->tos_hs != NULL; s++)
316                 if (s->tos_opc_start <= opc && opc < s->tos_opc_end)
317                         break;
318
319         /* opcode was not found in slice */
320         if (unlikely(s->tos_hs == NULL)) {
321                 CERROR("%s: no handlers for opcode 0x%x\n", tgt_name(tgt), opc);
322                 req->rq_status = -ENOTSUPP;
323                 rc = ptlrpc_error(req);
324                 GOTO(out, rc);
325         }
326
327         if (CFS_FAIL_CHECK_ORSET(request_fail_id, CFS_FAIL_ONCE))
328                 GOTO(out, rc = 0);
329
330         LASSERT(current->journal_info == NULL);
331
332         LASSERT(opc >= s->tos_opc_start && opc < s->tos_opc_end);
333         h = s->tos_hs + (opc - s->tos_opc_start);
334         if (unlikely(h->th_opc == 0)) {
335                 CERROR("%s: unsupported opcode 0x%x\n", tgt_name(tgt), opc);
336                 req->rq_status = -ENOTSUPP;
337                 rc = ptlrpc_error(req);
338                 GOTO(out, rc);
339         }
340
341         rc = lustre_msg_check_version(msg, h->th_version);
342         if (unlikely(rc)) {
343                 DEBUG_REQ(D_ERROR, req, "%s: drop mal-formed request, version"
344                           " %08x, expecting %08x\n", tgt_name(tgt),
345                           lustre_msg_get_version(msg), h->th_version);
346                 req->rq_status = -EINVAL;
347                 rc = ptlrpc_error(req);
348                 GOTO(out, rc);
349         }
350
351         rc = tgt_handle_recovery(req, tsi->tsi_reply_fail_id);
352         if (likely(rc == 1)) {
353                 LASSERTF(h->th_opc == opc, "opcode mismatch %d != %d\n",
354                          h->th_opc, opc);
355                 rc = tgt_handle_request0(tsi, h, req);
356                 if (rc)
357                         GOTO(out, rc);
358         }
359         EXIT;
360 out:
361         req_capsule_fini(tsi->tsi_pill);
362         tsi->tsi_pill = NULL;
363         return rc;
364 }
365 EXPORT_SYMBOL(tgt_request_handle);
366
367 void tgt_counter_incr(struct obd_export *exp, int opcode)
368 {
369         lprocfs_counter_incr(exp->exp_obd->obd_stats, opcode);
370         if (exp->exp_nid_stats && exp->exp_nid_stats->nid_stats != NULL)
371                 lprocfs_counter_incr(exp->exp_nid_stats->nid_stats, opcode);
372 }
373 EXPORT_SYMBOL(tgt_counter_incr);
374
375 /*
376  * Unified target generic handlers.
377  */
378
379 /*
380  * Security functions
381  */
382 static inline void tgt_init_sec_none(struct obd_connect_data *reply)
383 {
384         reply->ocd_connect_flags &= ~(OBD_CONNECT_RMT_CLIENT |
385                                       OBD_CONNECT_RMT_CLIENT_FORCE |
386                                       OBD_CONNECT_MDS_CAPA |
387                                       OBD_CONNECT_OSS_CAPA);
388 }
389
390 static int tgt_init_sec_level(struct ptlrpc_request *req)
391 {
392         struct lu_target        *tgt = class_exp2tgt(req->rq_export);
393         char                    *client = libcfs_nid2str(req->rq_peer.nid);
394         struct obd_connect_data *data, *reply;
395         int                      rc = 0;
396         bool                     remote;
397
398         ENTRY;
399
400         data = req_capsule_client_get(&req->rq_pill, &RMF_CONNECT_DATA);
401         reply = req_capsule_server_get(&req->rq_pill, &RMF_CONNECT_DATA);
402         if (data == NULL || reply == NULL)
403                 RETURN(-EFAULT);
404
405         /* connection from MDT is always trusted */
406         if (req->rq_auth_usr_mdt) {
407                 tgt_init_sec_none(reply);
408                 RETURN(0);
409         }
410
411         /* no GSS support case */
412         if (!req->rq_auth_gss) {
413                 if (tgt->lut_sec_level > LUSTRE_SEC_NONE) {
414                         CWARN("client %s -> target %s does not use GSS, "
415                               "can not run under security level %d.\n",
416                               client, tgt_name(tgt), tgt->lut_sec_level);
417                         RETURN(-EACCES);
418                 } else {
419                         tgt_init_sec_none(reply);
420                         RETURN(0);
421                 }
422         }
423
424         /* old version case */
425         if (unlikely(!(data->ocd_connect_flags & OBD_CONNECT_RMT_CLIENT) ||
426                      !(data->ocd_connect_flags & OBD_CONNECT_MDS_CAPA) ||
427                      !(data->ocd_connect_flags & OBD_CONNECT_OSS_CAPA))) {
428                 if (tgt->lut_sec_level > LUSTRE_SEC_NONE) {
429                         CWARN("client %s -> target %s uses old version, "
430                               "can not run under security level %d.\n",
431                               client, tgt_name(tgt), tgt->lut_sec_level);
432                         RETURN(-EACCES);
433                 } else {
434                         CWARN("client %s -> target %s uses old version, "
435                               "run under security level %d.\n",
436                               client, tgt_name(tgt), tgt->lut_sec_level);
437                         tgt_init_sec_none(reply);
438                         RETURN(0);
439                 }
440         }
441
442         remote = data->ocd_connect_flags & OBD_CONNECT_RMT_CLIENT_FORCE;
443         if (remote) {
444                 if (!req->rq_auth_remote)
445                         CDEBUG(D_SEC, "client (local realm) %s -> target %s "
446                                "asked to be remote.\n", client, tgt_name(tgt));
447         } else if (req->rq_auth_remote) {
448                 remote = true;
449                 CDEBUG(D_SEC, "client (remote realm) %s -> target %s is set "
450                        "as remote by default.\n", client, tgt_name(tgt));
451         }
452
453         if (remote) {
454                 if (!tgt->lut_oss_capa) {
455                         CDEBUG(D_SEC,
456                                "client %s -> target %s is set as remote,"
457                                " but OSS capabilities are not enabled: %d.\n",
458                                client, tgt_name(tgt), tgt->lut_oss_capa);
459                         RETURN(-EACCES);
460                 }
461         } else {
462                 if (req->rq_auth_uid == INVALID_UID) {
463                         CDEBUG(D_SEC, "client %s -> target %s: user is not "
464                                "authenticated!\n", client, tgt_name(tgt));
465                         RETURN(-EACCES);
466                 }
467         }
468
469
470         switch (tgt->lut_sec_level) {
471         case LUSTRE_SEC_NONE:
472                 if (remote) {
473                         CDEBUG(D_SEC,
474                                "client %s -> target %s is set as remote, "
475                                "can not run under security level %d.\n",
476                                client, tgt_name(tgt), tgt->lut_sec_level);
477                         RETURN(-EACCES);
478                 }
479                 tgt_init_sec_none(reply);
480                 break;
481         case LUSTRE_SEC_REMOTE:
482                 if (!remote)
483                         tgt_init_sec_none(reply);
484                 break;
485         case LUSTRE_SEC_ALL:
486                 if (remote)
487                         break;
488                 reply->ocd_connect_flags &= ~(OBD_CONNECT_RMT_CLIENT |
489                                               OBD_CONNECT_RMT_CLIENT_FORCE);
490                 if (!tgt->lut_oss_capa)
491                         reply->ocd_connect_flags &= ~OBD_CONNECT_OSS_CAPA;
492                 if (!tgt->lut_mds_capa)
493                         reply->ocd_connect_flags &= ~OBD_CONNECT_MDS_CAPA;
494                 break;
495         default:
496                 RETURN(-EINVAL);
497         }
498
499         RETURN(rc);
500 }
501
502 int tgt_connect_check_sptlrpc(struct ptlrpc_request *req, struct obd_export *exp)
503 {
504         struct lu_target        *tgt = class_exp2tgt(exp);
505         struct sptlrpc_flavor    flvr;
506         int                      rc = 0;
507
508         LASSERT(tgt);
509         LASSERT(tgt->lut_obd);
510         LASSERT(tgt->lut_slice);
511
512         /* always allow ECHO client */
513         if (unlikely(strcmp(exp->exp_obd->obd_type->typ_name,
514                             LUSTRE_ECHO_NAME) == 0)) {
515                 exp->exp_flvr.sf_rpc = SPTLRPC_FLVR_ANY;
516                 return 0;
517         }
518
519         if (exp->exp_flvr.sf_rpc == SPTLRPC_FLVR_INVALID) {
520                 read_lock(&tgt->lut_sptlrpc_lock);
521                 sptlrpc_target_choose_flavor(&tgt->lut_sptlrpc_rset,
522                                              req->rq_sp_from,
523                                              req->rq_peer.nid,
524                                              &flvr);
525                 read_unlock(&tgt->lut_sptlrpc_lock);
526
527                 spin_lock(&exp->exp_lock);
528                 exp->exp_sp_peer = req->rq_sp_from;
529                 exp->exp_flvr = flvr;
530                 if (exp->exp_flvr.sf_rpc != SPTLRPC_FLVR_ANY &&
531                     exp->exp_flvr.sf_rpc != req->rq_flvr.sf_rpc) {
532                         CERROR("%s: unauthorized rpc flavor %x from %s, "
533                                "expect %x\n", tgt_name(tgt),
534                                req->rq_flvr.sf_rpc,
535                                libcfs_nid2str(req->rq_peer.nid),
536                                exp->exp_flvr.sf_rpc);
537                         rc = -EACCES;
538                 }
539                 spin_unlock(&exp->exp_lock);
540         } else {
541                 if (exp->exp_sp_peer != req->rq_sp_from) {
542                         CERROR("%s: RPC source %s doesn't match %s\n",
543                                tgt_name(tgt),
544                                sptlrpc_part2name(req->rq_sp_from),
545                                sptlrpc_part2name(exp->exp_sp_peer));
546                         rc = -EACCES;
547                 } else {
548                         rc = sptlrpc_target_export_check(exp, req);
549                 }
550         }
551
552         return rc;
553 }
554
555 int tgt_connect(struct tgt_session_info *tsi)
556 {
557         struct ptlrpc_request   *req = tgt_ses_req(tsi);
558         struct obd_connect_data *reply;
559         int                      rc;
560
561         ENTRY;
562
563         rc = tgt_init_sec_level(req);
564         if (rc != 0)
565                 GOTO(out, rc);
566
567         /* XXX: better to call this check right after getting new export but
568          * before last_rcvd slot allocation to avoid server load upon insecure
569          * connects. This is to be fixed after unifiyng all targets.
570          */
571         rc = tgt_connect_check_sptlrpc(req, tsi->tsi_exp);
572         if (rc)
573                 GOTO(out, rc);
574
575         /* To avoid exposing partially initialized connection flags, changes up
576          * to this point have been staged in reply->ocd_connect_flags. Now that
577          * connection handling has completed successfully, atomically update
578          * the connect flags in the shared export data structure. LU-1623 */
579         reply = req_capsule_server_get(tsi->tsi_pill, &RMF_CONNECT_DATA);
580         spin_lock(&tsi->tsi_exp->exp_lock);
581         *exp_connect_flags_ptr(tsi->tsi_exp) = reply->ocd_connect_flags;
582         spin_unlock(&tsi->tsi_exp->exp_lock);
583
584         RETURN(0);
585 out:
586         obd_disconnect(class_export_get(tsi->tsi_exp));
587         return rc;
588 }
589 EXPORT_SYMBOL(tgt_connect);
590
591 int tgt_disconnect(struct tgt_session_info *tsi)
592 {
593         int rc;
594
595         ENTRY;
596
597         rc = target_handle_disconnect(tgt_ses_req(tsi));
598         if (rc)
599                 RETURN(err_serious(rc));
600
601         RETURN(rc);
602 }
603 EXPORT_SYMBOL(tgt_disconnect);
604
605 /*
606  * Unified target OBD handlers
607  */
608 int tgt_obd_ping(struct tgt_session_info *tsi)
609 {
610         int rc;
611
612         ENTRY;
613
614         rc = target_handle_ping(tgt_ses_req(tsi));
615         if (rc)
616                 RETURN(err_serious(rc));
617
618         RETURN(rc);
619 }
620 EXPORT_SYMBOL(tgt_obd_ping);
621
622 int tgt_obd_log_cancel(struct tgt_session_info *tsi)
623 {
624         return err_serious(-EOPNOTSUPP);
625 }
626 EXPORT_SYMBOL(tgt_obd_log_cancel);
627
628 int tgt_obd_qc_callback(struct tgt_session_info *tsi)
629 {
630         return err_serious(-EOPNOTSUPP);
631 }
632 EXPORT_SYMBOL(tgt_obd_qc_callback);
633
634 static int tgt_sendpage(struct tgt_session_info *tsi, struct lu_rdpg *rdpg,
635                         int nob)
636 {
637         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
638         struct ptlrpc_request   *req = tgt_ses_req(tsi);
639         struct obd_export       *exp = req->rq_export;
640         struct ptlrpc_bulk_desc *desc;
641         struct l_wait_info      *lwi = &tti->tti_u.rdpg.tti_wait_info;
642         int                      tmpcount;
643         int                      tmpsize;
644         int                      i;
645         int                      rc;
646
647         ENTRY;
648
649         desc = ptlrpc_prep_bulk_exp(req, rdpg->rp_npages, 1, BULK_PUT_SOURCE,
650                                     MDS_BULK_PORTAL);
651         if (desc == NULL)
652                 RETURN(-ENOMEM);
653
654         if (!(exp_connect_flags(exp) & OBD_CONNECT_BRW_SIZE))
655                 /* old client requires reply size in it's PAGE_CACHE_SIZE,
656                  * which is rdpg->rp_count */
657                 nob = rdpg->rp_count;
658
659         for (i = 0, tmpcount = nob; i < rdpg->rp_npages && tmpcount > 0;
660              i++, tmpcount -= tmpsize) {
661                 tmpsize = min_t(int, tmpcount, PAGE_CACHE_SIZE);
662                 ptlrpc_prep_bulk_page_pin(desc, rdpg->rp_pages[i], 0, tmpsize);
663         }
664
665         LASSERT(desc->bd_nob == nob);
666         rc = target_bulk_io(exp, desc, lwi);
667         ptlrpc_free_bulk_pin(desc);
668         RETURN(rc);
669 }
670 EXPORT_SYMBOL(tgt_sendpage);
671
672 /*
673  * OBD_IDX_READ handler
674  */
675 int tgt_obd_idx_read(struct tgt_session_info *tsi)
676 {
677         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
678         struct lu_rdpg          *rdpg = &tti->tti_u.rdpg.tti_rdpg;
679         struct idx_info         *req_ii, *rep_ii;
680         int                      rc, i;
681
682         ENTRY;
683
684         memset(rdpg, 0, sizeof(*rdpg));
685         req_capsule_set(tsi->tsi_pill, &RQF_OBD_IDX_READ);
686
687         /* extract idx_info buffer from request & reply */
688         req_ii = req_capsule_client_get(tsi->tsi_pill, &RMF_IDX_INFO);
689         if (req_ii == NULL || req_ii->ii_magic != IDX_INFO_MAGIC)
690                 RETURN(err_serious(-EPROTO));
691
692         rc = req_capsule_server_pack(tsi->tsi_pill);
693         if (rc)
694                 RETURN(err_serious(rc));
695
696         rep_ii = req_capsule_server_get(tsi->tsi_pill, &RMF_IDX_INFO);
697         if (rep_ii == NULL)
698                 RETURN(err_serious(-EFAULT));
699         rep_ii->ii_magic = IDX_INFO_MAGIC;
700
701         /* extract hash to start with */
702         rdpg->rp_hash = req_ii->ii_hash_start;
703
704         /* extract requested attributes */
705         rdpg->rp_attrs = req_ii->ii_attrs;
706
707         /* check that fid packed in request is valid and supported */
708         if (!fid_is_sane(&req_ii->ii_fid))
709                 RETURN(-EINVAL);
710         rep_ii->ii_fid = req_ii->ii_fid;
711
712         /* copy flags */
713         rep_ii->ii_flags = req_ii->ii_flags;
714
715         /* compute number of pages to allocate, ii_count is the number of 4KB
716          * containers */
717         if (req_ii->ii_count <= 0)
718                 GOTO(out, rc = -EFAULT);
719         rdpg->rp_count = min_t(unsigned int, req_ii->ii_count << LU_PAGE_SHIFT,
720                                exp_max_brw_size(tsi->tsi_exp));
721         rdpg->rp_npages = (rdpg->rp_count + PAGE_CACHE_SIZE -1) >> PAGE_CACHE_SHIFT;
722
723         /* allocate pages to store the containers */
724         OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof(rdpg->rp_pages[0]));
725         if (rdpg->rp_pages == NULL)
726                 GOTO(out, rc = -ENOMEM);
727         for (i = 0; i < rdpg->rp_npages; i++) {
728                 rdpg->rp_pages[i] = alloc_page(GFP_IOFS);
729                 if (rdpg->rp_pages[i] == NULL)
730                         GOTO(out, rc = -ENOMEM);
731         }
732
733         /* populate pages with key/record pairs */
734         rc = dt_index_read(tsi->tsi_env, tsi->tsi_tgt->lut_bottom, rep_ii, rdpg);
735         if (rc < 0)
736                 GOTO(out, rc);
737
738         LASSERTF(rc <= rdpg->rp_count, "dt_index_read() returned more than "
739                  "asked %d > %d\n", rc, rdpg->rp_count);
740
741         /* send pages to client */
742         rc = tgt_sendpage(tsi, rdpg, rc);
743         if (rc)
744                 GOTO(out, rc);
745         EXIT;
746 out:
747         if (rdpg->rp_pages) {
748                 for (i = 0; i < rdpg->rp_npages; i++)
749                         if (rdpg->rp_pages[i])
750                                 __free_page(rdpg->rp_pages[i]);
751                 OBD_FREE(rdpg->rp_pages,
752                          rdpg->rp_npages * sizeof(rdpg->rp_pages[0]));
753         }
754         return rc;
755 }
756 EXPORT_SYMBOL(tgt_obd_idx_read);
757
758 struct tgt_handler tgt_obd_handlers[] = {
759 TGT_OBD_HDL    (0,      OBD_PING,               tgt_obd_ping),
760 TGT_OBD_HDL_VAR(0,      OBD_LOG_CANCEL,         tgt_obd_log_cancel),
761 TGT_OBD_HDL_VAR(0,      OBD_QC_CALLBACK,        tgt_obd_qc_callback),
762 TGT_OBD_HDL    (0,      OBD_IDX_READ,           tgt_obd_idx_read)
763 };
764 EXPORT_SYMBOL(tgt_obd_handlers);
765
766 /*
767  * Unified target DLM handlers.
768  */
769 struct ldlm_callback_suite tgt_dlm_cbs = {
770         .lcs_completion = ldlm_server_completion_ast,
771         .lcs_blocking   = ldlm_server_blocking_ast,
772         .lcs_glimpse    = ldlm_server_glimpse_ast
773 };
774
775 int tgt_enqueue(struct tgt_session_info *tsi)
776 {
777         struct ptlrpc_request *req = tgt_ses_req(tsi);
778         int rc;
779
780         ENTRY;
781         /*
782          * tsi->tsi_dlm_req was already swapped and (if necessary) converted,
783          * tsi->tsi_dlm_cbs was set by the *_req_handle() function.
784          */
785         LASSERT(tsi->tsi_dlm_req != NULL);
786
787         rc = ldlm_handle_enqueue0(tsi->tsi_exp->exp_obd->obd_namespace, req,
788                                   tsi->tsi_dlm_req, &tgt_dlm_cbs);
789         if (rc)
790                 RETURN(err_serious(rc));
791
792         RETURN(req->rq_status);
793 }
794 EXPORT_SYMBOL(tgt_enqueue);
795
796 int tgt_convert(struct tgt_session_info *tsi)
797 {
798         struct ptlrpc_request *req = tgt_ses_req(tsi);
799         int rc;
800
801         ENTRY;
802         LASSERT(tsi->tsi_dlm_req);
803         rc = ldlm_handle_convert0(req, tsi->tsi_dlm_req);
804         if (rc)
805                 RETURN(err_serious(rc));
806
807         RETURN(req->rq_status);
808 }
809 EXPORT_SYMBOL(tgt_convert);
810
811 int tgt_bl_callback(struct tgt_session_info *tsi)
812 {
813         return err_serious(-EOPNOTSUPP);
814 }
815 EXPORT_SYMBOL(tgt_bl_callback);
816
817 int tgt_cp_callback(struct tgt_session_info *tsi)
818 {
819         return err_serious(-EOPNOTSUPP);
820 }
821 EXPORT_SYMBOL(tgt_cp_callback);
822
823 /* generic LDLM target handler */
824 struct tgt_handler tgt_dlm_handlers[] = {
825 TGT_DLM_HDL    (HABEO_CLAVIS,   LDLM_ENQUEUE,           tgt_enqueue),
826 TGT_DLM_HDL_VAR(HABEO_CLAVIS,   LDLM_CONVERT,           tgt_convert),
827 TGT_DLM_HDL_VAR(0,              LDLM_BL_CALLBACK,       tgt_bl_callback),
828 TGT_DLM_HDL_VAR(0,              LDLM_CP_CALLBACK,       tgt_cp_callback)
829 };
830 EXPORT_SYMBOL(tgt_dlm_handlers);
831
832 /*
833  * Unified target LLOG handlers.
834  */
835 int tgt_llog_open(struct tgt_session_info *tsi)
836 {
837         int rc;
838
839         ENTRY;
840
841         rc = llog_origin_handle_open(tgt_ses_req(tsi));
842
843         RETURN(rc);
844 }
845 EXPORT_SYMBOL(tgt_llog_open);
846
847 int tgt_llog_close(struct tgt_session_info *tsi)
848 {
849         int rc;
850
851         ENTRY;
852
853         rc = llog_origin_handle_close(tgt_ses_req(tsi));
854
855         RETURN(rc);
856 }
857 EXPORT_SYMBOL(tgt_llog_close);
858
859
860 int tgt_llog_destroy(struct tgt_session_info *tsi)
861 {
862         int rc;
863
864         ENTRY;
865
866         rc = llog_origin_handle_destroy(tgt_ses_req(tsi));
867
868         RETURN(rc);
869 }
870 EXPORT_SYMBOL(tgt_llog_destroy);
871
872 int tgt_llog_read_header(struct tgt_session_info *tsi)
873 {
874         int rc;
875
876         ENTRY;
877
878         rc = llog_origin_handle_read_header(tgt_ses_req(tsi));
879
880         RETURN(rc);
881 }
882 EXPORT_SYMBOL(tgt_llog_read_header);
883
884 int tgt_llog_next_block(struct tgt_session_info *tsi)
885 {
886         int rc;
887
888         ENTRY;
889
890         rc = llog_origin_handle_next_block(tgt_ses_req(tsi));
891
892         RETURN(rc);
893 }
894 EXPORT_SYMBOL(tgt_llog_next_block);
895
896 int tgt_llog_prev_block(struct tgt_session_info *tsi)
897 {
898         int rc;
899
900         ENTRY;
901
902         rc = llog_origin_handle_prev_block(tgt_ses_req(tsi));
903
904         RETURN(rc);
905 }
906 EXPORT_SYMBOL(tgt_llog_prev_block);
907
908 /* generic llog target handler */
909 struct tgt_handler tgt_llog_handlers[] = {
910 TGT_LLOG_HDL    (0,     LLOG_ORIGIN_HANDLE_CREATE,      tgt_llog_open),
911 TGT_LLOG_HDL    (0,     LLOG_ORIGIN_HANDLE_NEXT_BLOCK,  tgt_llog_next_block),
912 TGT_LLOG_HDL    (0,     LLOG_ORIGIN_HANDLE_READ_HEADER, tgt_llog_read_header),
913 TGT_LLOG_HDL    (0,     LLOG_ORIGIN_HANDLE_PREV_BLOCK,  tgt_llog_prev_block),
914 TGT_LLOG_HDL    (0,     LLOG_ORIGIN_HANDLE_DESTROY,     tgt_llog_destroy),
915 TGT_LLOG_HDL_VAR(0,     LLOG_ORIGIN_HANDLE_CLOSE,       tgt_llog_close),
916 };
917 EXPORT_SYMBOL(tgt_llog_handlers);
918
919 /*
920  * sec context handlers
921  */
/*
 * Security context handler.  Currently a placeholder that ignores \a tsi
 * and reports success.
 *
 * XXX: Implement based on mdt_sec_ctx_handle()?
 */
int tgt_sec_ctx_handle(struct tgt_session_info *tsi)
{
        int rc = 0;

        return rc;
}
927
928 struct tgt_handler tgt_sec_ctx_handlers[] = {
929 TGT_SEC_HDL_VAR(0,      SEC_CTX_INIT,           tgt_sec_ctx_handle),
930 TGT_SEC_HDL_VAR(0,      SEC_CTX_INIT_CONT,      tgt_sec_ctx_handle),
931 TGT_SEC_HDL_VAR(0,      SEC_CTX_FINI,           tgt_sec_ctx_handle),
932 };
933 EXPORT_SYMBOL(tgt_sec_ctx_handlers);