Whamcloud - gitweb
LU-3467 mdt: call MDT handlers via unified request handler
[fs/lustre-release.git] / lustre / target / tgt_handler.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2011, 2012, Intel Corporation.
25  */
26 /*
27  * lustre/target/tgt_handler.c
28  *
29  * Lustre Unified Target request handler code
30  *
31  * Author: Brian Behlendorf <behlendorf1@llnl.gov>
32  * Author: Mikhail Pershin <mike.pershin@intel.com>
33  */
34
35 #define DEBUG_SUBSYSTEM S_CLASS
36
37 #include <obd.h>
38 #include <obd_class.h>
39
40 #include "tgt_internal.h"
41
42 char *tgt_name(struct lu_target *tgt)
43 {
44         LASSERT(tgt->lut_obd != NULL);
45         return tgt->lut_obd->obd_name;
46 }
47 EXPORT_SYMBOL(tgt_name);
48
49 /*
50  * Generic code handling requests that have struct mdt_body passed in:
51  *
52  *  - extract mdt_body from request and save it in @tsi, if present;
53  *
54  *  - create lu_object, corresponding to the fid in mdt_body, and save it in
55  *  @tsi;
56  *
57  *  - if HABEO_CORPUS flag is set for this request type check whether object
58  *  actually exists on storage (lu_object_exists()).
59  *
60  */
61 static int tgt_mdt_body_unpack(struct tgt_session_info *tsi, __u32 flags)
62 {
63         const struct mdt_body   *body;
64         struct lu_object        *obj;
65         struct req_capsule      *pill = tsi->tsi_pill;
66         int                      rc;
67
68         ENTRY;
69
70         body = req_capsule_client_get(pill, &RMF_MDT_BODY);
71         if (body == NULL)
72                 RETURN(-EFAULT);
73
74         tsi->tsi_mdt_body = body;
75
76         if (!(body->valid & OBD_MD_FLID))
77                 RETURN(0);
78
79         /* mdc_pack_body() doesn't check if fid is zero and set OBD_ML_FID
80          * in any case in pre-2.5 clients. Fix that here if needed */
81         if (unlikely(fid_is_zero(&body->fid1)))
82                 RETURN(0);
83
84         if (!fid_is_sane(&body->fid1)) {
85                 CERROR("%s: invalid FID: "DFID"\n", tgt_name(tsi->tsi_tgt),
86                        PFID(&body->fid1));
87                 RETURN(-EINVAL);
88         }
89
90         obj = lu_object_find(tsi->tsi_env,
91                              &tsi->tsi_tgt->lut_bottom->dd_lu_dev,
92                              &body->fid1, NULL);
93         if (!IS_ERR(obj)) {
94                 if ((flags & HABEO_CORPUS) && !lu_object_exists(obj)) {
95                         lu_object_put(tsi->tsi_env, obj);
96                         /* for capability renew ENOENT will be handled in
97                          * mdt_renew_capa */
98                         if (body->valid & OBD_MD_FLOSSCAPA)
99                                 rc = 0;
100                         else
101                                 rc = -ENOENT;
102                 } else {
103                         tsi->tsi_corpus = obj;
104                         rc = 0;
105                 }
106         } else {
107                 rc = PTR_ERR(obj);
108         }
109         RETURN(rc);
110 }
111
112 static int tgt_unpack_req_pack_rep(struct tgt_session_info *tsi, __u32 flags)
113 {
114         struct req_capsule      *pill = tsi->tsi_pill;
115         int                      rc;
116
117         ENTRY;
118
119         if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_CLIENT)) {
120                 rc = tgt_mdt_body_unpack(tsi, flags);
121         } else {
122                 rc = 0;
123         }
124
125         if (flags & HABEO_REFERO) {
126                 /* Pack reply */
127                 if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
128                         req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
129                                              tsi->tsi_mdt_body->eadatasize);
130                 if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
131                         req_capsule_set_size(pill, &RMF_LOGCOOKIES,
132                                              RCL_SERVER, 0);
133
134                 rc = req_capsule_server_pack(pill);
135         }
136         RETURN(rc);
137 }
138
/*
 * Invoke handler for this request opc. Also do necessary preprocessing
 * (according to handler ->th_flags), and post-processing (setting of
 * ->last_{xid,committed}).
 *
 * Error convention: a "serious" error (see is_serious()) aborts the RPC
 * and is returned to ptlrpc, while a plain error goes only into
 * req->rq_status and a reply is still sent.  Always returns 0 except
 * through RETURN(0) early exits used to emulate network failure.
 */
static int tgt_handle_request0(struct tgt_session_info *tsi,
                               struct tgt_handler *h,
                               struct ptlrpc_request *req)
{
        int      serious = 0;
        int      rc;
        __u32    flags;

        ENTRY;

        LASSERT(h->th_act != NULL);
        LASSERT(h->th_opc == lustre_msg_get_opc(req->rq_reqmsg));
        LASSERT(current->journal_info == NULL);

        /*
         * Checking for various OBD_FAIL_$PREF_$OPC_NET codes. _Do_ not try
         * to put same checks into handlers like mdt_close(), mdt_reint(),
         * etc., without talking to mdt authors first. Checking same thing
         * there again is useless and returning 0 error without packing reply
         * is buggy! Handlers either pack reply or return error.
         *
         * We return 0 here and do not send any reply in order to emulate
         * network failure. Do not send any reply in case any of NET related
         * fail_id has occured.
         */
        if (OBD_FAIL_CHECK_ORSET(h->th_fail_id, OBD_FAIL_ONCE))
                RETURN(0);

        rc = 0;
        flags = h->th_flags;
        /* HABEO_CORPUS/HABEO_REFERO imply unpack/pack, which both need a
         * declared request format */
        LASSERT(ergo(flags & (HABEO_CORPUS | HABEO_REFERO),
                     h->th_fmt != NULL));
        if (h->th_fmt != NULL) {
                req_capsule_set(tsi->tsi_pill, h->th_fmt);
                rc = tgt_unpack_req_pack_rep(tsi, flags);
        }

        /* modifying (MUTABOR) requests are refused on read-only connections */
        if (rc == 0 && flags & MUTABOR &&
            tgt_conn_flags(tsi) & OBD_CONNECT_RDONLY)
                rc = -EROFS;

        /* HABEO_CLAVIS handlers operate on an LDLM lock request; validate
         * and cache it before invoking the handler */
        if (rc == 0 && flags & HABEO_CLAVIS) {
                struct ldlm_request *dlm_req;

                LASSERT(h->th_fmt != NULL);

                dlm_req = req_capsule_client_get(tsi->tsi_pill, &RMF_DLM_REQ);
                if (dlm_req != NULL) {
                        if (unlikely(dlm_req->lock_desc.l_resource.lr_type ==
                                     LDLM_IBITS &&
                                     dlm_req->lock_desc.l_policy_data.\
                                     l_inodebits.bits == 0)) {
                                /*
                                 * Lock without inodebits makes no sense and
                                 * will oops later in ldlm. If client miss to
                                 * set such bits, do not trigger ASSERTION.
                                 *
                                 * For liblustre flock case, it maybe zero.
                                 */
                                rc = -EPROTO;
                        } else {
                                tsi->tsi_dlm_req = dlm_req;
                        }
                } else {
                        rc = -EFAULT;
                }
        }

        if (likely(rc == 0)) {
                /*
                 * Process request, there can be two types of rc:
                 * 1) errors with msg unpack/pack, other failures outside the
                 * operation itself. This is counted as serious errors;
                 * 2) errors during fs operation, should be placed in rq_status
                 * only
                 */
                rc = h->th_act(tsi);
                if (!is_serious(rc) &&
                    !req->rq_no_reply && req->rq_reply_state == NULL) {
                        /* a handler returning "success" must have packed a
                         * reply; anything else is a handler bug */
                        DEBUG_REQ(D_ERROR, req, "%s \"handler\" %s did not "
                                  "pack reply and returned 0 error\n",
                                  tgt_name(tsi->tsi_tgt), h->th_name);
                        LBUG();
                }
                serious = is_serious(rc);
                rc = clear_serious(rc);
        } else {
                /* pre-processing failures never packed a reply: serious */
                serious = 1;
        }

        req->rq_status = rc;

        /*
         * ELDLM_* codes which > 0 should be in rq_status only as well as
         * all non-serious errors.
         */
        if (rc > 0 || !serious)
                rc = 0;

        LASSERT(current->journal_info == NULL);

        /*
         * If we're DISCONNECTing, the export_data is already freed
         *
         * WAS if (likely(... && h->mh_opc != MDS_DISCONNECT))
         */
        if (likely(rc == 0 && req->rq_export))
                target_committed_to_req(req);

        target_send_reply(req, rc, tsi->tsi_reply_fail_id);
        RETURN(0);
}
256
/*
 * Classify a request that arrived while the target is still recovering.
 *
 * On return, *process is: 1 to handle the request immediately; the value
 * from target_queue_recovery_request() for opcodes that may be replays;
 * or -EAGAIN for opcodes not permitted during recovery.  Always
 * returns 0.
 */
static int tgt_filter_recovery_request(struct ptlrpc_request *req,
                                       struct obd_device *obd, int *process)
{
        switch (lustre_msg_get_opc(req->rq_reqmsg)) {
        /* safe to process right away, even during recovery */
        case MDS_DISCONNECT:
        case OST_DISCONNECT:
        case OBD_IDX_READ:
                *process = 1;
                RETURN(0);
        /* potentially replayed operations: hand to the recovery queue,
         * which decides whether to process now or defer */
        case MDS_CLOSE:
        case MDS_DONE_WRITING:
        case MDS_SYNC: /* used in unmounting */
        case OBD_PING:
        case MDS_REINT:
        case UPDATE_OBJ:
        case SEQ_QUERY:
        case FLD_QUERY:
        case LDLM_ENQUEUE:
                *process = target_queue_recovery_request(req, obd);
                RETURN(0);

        default:
                DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
                *process = -EAGAIN;
                RETURN(0);
        }
}
284
285 /*
286  * Handle recovery. Return:
287  *        +1: continue request processing;
288  *       -ve: abort immediately with the given error code;
289  *         0: send reply with error code in req->rq_status;
290  */
291 int tgt_handle_recovery(struct ptlrpc_request *req, int reply_fail_id)
292 {
293         ENTRY;
294
295         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
296         case MDS_CONNECT:
297         case OST_CONNECT:
298         case MGS_CONNECT:
299         case SEC_CTX_INIT:
300         case SEC_CTX_INIT_CONT:
301         case SEC_CTX_FINI:
302                 RETURN(+1);
303         }
304
305         if (!req->rq_export->exp_obd->obd_replayable)
306                 RETURN(+1);
307
308         /* sanity check: if the xid matches, the request must be marked as a
309          * resent or replayed */
310         if (req_xid_is_last(req)) {
311                 if (!(lustre_msg_get_flags(req->rq_reqmsg) &
312                       (MSG_RESENT | MSG_REPLAY))) {
313                         DEBUG_REQ(D_WARNING, req, "rq_xid "LPU64" matches "
314                                   "last_xid, expected REPLAY or RESENT flag "
315                                   "(%x)", req->rq_xid,
316                                   lustre_msg_get_flags(req->rq_reqmsg));
317                         req->rq_status = -ENOTCONN;
318                         RETURN(-ENOTCONN);
319                 }
320         }
321         /* else: note the opposite is not always true; a RESENT req after a
322          * failover will usually not match the last_xid, since it was likely
323          * never committed. A REPLAYed request will almost never match the
324          * last xid, however it could for a committed, but still retained,
325          * open. */
326
327         /* Check for aborted recovery... */
328         if (unlikely(req->rq_export->exp_obd->obd_recovering)) {
329                 int rc;
330                 int should_process;
331
332                 DEBUG_REQ(D_INFO, req, "Got new replay");
333                 rc = tgt_filter_recovery_request(req, req->rq_export->exp_obd,
334                                                  &should_process);
335                 if (rc != 0 || !should_process)
336                         RETURN(rc);
337                 else if (should_process < 0) {
338                         req->rq_status = should_process;
339                         rc = ptlrpc_error(req);
340                         RETURN(rc);
341                 }
342         }
343         RETURN(+1);
344 }
345
/*
 * Unified request entry point for target services: initializes the request
 * capsule and session info, routes connect opcodes, locates the per-target
 * handler slice for the opcode, applies recovery gating, and finally runs
 * the handler via tgt_handle_request0().  Per-request session state in
 * @tsi is reset before returning so the service thread can be reused.
 */
int tgt_request_handle(struct ptlrpc_request *req)
{
        struct tgt_session_info *tsi = tgt_ses_info(req->rq_svc_thread->t_env);

        struct lustre_msg       *msg = req->rq_reqmsg;
        struct tgt_handler      *h;
        struct tgt_opc_slice    *s;
        struct lu_target        *tgt;
        int                      request_fail_id = 0;
        __u32                    opc = lustre_msg_get_opc(msg);
        int                      rc;

        ENTRY;

        /* Refill the context, to make sure all thread keys are allocated */
        lu_env_refill(req->rq_svc_thread->t_env);

        req_capsule_init(&req->rq_pill, req, RCL_SERVER);
        tsi->tsi_pill = &req->rq_pill;
        tsi->tsi_env = req->rq_svc_thread->t_env;

        /* if request has export then get handlers slice from corresponding
         * target, otherwise that should be connect operation */
        if (opc == MDS_CONNECT || opc == OST_CONNECT ||
            opc == MGS_CONNECT) {
                req_capsule_set(&req->rq_pill, &RQF_CONNECT);
                rc = target_handle_connect(req);
                if (rc != 0) {
                        rc = ptlrpc_error(req);
                        GOTO(out, rc);
                }
                /* on success fall through: the export now exists and the
                 * connect opcode is dispatched like any other below */
        }

        if (unlikely(!class_connected_export(req->rq_export))) {
                CDEBUG(D_HA, "operation %d on unconnected OST from %s\n",
                       opc, libcfs_id2str(req->rq_peer));
                req->rq_status = -ENOTCONN;
                rc = ptlrpc_error(req);
                GOTO(out, rc);
        }

        tsi->tsi_tgt = tgt = class_exp2tgt(req->rq_export);
        tsi->tsi_exp = req->rq_export;

        request_fail_id = tgt->lut_request_fail_id;
        tsi->tsi_reply_fail_id = tgt->lut_reply_fail_id;

        /* find the opcode slice [tos_opc_start, tos_opc_end) covering opc */
        for (s = tgt->lut_slice; s->tos_hs != NULL; s++)
                if (s->tos_opc_start <= opc && opc < s->tos_opc_end)
                        break;

        /* opcode was not found in slice */
        if (unlikely(s->tos_hs == NULL)) {
                CERROR("%s: no handlers for opcode 0x%x\n", tgt_name(tgt), opc);
                req->rq_status = -ENOTSUPP;
                rc = ptlrpc_error(req);
                GOTO(out, rc);
        }

        /* fault-injection point: drop the request without replying */
        if (CFS_FAIL_CHECK_ORSET(request_fail_id, CFS_FAIL_ONCE))
                GOTO(out, rc = 0);

        LASSERT(current->journal_info == NULL);

        LASSERT(opc >= s->tos_opc_start && opc < s->tos_opc_end);
        h = s->tos_hs + (opc - s->tos_opc_start);
        /* th_opc == 0 marks an unimplemented hole in the handler table */
        if (unlikely(h->th_opc == 0)) {
                CERROR("%s: unsupported opcode 0x%x\n", tgt_name(tgt), opc);
                req->rq_status = -ENOTSUPP;
                rc = ptlrpc_error(req);
                GOTO(out, rc);
        }

        rc = lustre_msg_check_version(msg, h->th_version);
        if (unlikely(rc)) {
                DEBUG_REQ(D_ERROR, req, "%s: drop mal-formed request, version"
                          " %08x, expecting %08x\n", tgt_name(tgt),
                          lustre_msg_get_version(msg), h->th_version);
                req->rq_status = -EINVAL;
                rc = ptlrpc_error(req);
                GOTO(out, rc);
        }

        /* +1 means "process now"; 0 means queued/handled by recovery */
        rc = tgt_handle_recovery(req, tsi->tsi_reply_fail_id);
        if (likely(rc == 1)) {
                LASSERTF(h->th_opc == opc, "opcode mismatch %d != %d\n",
                         h->th_opc, opc);
                rc = tgt_handle_request0(tsi, h, req);
                if (rc)
                        GOTO(out, rc);
        }
        EXIT;
out:
        /* reset per-request session state before the thread is reused */
        req_capsule_fini(tsi->tsi_pill);
        tsi->tsi_pill = NULL;
        if (tsi->tsi_corpus != NULL) {
                lu_object_put(tsi->tsi_env, tsi->tsi_corpus);
                tsi->tsi_corpus = NULL;
        }
        tsi->tsi_env = NULL;
        tsi->tsi_mdt_body = NULL;
        tsi->tsi_dlm_req = NULL;
        return rc;
}
EXPORT_SYMBOL(tgt_request_handle);
451
452 void tgt_counter_incr(struct obd_export *exp, int opcode)
453 {
454         lprocfs_counter_incr(exp->exp_obd->obd_stats, opcode);
455         if (exp->exp_nid_stats && exp->exp_nid_stats->nid_stats != NULL)
456                 lprocfs_counter_incr(exp->exp_nid_stats->nid_stats, opcode);
457 }
458 EXPORT_SYMBOL(tgt_counter_incr);
459
460 /*
461  * Unified target generic handlers.
462  */
463
464 /*
465  * Security functions
466  */
467 static inline void tgt_init_sec_none(struct obd_connect_data *reply)
468 {
469         reply->ocd_connect_flags &= ~(OBD_CONNECT_RMT_CLIENT |
470                                       OBD_CONNECT_RMT_CLIENT_FORCE |
471                                       OBD_CONNECT_MDS_CAPA |
472                                       OBD_CONNECT_OSS_CAPA);
473 }
474
/*
 * Enforce the target's configured security level (tgt->lut_sec_level) on a
 * connecting client, adjusting the connect flags staged in the reply.
 *
 * Returns 0 when the connection is acceptable, -EFAULT when connect data
 * is missing from the capsule, -EACCES when the client cannot satisfy the
 * required security level, -EINVAL for an unrecognized level.
 */
static int tgt_init_sec_level(struct ptlrpc_request *req)
{
        struct lu_target        *tgt = class_exp2tgt(req->rq_export);
        char                    *client = libcfs_nid2str(req->rq_peer.nid);
        struct obd_connect_data *data, *reply;
        int                      rc = 0;
        bool                     remote;

        ENTRY;

        data = req_capsule_client_get(&req->rq_pill, &RMF_CONNECT_DATA);
        reply = req_capsule_server_get(&req->rq_pill, &RMF_CONNECT_DATA);
        if (data == NULL || reply == NULL)
                RETURN(-EFAULT);

        /* connection from MDT is always trusted */
        if (req->rq_auth_usr_mdt) {
                tgt_init_sec_none(reply);
                RETURN(0);
        }

        /* no GSS support case */
        if (!req->rq_auth_gss) {
                if (tgt->lut_sec_level > LUSTRE_SEC_NONE) {
                        CWARN("client %s -> target %s does not use GSS, "
                              "can not run under security level %d.\n",
                              client, tgt_name(tgt), tgt->lut_sec_level);
                        RETURN(-EACCES);
                } else {
                        tgt_init_sec_none(reply);
                        RETURN(0);
                }
        }

        /* old version case */
        if (unlikely(!(data->ocd_connect_flags & OBD_CONNECT_RMT_CLIENT) ||
                     !(data->ocd_connect_flags & OBD_CONNECT_MDS_CAPA) ||
                     !(data->ocd_connect_flags & OBD_CONNECT_OSS_CAPA))) {
                if (tgt->lut_sec_level > LUSTRE_SEC_NONE) {
                        CWARN("client %s -> target %s uses old version, "
                              "can not run under security level %d.\n",
                              client, tgt_name(tgt), tgt->lut_sec_level);
                        RETURN(-EACCES);
                } else {
                        CWARN("client %s -> target %s uses old version, "
                              "run under security level %d.\n",
                              client, tgt_name(tgt), tgt->lut_sec_level);
                        tgt_init_sec_none(reply);
                        RETURN(0);
                }
        }

        /* decide whether the client is treated as a remote-realm client:
         * either it asked for it explicitly, or GSS authenticated it as
         * belonging to a remote realm */
        remote = data->ocd_connect_flags & OBD_CONNECT_RMT_CLIENT_FORCE;
        if (remote) {
                if (!req->rq_auth_remote)
                        CDEBUG(D_SEC, "client (local realm) %s -> target %s "
                               "asked to be remote.\n", client, tgt_name(tgt));
        } else if (req->rq_auth_remote) {
                remote = true;
                CDEBUG(D_SEC, "client (remote realm) %s -> target %s is set "
                       "as remote by default.\n", client, tgt_name(tgt));
        }

        if (remote) {
                /* remote clients require OSS capabilities to be enabled */
                if (!tgt->lut_oss_capa) {
                        CDEBUG(D_SEC,
                               "client %s -> target %s is set as remote,"
                               " but OSS capabilities are not enabled: %d.\n",
                               client, tgt_name(tgt), tgt->lut_oss_capa);
                        RETURN(-EACCES);
                }
        } else {
                /* local clients must present an authenticated user */
                if (req->rq_auth_uid == INVALID_UID) {
                        CDEBUG(D_SEC, "client %s -> target %s: user is not "
                               "authenticated!\n", client, tgt_name(tgt));
                        RETURN(-EACCES);
                }
        }


        switch (tgt->lut_sec_level) {
        case LUSTRE_SEC_NONE:
                if (remote) {
                        CDEBUG(D_SEC,
                               "client %s -> target %s is set as remote, "
                               "can not run under security level %d.\n",
                               client, tgt_name(tgt), tgt->lut_sec_level);
                        RETURN(-EACCES);
                }
                tgt_init_sec_none(reply);
                break;
        case LUSTRE_SEC_REMOTE:
                if (!remote)
                        tgt_init_sec_none(reply);
                break;
        case LUSTRE_SEC_ALL:
                if (remote)
                        break;
                /* local client under SEC_ALL: clear remote bits, and keep
                 * capability bits only when the target enables them */
                reply->ocd_connect_flags &= ~(OBD_CONNECT_RMT_CLIENT |
                                              OBD_CONNECT_RMT_CLIENT_FORCE);
                if (!tgt->lut_oss_capa)
                        reply->ocd_connect_flags &= ~OBD_CONNECT_OSS_CAPA;
                if (!tgt->lut_mds_capa)
                        reply->ocd_connect_flags &= ~OBD_CONNECT_MDS_CAPA;
                break;
        default:
                RETURN(-EINVAL);
        }

        RETURN(rc);
}
586
/*
 * Check the sptlrpc (RPC security flavor) of a connecting client against
 * the target's rule set and record the negotiated flavor in the export.
 *
 * Returns 0 when the flavor/source is acceptable, -EACCES otherwise
 * (or the result of sptlrpc_target_export_check() on reconnect).
 */
int tgt_connect_check_sptlrpc(struct ptlrpc_request *req, struct obd_export *exp)
{
        struct lu_target        *tgt = class_exp2tgt(exp);
        struct sptlrpc_flavor    flvr;
        int                      rc = 0;

        LASSERT(tgt);
        LASSERT(tgt->lut_obd);
        LASSERT(tgt->lut_slice);

        /* always allow ECHO client */
        if (unlikely(strcmp(exp->exp_obd->obd_type->typ_name,
                            LUSTRE_ECHO_NAME) == 0)) {
                exp->exp_flvr.sf_rpc = SPTLRPC_FLVR_ANY;
                return 0;
        }

        if (exp->exp_flvr.sf_rpc == SPTLRPC_FLVR_INVALID) {
                /* first contact on this export: pick the flavor dictated by
                 * the target's sptlrpc rules for this peer/source */
                read_lock(&tgt->lut_sptlrpc_lock);
                sptlrpc_target_choose_flavor(&tgt->lut_sptlrpc_rset,
                                             req->rq_sp_from,
                                             req->rq_peer.nid,
                                             &flvr);
                read_unlock(&tgt->lut_sptlrpc_lock);

                /* publish peer type and flavor under exp_lock, and verify
                 * the request actually used the chosen flavor */
                spin_lock(&exp->exp_lock);
                exp->exp_sp_peer = req->rq_sp_from;
                exp->exp_flvr = flvr;
                if (exp->exp_flvr.sf_rpc != SPTLRPC_FLVR_ANY &&
                    exp->exp_flvr.sf_rpc != req->rq_flvr.sf_rpc) {
                        CERROR("%s: unauthorized rpc flavor %x from %s, "
                               "expect %x\n", tgt_name(tgt),
                               req->rq_flvr.sf_rpc,
                               libcfs_nid2str(req->rq_peer.nid),
                               exp->exp_flvr.sf_rpc);
                        rc = -EACCES;
                }
                spin_unlock(&exp->exp_lock);
        } else {
                /* reconnect: the peer type must not change */
                if (exp->exp_sp_peer != req->rq_sp_from) {
                        CERROR("%s: RPC source %s doesn't match %s\n",
                               tgt_name(tgt),
                               sptlrpc_part2name(req->rq_sp_from),
                               sptlrpc_part2name(exp->exp_sp_peer));
                        rc = -EACCES;
                } else {
                        rc = sptlrpc_target_export_check(exp, req);
                }
        }

        return rc;
}
639
640 int tgt_connect(struct tgt_session_info *tsi)
641 {
642         struct ptlrpc_request   *req = tgt_ses_req(tsi);
643         struct obd_connect_data *reply;
644         int                      rc;
645
646         ENTRY;
647
648         rc = tgt_init_sec_level(req);
649         if (rc != 0)
650                 GOTO(out, rc);
651
652         /* XXX: better to call this check right after getting new export but
653          * before last_rcvd slot allocation to avoid server load upon insecure
654          * connects. This is to be fixed after unifiyng all targets.
655          */
656         rc = tgt_connect_check_sptlrpc(req, tsi->tsi_exp);
657         if (rc)
658                 GOTO(out, rc);
659
660         /* To avoid exposing partially initialized connection flags, changes up
661          * to this point have been staged in reply->ocd_connect_flags. Now that
662          * connection handling has completed successfully, atomically update
663          * the connect flags in the shared export data structure. LU-1623 */
664         reply = req_capsule_server_get(tsi->tsi_pill, &RMF_CONNECT_DATA);
665         spin_lock(&tsi->tsi_exp->exp_lock);
666         *exp_connect_flags_ptr(tsi->tsi_exp) = reply->ocd_connect_flags;
667         tsi->tsi_exp->exp_connect_data.ocd_brw_size = reply->ocd_brw_size;
668         spin_unlock(&tsi->tsi_exp->exp_lock);
669
670         RETURN(0);
671 out:
672         obd_disconnect(class_export_get(tsi->tsi_exp));
673         return rc;
674 }
675 EXPORT_SYMBOL(tgt_connect);
676
677 int tgt_disconnect(struct tgt_session_info *tsi)
678 {
679         int rc;
680
681         ENTRY;
682
683         rc = target_handle_disconnect(tgt_ses_req(tsi));
684         if (rc)
685                 RETURN(err_serious(rc));
686
687         RETURN(rc);
688 }
689 EXPORT_SYMBOL(tgt_disconnect);
690
691 /*
692  * Unified target OBD handlers
693  */
694 int tgt_obd_ping(struct tgt_session_info *tsi)
695 {
696         int rc;
697
698         ENTRY;
699
700         rc = target_handle_ping(tgt_ses_req(tsi));
701         if (rc)
702                 RETURN(err_serious(rc));
703
704         RETURN(rc);
705 }
706 EXPORT_SYMBOL(tgt_obd_ping);
707
/* OBD_LOG_CANCEL is not supported by the unified target; reject it with a
 * "serious" error so the generic code handles the missing reply. */
int tgt_obd_log_cancel(struct tgt_session_info *tsi)
{
        return err_serious(-EOPNOTSUPP);
}
EXPORT_SYMBOL(tgt_obd_log_cancel);
713
/* OBD_QC_CALLBACK is not supported by the unified target; reject it with a
 * "serious" error so the generic code handles the missing reply. */
int tgt_obd_qc_callback(struct tgt_session_info *tsi)
{
        return err_serious(-EOPNOTSUPP);
}
EXPORT_SYMBOL(tgt_obd_qc_callback);
719
720 int tgt_sendpage(struct tgt_session_info *tsi, struct lu_rdpg *rdpg, int nob)
721 {
722         struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
723         struct ptlrpc_request   *req = tgt_ses_req(tsi);
724         struct obd_export       *exp = req->rq_export;
725         struct ptlrpc_bulk_desc *desc;
726         struct l_wait_info      *lwi = &tti->tti_u.rdpg.tti_wait_info;
727         int                      tmpcount;
728         int                      tmpsize;
729         int                      i;
730         int                      rc;
731
732         ENTRY;
733
734         desc = ptlrpc_prep_bulk_exp(req, rdpg->rp_npages, 1, BULK_PUT_SOURCE,
735                                     MDS_BULK_PORTAL);
736         if (desc == NULL)
737                 RETURN(-ENOMEM);
738
739         if (!(exp_connect_flags(exp) & OBD_CONNECT_BRW_SIZE))
740                 /* old client requires reply size in it's PAGE_CACHE_SIZE,
741                  * which is rdpg->rp_count */
742                 nob = rdpg->rp_count;
743
744         for (i = 0, tmpcount = nob; i < rdpg->rp_npages && tmpcount > 0;
745              i++, tmpcount -= tmpsize) {
746                 tmpsize = min_t(int, tmpcount, PAGE_CACHE_SIZE);
747                 ptlrpc_prep_bulk_page_pin(desc, rdpg->rp_pages[i], 0, tmpsize);
748         }
749
750         LASSERT(desc->bd_nob == nob);
751         rc = target_bulk_io(exp, desc, lwi);
752         ptlrpc_free_bulk_pin(desc);
753         RETURN(rc);
754 }
755 EXPORT_SYMBOL(tgt_sendpage);
756
/*
 * OBD_IDX_READ handler: read key/record pairs from an index object on the
 * bottom device into freshly allocated pages and ship them to the client
 * via bulk (tgt_sendpage()).
 */
int tgt_obd_idx_read(struct tgt_session_info *tsi)
{
        struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
        struct lu_rdpg          *rdpg = &tti->tti_u.rdpg.tti_rdpg;
        struct idx_info         *req_ii, *rep_ii;
        int                      rc, i;

        ENTRY;

        memset(rdpg, 0, sizeof(*rdpg));
        req_capsule_set(tsi->tsi_pill, &RQF_OBD_IDX_READ);

        /* extract idx_info buffer from request & reply */
        req_ii = req_capsule_client_get(tsi->tsi_pill, &RMF_IDX_INFO);
        if (req_ii == NULL || req_ii->ii_magic != IDX_INFO_MAGIC)
                RETURN(err_serious(-EPROTO));

        rc = req_capsule_server_pack(tsi->tsi_pill);
        if (rc)
                RETURN(err_serious(rc));

        rep_ii = req_capsule_server_get(tsi->tsi_pill, &RMF_IDX_INFO);
        if (rep_ii == NULL)
                RETURN(err_serious(-EFAULT));
        rep_ii->ii_magic = IDX_INFO_MAGIC;

        /* extract hash to start with */
        rdpg->rp_hash = req_ii->ii_hash_start;

        /* extract requested attributes */
        rdpg->rp_attrs = req_ii->ii_attrs;

        /* check that fid packed in request is valid and supported */
        if (!fid_is_sane(&req_ii->ii_fid))
                RETURN(-EINVAL);
        rep_ii->ii_fid = req_ii->ii_fid;

        /* copy flags */
        rep_ii->ii_flags = req_ii->ii_flags;

        /* compute number of pages to allocate, ii_count is the number of 4KB
         * containers */
        if (req_ii->ii_count <= 0)
                GOTO(out, rc = -EFAULT);
        /* cap the transfer size at the client's maximum bulk RPC size */
        rdpg->rp_count = min_t(unsigned int, req_ii->ii_count << LU_PAGE_SHIFT,
                               exp_max_brw_size(tsi->tsi_exp));
        rdpg->rp_npages = (rdpg->rp_count + PAGE_CACHE_SIZE -1) >> PAGE_CACHE_SHIFT;

        /* allocate pages to store the containers */
        OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof(rdpg->rp_pages[0]));
        if (rdpg->rp_pages == NULL)
                GOTO(out, rc = -ENOMEM);
        for (i = 0; i < rdpg->rp_npages; i++) {
                rdpg->rp_pages[i] = alloc_page(GFP_IOFS);
                if (rdpg->rp_pages[i] == NULL)
                        GOTO(out, rc = -ENOMEM);
        }

        /* populate pages with key/record pairs; rc is the number of bytes
         * read on success */
        rc = dt_index_read(tsi->tsi_env, tsi->tsi_tgt->lut_bottom, rep_ii, rdpg);
        if (rc < 0)
                GOTO(out, rc);

        LASSERTF(rc <= rdpg->rp_count, "dt_index_read() returned more than "
                 "asked %d > %d\n", rc, rdpg->rp_count);

        /* send pages to client */
        rc = tgt_sendpage(tsi, rdpg, rc);
        if (rc)
                GOTO(out, rc);
        EXIT;
out:
        /* free any pages allocated above, on both success and error paths */
        if (rdpg->rp_pages) {
                for (i = 0; i < rdpg->rp_npages; i++)
                        if (rdpg->rp_pages[i])
                                __free_page(rdpg->rp_pages[i]);
                OBD_FREE(rdpg->rp_pages,
                         rdpg->rp_npages * sizeof(rdpg->rp_pages[0]));
        }
        return rc;
}
EXPORT_SYMBOL(tgt_obd_idx_read);
841 EXPORT_SYMBOL(tgt_obd_idx_read);
842
/* Generic OBD-level request handlers shared by all unified targets */
struct tgt_handler tgt_obd_handlers[] = {
TGT_OBD_HDL    (0,      OBD_PING,               tgt_obd_ping),
TGT_OBD_HDL_VAR(0,      OBD_LOG_CANCEL,         tgt_obd_log_cancel),
TGT_OBD_HDL_VAR(0,      OBD_QC_CALLBACK,        tgt_obd_qc_callback),
TGT_OBD_HDL    (0,      OBD_IDX_READ,           tgt_obd_idx_read)
};
EXPORT_SYMBOL(tgt_obd_handlers);
850
/*
 * Unified target DLM handlers.
 */

/* Server-side LDLM AST callback suite passed to ldlm_handle_enqueue0() for
 * every lock enqueued through tgt_enqueue() */
struct ldlm_callback_suite tgt_dlm_cbs = {
        .lcs_completion = ldlm_server_completion_ast,
        .lcs_blocking   = ldlm_server_blocking_ast,
        .lcs_glimpse    = ldlm_server_glimpse_ast
};
859
860 int tgt_enqueue(struct tgt_session_info *tsi)
861 {
862         struct ptlrpc_request *req = tgt_ses_req(tsi);
863         int rc;
864
865         ENTRY;
866         /*
867          * tsi->tsi_dlm_req was already swapped and (if necessary) converted,
868          * tsi->tsi_dlm_cbs was set by the *_req_handle() function.
869          */
870         LASSERT(tsi->tsi_dlm_req != NULL);
871         rc = ldlm_handle_enqueue0(tsi->tsi_exp->exp_obd->obd_namespace, req,
872                                   tsi->tsi_dlm_req, &tgt_dlm_cbs);
873         if (rc)
874                 RETURN(err_serious(rc));
875
876         RETURN(req->rq_status);
877 }
878 EXPORT_SYMBOL(tgt_enqueue);
879
880 int tgt_convert(struct tgt_session_info *tsi)
881 {
882         struct ptlrpc_request *req = tgt_ses_req(tsi);
883         int rc;
884
885         ENTRY;
886         LASSERT(tsi->tsi_dlm_req);
887         rc = ldlm_handle_convert0(req, tsi->tsi_dlm_req);
888         if (rc)
889                 RETURN(err_serious(rc));
890
891         RETURN(req->rq_status);
892 }
893 EXPORT_SYMBOL(tgt_convert);
894
/* LDLM_BL_CALLBACK handler: unconditionally rejected with -EOPNOTSUPP.
 * NOTE(review): blocking ASTs are presumably expected only on the client
 * side, making this RPC invalid for a server target — confirm against the
 * LDLM protocol before relying on this comment. */
int tgt_bl_callback(struct tgt_session_info *tsi)
{
        return err_serious(-EOPNOTSUPP);
}
EXPORT_SYMBOL(tgt_bl_callback);
900
/* LDLM_CP_CALLBACK handler: unconditionally rejected with -EOPNOTSUPP.
 * NOTE(review): completion ASTs are presumably expected only on the client
 * side, making this RPC invalid for a server target — confirm against the
 * LDLM protocol before relying on this comment. */
int tgt_cp_callback(struct tgt_session_info *tsi)
{
        return err_serious(-EOPNOTSUPP);
}
EXPORT_SYMBOL(tgt_cp_callback);
906
/* generic LDLM target handler */
/* HABEO_CLAVIS entries require a valid lock request (tsi_dlm_req) to have
 * been set up before dispatch */
struct tgt_handler tgt_dlm_handlers[] = {
TGT_DLM_HDL    (HABEO_CLAVIS,   LDLM_ENQUEUE,           tgt_enqueue),
TGT_DLM_HDL_VAR(HABEO_CLAVIS,   LDLM_CONVERT,           tgt_convert),
TGT_DLM_HDL_VAR(0,              LDLM_BL_CALLBACK,       tgt_bl_callback),
TGT_DLM_HDL_VAR(0,              LDLM_CP_CALLBACK,       tgt_cp_callback)
};
EXPORT_SYMBOL(tgt_dlm_handlers);
915
916 /*
917  * Unified target LLOG handlers.
918  */
919 int tgt_llog_open(struct tgt_session_info *tsi)
920 {
921         int rc;
922
923         ENTRY;
924
925         rc = llog_origin_handle_open(tgt_ses_req(tsi));
926
927         RETURN(rc);
928 }
929 EXPORT_SYMBOL(tgt_llog_open);
930
931 int tgt_llog_close(struct tgt_session_info *tsi)
932 {
933         int rc;
934
935         ENTRY;
936
937         rc = llog_origin_handle_close(tgt_ses_req(tsi));
938
939         RETURN(rc);
940 }
941 EXPORT_SYMBOL(tgt_llog_close);
942
943
944 int tgt_llog_destroy(struct tgt_session_info *tsi)
945 {
946         int rc;
947
948         ENTRY;
949
950         rc = llog_origin_handle_destroy(tgt_ses_req(tsi));
951
952         RETURN(rc);
953 }
954 EXPORT_SYMBOL(tgt_llog_destroy);
955
956 int tgt_llog_read_header(struct tgt_session_info *tsi)
957 {
958         int rc;
959
960         ENTRY;
961
962         rc = llog_origin_handle_read_header(tgt_ses_req(tsi));
963
964         RETURN(rc);
965 }
966 EXPORT_SYMBOL(tgt_llog_read_header);
967
968 int tgt_llog_next_block(struct tgt_session_info *tsi)
969 {
970         int rc;
971
972         ENTRY;
973
974         rc = llog_origin_handle_next_block(tgt_ses_req(tsi));
975
976         RETURN(rc);
977 }
978 EXPORT_SYMBOL(tgt_llog_next_block);
979
980 int tgt_llog_prev_block(struct tgt_session_info *tsi)
981 {
982         int rc;
983
984         ENTRY;
985
986         rc = llog_origin_handle_prev_block(tgt_ses_req(tsi));
987
988         RETURN(rc);
989 }
990 EXPORT_SYMBOL(tgt_llog_prev_block);
991
/* generic llog target handler */
/* note: LLOG_ORIGIN_HANDLE_CREATE is dispatched to tgt_llog_open() —
 * presumably the CREATE opcode is (re)used for open; verify against the
 * llog protocol definitions */
struct tgt_handler tgt_llog_handlers[] = {
TGT_LLOG_HDL    (0,     LLOG_ORIGIN_HANDLE_CREATE,      tgt_llog_open),
TGT_LLOG_HDL    (0,     LLOG_ORIGIN_HANDLE_NEXT_BLOCK,  tgt_llog_next_block),
TGT_LLOG_HDL    (0,     LLOG_ORIGIN_HANDLE_READ_HEADER, tgt_llog_read_header),
TGT_LLOG_HDL    (0,     LLOG_ORIGIN_HANDLE_PREV_BLOCK,  tgt_llog_prev_block),
TGT_LLOG_HDL    (0,     LLOG_ORIGIN_HANDLE_DESTROY,     tgt_llog_destroy),
TGT_LLOG_HDL_VAR(0,     LLOG_ORIGIN_HANDLE_CLOSE,       tgt_llog_close),
};
EXPORT_SYMBOL(tgt_llog_handlers);
1002
/*
 * sec context handlers
 */
/* XXX: Implement based on mdt_sec_ctx_handle()? */
/* Stub security-context handler: currently accepts every SEC_CTX_* request
 * unconditionally (always returns 0); real processing is not implemented */
int tgt_sec_ctx_handle(struct tgt_session_info *tsi)
{
        return 0;
}
1011
/* security-context request table: all three opcodes share the stub handler */
struct tgt_handler tgt_sec_ctx_handlers[] = {
TGT_SEC_HDL_VAR(0,      SEC_CTX_INIT,           tgt_sec_ctx_handle),
TGT_SEC_HDL_VAR(0,      SEC_CTX_INIT_CONT,      tgt_sec_ctx_handle),
TGT_SEC_HDL_VAR(0,      SEC_CTX_FINI,           tgt_sec_ctx_handle),
};
EXPORT_SYMBOL(tgt_sec_ctx_handlers);