4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; if not, write to the
18 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19 * Boston, MA 02111-1307, USA
24 * Copyright (c) 2011, 2012, Intel Corporation.
27 * lustre/target/tgt_handler.c
29 * Lustre Unified Target request handler code
31 * Author: Brian Behlendorf <behlendorf1@llnl.gov>
32 * Author: Mikhail Pershin <mike.pershin@intel.com>
35 #define DEBUG_SUBSYSTEM S_CLASS
38 #include <obd_class.h>
40 #include "tgt_internal.h"
/* Return the name of the target's backing OBD device; used as a prefix
 * in console/log messages throughout this file.  Asserts that the target
 * has a valid OBD attached. */
42 char *tgt_name(struct lu_target *tgt)
44 LASSERT(tgt->lut_obd != NULL);
45 return tgt->lut_obd->obd_name;
47 EXPORT_SYMBOL(tgt_name);
50 * Generic code handling requests that have struct mdt_body passed in:
52 * - extract mdt_body from request and save it in @tsi, if present;
54 * - create lu_object, corresponding to the fid in mdt_body, and save it in
57 * - if HABEO_CORPUS flag is set for this request type check whether object
58 * actually exists on storage (lu_object_exists()).
61 static int tgt_mdt_body_unpack(struct tgt_session_info *tsi, __u32 flags)
63 const struct mdt_body *body;
64 struct lu_object *obj;
65 struct req_capsule *pill = tsi->tsi_pill;
/* Pull the client-supplied mdt_body from the request capsule and cache a
 * pointer to it in the session info for later use by handlers. */
70 body = req_capsule_client_get(pill, &RMF_MDT_BODY);
74 tsi->tsi_mdt_body = body;
/* Nothing more to do unless the client actually packed a valid FID. */
76 if (!(body->valid & OBD_MD_FLID))
79 /* mdc_pack_body() doesn't check if fid is zero and set OBD_MD_FLID
80 * in any case in pre-2.5 clients. Fix that here if needed */
81 if (unlikely(fid_is_zero(&body->fid1)))
84 if (!fid_is_sane(&body->fid1)) {
85 CERROR("%s: invalid FID: "DFID"\n", tgt_name(tsi->tsi_tgt),
/* Resolve the FID to an lu_object on the bottom device of the target's
 * device stack. */
90 obj = lu_object_find(tsi->tsi_env,
91 &tsi->tsi_tgt->lut_bottom->dd_lu_dev,
94 if ((flags & HABEO_CORPUS) && !lu_object_exists(obj)) {
95 lu_object_put(tsi->tsi_env, obj);
96 /* for capability renew ENOENT will be handled in
98 if (body->valid & OBD_MD_FLOSSCAPA)
/* Cache the resolved object; it is released in tgt_request_handle(). */
103 tsi->tsi_corpus = obj;
/* Unpack the incoming request (mdt_body, if the format has one) and
 * pre-pack the reply buffers, sized according to the handler's flags. */
112 static int tgt_unpack_req_pack_rep(struct tgt_session_info *tsi, __u32 flags)
114 struct req_capsule *pill = tsi->tsi_pill;
119 if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_CLIENT)) {
120 rc = tgt_mdt_body_unpack(tsi, flags);
/* HABEO_REFERO: the reply carries data back to the client, so size the
 * variable-length server buffers (EA data, llog cookies) before packing.
 * NOTE(review): eadatasize comes straight from the client-supplied body;
 * presumably bounded elsewhere -- TODO confirm. */
125 if (flags & HABEO_REFERO) {
127 if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
128 req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
129 tsi->tsi_mdt_body->eadatasize);
130 if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
131 req_capsule_set_size(pill, &RMF_LOGCOOKIES,
134 rc = req_capsule_server_pack(pill);
140 * Invoke handler for this request opc. Also do necessary preprocessing
141 * (according to handler ->th_flags), and post-processing (setting of
142 * ->last_{xid,committed}).
144 static int tgt_handle_request0(struct tgt_session_info *tsi,
145 struct tgt_handler *h,
146 struct ptlrpc_request *req)
154 LASSERT(h->th_act != NULL);
155 LASSERT(h->th_opc == lustre_msg_get_opc(req->rq_reqmsg));
156 LASSERT(current->journal_info == NULL);
159 * Checking for various OBD_FAIL_$PREF_$OPC_NET codes. _Do_ not try
160 * to put same checks into handlers like mdt_close(), mdt_reint(),
161 * etc., without talking to mdt authors first. Checking same thing
162 * there again is useless and returning 0 error without packing reply
163 * is buggy! Handlers either pack reply or return error.
165 * We return 0 here and do not send any reply in order to emulate
166 * network failure. Do not send any reply in case any of NET related
167 * fail_id has occurred.
169 if (OBD_FAIL_CHECK_ORSET(h->th_fail_id, OBD_FAIL_ONCE))
174 LASSERT(ergo(flags & (HABEO_CORPUS | HABEO_REFERO),
/* If the handler declares a request format, install it on the pill and
 * unpack the request / pre-pack the reply according to the flags. */
176 if (h->th_fmt != NULL) {
177 req_capsule_set(tsi->tsi_pill, h->th_fmt);
178 rc = tgt_unpack_req_pack_rep(tsi, flags);
/* MUTABOR handlers modify the filesystem; refuse them on a read-only
 * connection. */
181 if (rc == 0 && flags & MUTABOR &&
182 tgt_conn_flags(tsi) & OBD_CONNECT_RDONLY)
/* HABEO_CLAVIS: the request carries a DLM lock request; extract and
 * sanity-check it before the handler runs. */
185 if (rc == 0 && flags & HABEO_CLAVIS) {
186 struct ldlm_request *dlm_req;
188 LASSERT(h->th_fmt != NULL);
190 dlm_req = req_capsule_client_get(tsi->tsi_pill, &RMF_DLM_REQ);
191 if (dlm_req != NULL) {
192 if (unlikely(dlm_req->lock_desc.l_resource.lr_type ==
194 dlm_req->lock_desc.l_policy_data.\
195 l_inodebits.bits == 0)) {
197 * Lock without inodebits makes no sense and
198 * will oops later in ldlm. If the client missed
199 * setting such bits, do not trigger ASSERTION.
201 * For liblustre flock case, it maybe zero.
205 tsi->tsi_dlm_req = dlm_req;
212 if (likely(rc == 0)) {
214 * Process request, there can be two types of rc:
215 * 1) errors with msg unpack/pack, other failures outside the
216 * operation itself. This is counted as serious errors;
217 * 2) errors during fs operation, should be placed in rq_status
/* A handler returning 0 without a serious error must have packed a
 * reply; log loudly if it did not. */
221 if (!is_serious(rc) &&
222 !req->rq_no_reply && req->rq_reply_state == NULL) {
223 DEBUG_REQ(D_ERROR, req, "%s \"handler\" %s did not "
224 "pack reply and returned 0 error\n",
225 tgt_name(tsi->tsi_tgt), h->th_name);
228 serious = is_serious(rc);
229 rc = clear_serious(rc);
237 * ELDLM_* codes which > 0 should be in rq_status only as well as
238 * all non-serious errors.
240 if (rc > 0 || !serious)
243 LASSERT(current->journal_info == NULL);
246 * If we're DISCONNECTing, the export_data is already freed
248 * WAS if (likely(... && h->mh_opc != MDS_DISCONNECT))
/* On success, record the last committed transno in the reply so the
 * client can discard saved requests. */
250 if (likely(rc == 0 && req->rq_export))
251 target_committed_to_req(req);
253 target_send_reply(req, rc, tsi->tsi_reply_fail_id);
/* Decide whether @req may run while @obd is still in recovery.  A small
 * set of opcodes (visible here: MDS_DONE_WRITING, MDS_SYNC) are handled
 * specially; replayable requests are queued via
 * target_queue_recovery_request() with the verdict stored in *process,
 * anything else is rejected as "not permitted during recovery". */
257 static int tgt_filter_recovery_request(struct ptlrpc_request *req,
258 struct obd_device *obd, int *process)
260 switch (lustre_msg_get_opc(req->rq_reqmsg)) {
267 case MDS_DONE_WRITING:
268 case MDS_SYNC: /* used in unmounting */
275 *process = target_queue_recovery_request(req, obd);
279 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
286 * Handle recovery. Return:
287 * +1: continue request processing;
288 * -ve: abort immediately with the given error code;
289 * 0: send reply with error code in req->rq_status;
291 int tgt_handle_recovery(struct ptlrpc_request *req, int reply_fail_id)
/* Security-context setup RPCs are exempt from recovery filtering. */
295 switch (lustre_msg_get_opc(req->rq_reqmsg)) {
300 case SEC_CTX_INIT_CONT:
/* Non-replayable targets have no recovery state to protect. */
305 if (!req->rq_export->exp_obd->obd_replayable)
308 /* sanity check: if the xid matches, the request must be marked as a
309 * resent or replayed */
310 if (req_xid_is_last(req)) {
311 if (!(lustre_msg_get_flags(req->rq_reqmsg) &
312 (MSG_RESENT | MSG_REPLAY))) {
313 DEBUG_REQ(D_WARNING, req, "rq_xid "LPU64" matches "
314 "last_xid, expected REPLAY or RESENT flag "
316 lustre_msg_get_flags(req->rq_reqmsg));
317 req->rq_status = -ENOTCONN;
321 /* else: note the opposite is not always true; a RESENT req after a
322 * failover will usually not match the last_xid, since it was likely
323 * never committed. A REPLAYed request will almost never match the
324 * last xid, however it could for a committed, but still retained,
327 /* Check for aborted recovery... */
328 if (unlikely(req->rq_export->exp_obd->obd_recovering)) {
332 DEBUG_REQ(D_INFO, req, "Got new replay");
333 rc = tgt_filter_recovery_request(req, req->rq_export->exp_obd,
335 if (rc != 0 || !should_process)
337 else if (should_process < 0) {
/* A negative verdict from the filter becomes the RPC status. */
338 req->rq_status = should_process;
339 rc = ptlrpc_error(req);
/* Main entry point for unified target request processing: set up the
 * per-thread session info, dispatch CONNECT requests directly, locate the
 * handler for the opcode in the target's handler slices, run recovery
 * checks, then invoke tgt_handle_request0().  Cleans up session state
 * (pill, corpus object, cached body/dlm_req) before returning. */
346 int tgt_request_handle(struct ptlrpc_request *req)
348 struct tgt_session_info *tsi = tgt_ses_info(req->rq_svc_thread->t_env);
350 struct lustre_msg *msg = req->rq_reqmsg;
351 struct tgt_handler *h;
352 struct tgt_opc_slice *s;
353 struct lu_target *tgt;
354 int request_fail_id = 0;
355 __u32 opc = lustre_msg_get_opc(msg);
360 /* Refill the context, to make sure all thread keys are allocated */
361 lu_env_refill(req->rq_svc_thread->t_env);
363 req_capsule_init(&req->rq_pill, req, RCL_SERVER);
364 tsi->tsi_pill = &req->rq_pill;
365 tsi->tsi_env = req->rq_svc_thread->t_env;
367 /* if request has export then get handlers slice from corresponding
368 * target, otherwise that should be connect operation */
369 if (opc == MDS_CONNECT || opc == OST_CONNECT ||
370 opc == MGS_CONNECT) {
371 req_capsule_set(&req->rq_pill, &RQF_CONNECT);
372 rc = target_handle_connect(req);
374 rc = ptlrpc_error(req);
/* Any non-CONNECT request must arrive on an established export. */
379 if (unlikely(!class_connected_export(req->rq_export))) {
380 CDEBUG(D_HA, "operation %d on unconnected OST from %s\n",
381 opc, libcfs_id2str(req->rq_peer));
382 req->rq_status = -ENOTCONN;
383 rc = ptlrpc_error(req);
387 tsi->tsi_tgt = tgt = class_exp2tgt(req->rq_export);
388 tsi->tsi_exp = req->rq_export;
390 request_fail_id = tgt->lut_request_fail_id;
391 tsi->tsi_reply_fail_id = tgt->lut_reply_fail_id;
/* Find the slice whose [tos_opc_start, tos_opc_end) range covers opc. */
393 for (s = tgt->lut_slice; s->tos_hs != NULL; s++)
394 if (s->tos_opc_start <= opc && opc < s->tos_opc_end)
397 /* opcode was not found in slice */
398 if (unlikely(s->tos_hs == NULL)) {
399 CERROR("%s: no handlers for opcode 0x%x\n", tgt_name(tgt), opc);
400 req->rq_status = -ENOTSUPP;
401 rc = ptlrpc_error(req);
405 if (CFS_FAIL_CHECK_ORSET(request_fail_id, CFS_FAIL_ONCE))
408 LASSERT(current->journal_info == NULL);
410 LASSERT(opc >= s->tos_opc_start && opc < s->tos_opc_end);
411 h = s->tos_hs + (opc - s->tos_opc_start);
412 if (unlikely(h->th_opc == 0)) {
413 CERROR("%s: unsupported opcode 0x%x\n", tgt_name(tgt), opc);
414 req->rq_status = -ENOTSUPP;
415 rc = ptlrpc_error(req);
/* Reject requests built with an incompatible protocol version. */
419 rc = lustre_msg_check_version(msg, h->th_version);
421 DEBUG_REQ(D_ERROR, req, "%s: drop mal-formed request, version"
422 " %08x, expecting %08x\n", tgt_name(tgt),
423 lustre_msg_get_version(msg), h->th_version);
424 req->rq_status = -EINVAL;
425 rc = ptlrpc_error(req);
429 rc = tgt_handle_recovery(req, tsi->tsi_reply_fail_id);
430 if (likely(rc == 1)) {
431 LASSERTF(h->th_opc == opc, "opcode mismatch %d != %d\n",
433 rc = tgt_handle_request0(tsi, h, req);
/* Tear down per-request session state; the corpus object reference
 * taken in tgt_mdt_body_unpack() is dropped here. */
439 req_capsule_fini(tsi->tsi_pill);
440 tsi->tsi_pill = NULL;
441 if (tsi->tsi_corpus != NULL) {
442 lu_object_put(tsi->tsi_env, tsi->tsi_corpus);
443 tsi->tsi_corpus = NULL;
446 tsi->tsi_mdt_body = NULL;
447 tsi->tsi_dlm_req = NULL;
450 EXPORT_SYMBOL(tgt_request_handle);
/* Bump the per-OBD stats counter for @opcode, and the per-NID stats
 * counter too when NID statistics are enabled for this export. */
452 void tgt_counter_incr(struct obd_export *exp, int opcode)
454 lprocfs_counter_incr(exp->exp_obd->obd_stats, opcode);
455 if (exp->exp_nid_stats && exp->exp_nid_stats->nid_stats != NULL)
456 lprocfs_counter_incr(exp->exp_nid_stats->nid_stats, opcode);
458 EXPORT_SYMBOL(tgt_counter_incr);
461 * Unified target generic handlers.
/* Strip remote-client and capability flags from the connect reply: the
 * connection will run with no extra security features. */
467 static inline void tgt_init_sec_none(struct obd_connect_data *reply)
469 reply->ocd_connect_flags &= ~(OBD_CONNECT_RMT_CLIENT |
470 OBD_CONNECT_RMT_CLIENT_FORCE |
471 OBD_CONNECT_MDS_CAPA |
472 OBD_CONNECT_OSS_CAPA);
/* Negotiate the security level for an incoming connect request: decide
 * whether the client is local or remote, whether GSS is in use, and mask
 * the reply's connect flags (remote-client / MDS / OSS capabilities)
 * accordingly, based on the target's configured lut_sec_level. */
475 static int tgt_init_sec_level(struct ptlrpc_request *req)
477 struct lu_target *tgt = class_exp2tgt(req->rq_export);
478 char *client = libcfs_nid2str(req->rq_peer.nid);
479 struct obd_connect_data *data, *reply;
485 data = req_capsule_client_get(&req->rq_pill, &RMF_CONNECT_DATA);
486 reply = req_capsule_server_get(&req->rq_pill, &RMF_CONNECT_DATA);
487 if (data == NULL || reply == NULL)
490 /* connection from MDT is always trusted */
491 if (req->rq_auth_usr_mdt) {
492 tgt_init_sec_none(reply);
496 /* no GSS support case */
497 if (!req->rq_auth_gss) {
498 if (tgt->lut_sec_level > LUSTRE_SEC_NONE) {
499 CWARN("client %s -> target %s does not use GSS, "
500 "can not run under security level %d.\n",
501 client, tgt_name(tgt), tgt->lut_sec_level);
504 tgt_init_sec_none(reply);
509 /* old version case */
510 if (unlikely(!(data->ocd_connect_flags & OBD_CONNECT_RMT_CLIENT) ||
511 !(data->ocd_connect_flags & OBD_CONNECT_MDS_CAPA) ||
512 !(data->ocd_connect_flags & OBD_CONNECT_OSS_CAPA))) {
513 if (tgt->lut_sec_level > LUSTRE_SEC_NONE) {
514 CWARN("client %s -> target %s uses old version, "
515 "can not run under security level %d.\n",
516 client, tgt_name(tgt), tgt->lut_sec_level);
519 CWARN("client %s -> target %s uses old version, "
520 "run under security level %d.\n",
521 client, tgt_name(tgt), tgt->lut_sec_level);
522 tgt_init_sec_none(reply);
/* Remote-ness can be forced by the client; warn when it disagrees with
 * what GSS authentication says about the client's realm. */
527 remote = data->ocd_connect_flags & OBD_CONNECT_RMT_CLIENT_FORCE;
529 if (!req->rq_auth_remote)
530 CDEBUG(D_SEC, "client (local realm) %s -> target %s "
531 "asked to be remote.\n", client, tgt_name(tgt));
532 } else if (req->rq_auth_remote) {
534 CDEBUG(D_SEC, "client (remote realm) %s -> target %s is set "
535 "as remote by default.\n", client, tgt_name(tgt));
/* Remote clients require OSS capabilities and an authenticated user. */
539 if (!tgt->lut_oss_capa) {
541 "client %s -> target %s is set as remote,"
542 " but OSS capabilities are not enabled: %d.\n",
543 client, tgt_name(tgt), tgt->lut_oss_capa);
547 if (req->rq_auth_uid == INVALID_UID) {
548 CDEBUG(D_SEC, "client %s -> target %s: user is not "
549 "authenticated!\n", client, tgt_name(tgt));
555 switch (tgt->lut_sec_level) {
556 case LUSTRE_SEC_NONE:
559 "client %s -> target %s is set as remote, "
560 "can not run under security level %d.\n",
561 client, tgt_name(tgt), tgt->lut_sec_level);
564 tgt_init_sec_none(reply);
566 case LUSTRE_SEC_REMOTE:
568 tgt_init_sec_none(reply);
/* Local client: drop remote flags, and capability flags the target has
 * not enabled. */
573 reply->ocd_connect_flags &= ~(OBD_CONNECT_RMT_CLIENT |
574 OBD_CONNECT_RMT_CLIENT_FORCE);
575 if (!tgt->lut_oss_capa)
576 reply->ocd_connect_flags &= ~OBD_CONNECT_OSS_CAPA;
577 if (!tgt->lut_mds_capa)
578 reply->ocd_connect_flags &= ~OBD_CONNECT_MDS_CAPA;
/* Validate the RPC security flavor of a connecting client against the
 * target's sptlrpc rules.  ECHO clients are always allowed; otherwise the
 * export's flavor is chosen on first connect (under lut_sptlrpc_lock) and
 * subsequent requests must match both flavor and source part. */
587 int tgt_connect_check_sptlrpc(struct ptlrpc_request *req, struct obd_export *exp)
589 struct lu_target *tgt = class_exp2tgt(exp);
590 struct sptlrpc_flavor flvr;
594 LASSERT(tgt->lut_obd);
595 LASSERT(tgt->lut_slice);
597 /* always allow ECHO client */
598 if (unlikely(strcmp(exp->exp_obd->obd_type->typ_name,
599 LUSTRE_ECHO_NAME) == 0)) {
600 exp->exp_flvr.sf_rpc = SPTLRPC_FLVR_ANY;
/* First connect on this export: pick a flavor from the rule set. */
604 if (exp->exp_flvr.sf_rpc == SPTLRPC_FLVR_INVALID) {
605 read_lock(&tgt->lut_sptlrpc_lock);
606 sptlrpc_target_choose_flavor(&tgt->lut_sptlrpc_rset,
610 read_unlock(&tgt->lut_sptlrpc_lock);
612 spin_lock(&exp->exp_lock);
613 exp->exp_sp_peer = req->rq_sp_from;
614 exp->exp_flvr = flvr;
615 if (exp->exp_flvr.sf_rpc != SPTLRPC_FLVR_ANY &&
616 exp->exp_flvr.sf_rpc != req->rq_flvr.sf_rpc) {
617 CERROR("%s: unauthorized rpc flavor %x from %s, "
618 "expect %x\n", tgt_name(tgt),
620 libcfs_nid2str(req->rq_peer.nid),
621 exp->exp_flvr.sf_rpc);
624 spin_unlock(&exp->exp_lock);
/* Reconnect path: the peer part must not change across requests. */
626 if (exp->exp_sp_peer != req->rq_sp_from) {
627 CERROR("%s: RPC source %s doesn't match %s\n",
629 sptlrpc_part2name(req->rq_sp_from),
630 sptlrpc_part2name(exp->exp_sp_peer));
633 rc = sptlrpc_target_export_check(exp, req);
/* Generic CONNECT handler: negotiate the security level and sptlrpc
 * flavor, then publish the negotiated connect flags into the export.
 * On failure the export is disconnected (see the obd_disconnect call). */
640 int tgt_connect(struct tgt_session_info *tsi)
642 struct ptlrpc_request *req = tgt_ses_req(tsi);
643 struct obd_connect_data *reply;
648 rc = tgt_init_sec_level(req);
652 /* XXX: better to call this check right after getting new export but
653 * before last_rcvd slot allocation to avoid server load upon insecure
654 * connects. This is to be fixed after unifiyng all targets.
656 rc = tgt_connect_check_sptlrpc(req, tsi->tsi_exp);
660 /* To avoid exposing partially initialized connection flags, changes up
661 * to this point have been staged in reply->ocd_connect_flags. Now that
662 * connection handling has completed successfully, atomically update
663 * the connect flags in the shared export data structure. LU-1623 */
664 reply = req_capsule_server_get(tsi->tsi_pill, &RMF_CONNECT_DATA);
665 spin_lock(&tsi->tsi_exp->exp_lock);
666 *exp_connect_flags_ptr(tsi->tsi_exp) = reply->ocd_connect_flags;
667 tsi->tsi_exp->exp_connect_data.ocd_brw_size = reply->ocd_brw_size;
668 spin_unlock(&tsi->tsi_exp->exp_lock);
/* Error path: drop the export we just created. */
672 obd_disconnect(class_export_get(tsi->tsi_exp));
675 EXPORT_SYMBOL(tgt_connect);
/* Generic DISCONNECT handler; delegates to target_handle_disconnect()
 * and promotes any failure to a "serious" error (no reply packed). */
677 int tgt_disconnect(struct tgt_session_info *tsi)
683 rc = target_handle_disconnect(tgt_ses_req(tsi));
685 RETURN(err_serious(rc));
689 EXPORT_SYMBOL(tgt_disconnect);
692 * Unified target OBD handlers
/* OBD_PING handler; delegates to target_handle_ping() and promotes any
 * failure to a "serious" error. */
694 int tgt_obd_ping(struct tgt_session_info *tsi)
700 rc = target_handle_ping(tgt_ses_req(tsi));
702 RETURN(err_serious(rc));
706 EXPORT_SYMBOL(tgt_obd_ping);
/* OBD_LOG_CANCEL is not supported by the unified target. */
708 int tgt_obd_log_cancel(struct tgt_session_info *tsi)
710 return err_serious(-EOPNOTSUPP);
712 EXPORT_SYMBOL(tgt_obd_log_cancel);
/* OBD_QC_CALLBACK (quota check callback) is not supported by the unified
 * target. */
714 int tgt_obd_qc_callback(struct tgt_session_info *tsi)
716 return err_serious(-EOPNOTSUPP);
718 EXPORT_SYMBOL(tgt_obd_qc_callback);
/* Transfer @nob bytes from the pages in @rdpg to the client via a bulk
 * PUT.  For old clients without OBD_CONNECT_BRW_SIZE the transfer size
 * falls back to rdpg->rp_count.  The bulk descriptor is freed here. */
720 int tgt_sendpage(struct tgt_session_info *tsi, struct lu_rdpg *rdpg, int nob)
722 struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env);
723 struct ptlrpc_request *req = tgt_ses_req(tsi);
724 struct obd_export *exp = req->rq_export;
725 struct ptlrpc_bulk_desc *desc;
726 struct l_wait_info *lwi = &tti->tti_u.rdpg.tti_wait_info;
734 desc = ptlrpc_prep_bulk_exp(req, rdpg->rp_npages, 1, BULK_PUT_SOURCE,
739 if (!(exp_connect_flags(exp) & OBD_CONNECT_BRW_SIZE))
740 /* old client requires reply size in it's PAGE_CACHE_SIZE,
741 * which is rdpg->rp_count */
742 nob = rdpg->rp_count;
/* Queue each page, clamping the final page to the remaining byte count. */
744 for (i = 0, tmpcount = nob; i < rdpg->rp_npages && tmpcount > 0;
745 i++, tmpcount -= tmpsize) {
746 tmpsize = min_t(int, tmpcount, PAGE_CACHE_SIZE);
747 ptlrpc_prep_bulk_page_pin(desc, rdpg->rp_pages[i], 0, tmpsize);
750 LASSERT(desc->bd_nob == nob);
751 rc = target_bulk_io(exp, desc, lwi);
752 ptlrpc_free_bulk_pin(desc);
755 EXPORT_SYMBOL(tgt_sendpage);
758 * OBD_IDX_READ handler
760 int tgt_obd_idx_read(struct tgt_session_info *tsi)
762 struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env);
763 struct lu_rdpg *rdpg = &tti->tti_u.rdpg.tti_rdpg;
764 struct idx_info *req_ii, *rep_ii;
769 memset(rdpg, 0, sizeof(*rdpg));
770 req_capsule_set(tsi->tsi_pill, &RQF_OBD_IDX_READ);
772 /* extract idx_info buffer from request & reply */
773 req_ii = req_capsule_client_get(tsi->tsi_pill, &RMF_IDX_INFO);
774 if (req_ii == NULL || req_ii->ii_magic != IDX_INFO_MAGIC)
775 RETURN(err_serious(-EPROTO));
777 rc = req_capsule_server_pack(tsi->tsi_pill);
779 RETURN(err_serious(rc));
781 rep_ii = req_capsule_server_get(tsi->tsi_pill, &RMF_IDX_INFO);
783 RETURN(err_serious(-EFAULT));
784 rep_ii->ii_magic = IDX_INFO_MAGIC;
786 /* extract hash to start with */
787 rdpg->rp_hash = req_ii->ii_hash_start;
789 /* extract requested attributes */
790 rdpg->rp_attrs = req_ii->ii_attrs;
792 /* check that fid packed in request is valid and supported */
793 if (!fid_is_sane(&req_ii->ii_fid))
795 rep_ii->ii_fid = req_ii->ii_fid;
798 rep_ii->ii_flags = req_ii->ii_flags;
800 /* compute number of pages to allocate, ii_count is the number of 4KB
/* Clamp the transfer to the client's maximum bulk I/O size. */
802 if (req_ii->ii_count <= 0)
803 GOTO(out, rc = -EFAULT);
804 rdpg->rp_count = min_t(unsigned int, req_ii->ii_count << LU_PAGE_SHIFT,
805 exp_max_brw_size(tsi->tsi_exp));
806 rdpg->rp_npages = (rdpg->rp_count + PAGE_CACHE_SIZE -1) >> PAGE_CACHE_SHIFT;
808 /* allocate pages to store the containers */
809 OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof(rdpg->rp_pages[0]));
810 if (rdpg->rp_pages == NULL)
811 GOTO(out, rc = -ENOMEM);
812 for (i = 0; i < rdpg->rp_npages; i++) {
813 rdpg->rp_pages[i] = alloc_page(GFP_IOFS);
814 if (rdpg->rp_pages[i] == NULL)
815 GOTO(out, rc = -ENOMEM);
818 /* populate pages with key/record pairs */
819 rc = dt_index_read(tsi->tsi_env, tsi->tsi_tgt->lut_bottom, rep_ii, rdpg);
823 LASSERTF(rc <= rdpg->rp_count, "dt_index_read() returned more than "
824 "asked %d > %d\n", rc, rdpg->rp_count);
826 /* send pages to client */
827 rc = tgt_sendpage(tsi, rdpg, rc);
/* Common exit: free whatever pages were allocated, then the page array. */
832 if (rdpg->rp_pages) {
833 for (i = 0; i < rdpg->rp_npages; i++)
834 if (rdpg->rp_pages[i])
835 __free_page(rdpg->rp_pages[i]);
836 OBD_FREE(rdpg->rp_pages,
837 rdpg->rp_npages * sizeof(rdpg->rp_pages[0]));
841 EXPORT_SYMBOL(tgt_obd_idx_read);
/* Dispatch table for generic OBD opcodes handled by the unified target. */
843 struct tgt_handler tgt_obd_handlers[] = {
844 TGT_OBD_HDL (0, OBD_PING, tgt_obd_ping),
845 TGT_OBD_HDL_VAR(0, OBD_LOG_CANCEL, tgt_obd_log_cancel),
846 TGT_OBD_HDL_VAR(0, OBD_QC_CALLBACK, tgt_obd_qc_callback),
847 TGT_OBD_HDL (0, OBD_IDX_READ, tgt_obd_idx_read)
849 EXPORT_SYMBOL(tgt_obd_handlers);
852 * Unified target DLM handlers.
/* LDLM callback suite used by tgt_enqueue(): server-side completion,
 * blocking and glimpse ASTs. */
854 struct ldlm_callback_suite tgt_dlm_cbs = {
855 .lcs_completion = ldlm_server_completion_ast,
856 .lcs_blocking = ldlm_server_blocking_ast,
857 .lcs_glimpse = ldlm_server_glimpse_ast
/* LDLM_ENQUEUE handler: pass the already-unpacked DLM request (set up by
 * tgt_handle_request0() under HABEO_CLAVIS) to ldlm_handle_enqueue0(). */
860 int tgt_enqueue(struct tgt_session_info *tsi)
862 struct ptlrpc_request *req = tgt_ses_req(tsi);
867 * tsi->tsi_dlm_req was already swapped and (if necessary) converted,
868 * tsi->tsi_dlm_cbs was set by the *_req_handle() function.
870 LASSERT(tsi->tsi_dlm_req != NULL);
871 rc = ldlm_handle_enqueue0(tsi->tsi_exp->exp_obd->obd_namespace, req,
872 tsi->tsi_dlm_req, &tgt_dlm_cbs);
874 RETURN(err_serious(rc));
876 RETURN(req->rq_status);
878 EXPORT_SYMBOL(tgt_enqueue);
/* LDLM_CONVERT handler: delegate to ldlm_handle_convert0() using the DLM
 * request cached in the session info. */
880 int tgt_convert(struct tgt_session_info *tsi)
882 struct ptlrpc_request *req = tgt_ses_req(tsi);
886 LASSERT(tsi->tsi_dlm_req);
887 rc = ldlm_handle_convert0(req, tsi->tsi_dlm_req);
889 RETURN(err_serious(rc));
891 RETURN(req->rq_status);
893 EXPORT_SYMBOL(tgt_convert);
/* LDLM_BL_CALLBACK is client-side only; servers never accept it. */
895 int tgt_bl_callback(struct tgt_session_info *tsi)
897 return err_serious(-EOPNOTSUPP);
899 EXPORT_SYMBOL(tgt_bl_callback);
/* LDLM_CP_CALLBACK is client-side only; servers never accept it. */
901 int tgt_cp_callback(struct tgt_session_info *tsi)
903 return err_serious(-EOPNOTSUPP);
905 EXPORT_SYMBOL(tgt_cp_callback);
907 /* generic LDLM target handler */
908 struct tgt_handler tgt_dlm_handlers[] = {
909 TGT_DLM_HDL (HABEO_CLAVIS, LDLM_ENQUEUE, tgt_enqueue),
910 TGT_DLM_HDL_VAR(HABEO_CLAVIS, LDLM_CONVERT, tgt_convert),
911 TGT_DLM_HDL_VAR(0, LDLM_BL_CALLBACK, tgt_bl_callback),
912 TGT_DLM_HDL_VAR(0, LDLM_CP_CALLBACK, tgt_cp_callback)
914 EXPORT_SYMBOL(tgt_dlm_handlers);
917 * Unified target LLOG handlers.
/* LLOG_ORIGIN_HANDLE_CREATE handler; delegates to llog layer. */
919 int tgt_llog_open(struct tgt_session_info *tsi)
925 rc = llog_origin_handle_open(tgt_ses_req(tsi));
929 EXPORT_SYMBOL(tgt_llog_open);
/* LLOG_ORIGIN_HANDLE_CLOSE handler; delegates to llog layer. */
931 int tgt_llog_close(struct tgt_session_info *tsi)
937 rc = llog_origin_handle_close(tgt_ses_req(tsi));
941 EXPORT_SYMBOL(tgt_llog_close);
/* LLOG_ORIGIN_HANDLE_DESTROY handler; delegates to llog layer. */
944 int tgt_llog_destroy(struct tgt_session_info *tsi)
950 rc = llog_origin_handle_destroy(tgt_ses_req(tsi));
954 EXPORT_SYMBOL(tgt_llog_destroy);
/* LLOG_ORIGIN_HANDLE_READ_HEADER handler; delegates to llog layer. */
956 int tgt_llog_read_header(struct tgt_session_info *tsi)
962 rc = llog_origin_handle_read_header(tgt_ses_req(tsi));
966 EXPORT_SYMBOL(tgt_llog_read_header);
/* LLOG_ORIGIN_HANDLE_NEXT_BLOCK handler; delegates to llog layer. */
968 int tgt_llog_next_block(struct tgt_session_info *tsi)
974 rc = llog_origin_handle_next_block(tgt_ses_req(tsi));
978 EXPORT_SYMBOL(tgt_llog_next_block);
/* LLOG_ORIGIN_HANDLE_PREV_BLOCK handler; delegates to llog layer. */
980 int tgt_llog_prev_block(struct tgt_session_info *tsi)
986 rc = llog_origin_handle_prev_block(tgt_ses_req(tsi));
990 EXPORT_SYMBOL(tgt_llog_prev_block);
992 /* generic llog target handler */
993 struct tgt_handler tgt_llog_handlers[] = {
994 TGT_LLOG_HDL (0, LLOG_ORIGIN_HANDLE_CREATE, tgt_llog_open),
995 TGT_LLOG_HDL (0, LLOG_ORIGIN_HANDLE_NEXT_BLOCK, tgt_llog_next_block),
996 TGT_LLOG_HDL (0, LLOG_ORIGIN_HANDLE_READ_HEADER, tgt_llog_read_header),
997 TGT_LLOG_HDL (0, LLOG_ORIGIN_HANDLE_PREV_BLOCK, tgt_llog_prev_block),
998 TGT_LLOG_HDL (0, LLOG_ORIGIN_HANDLE_DESTROY, tgt_llog_destroy),
999 TGT_LLOG_HDL_VAR(0, LLOG_ORIGIN_HANDLE_CLOSE, tgt_llog_close),
1001 EXPORT_SYMBOL(tgt_llog_handlers);
1004 * sec context handlers
1006 /* XXX: Implement based on mdt_sec_ctx_handle()? */
/* Placeholder handler for SEC_CTX_{INIT,INIT_CONT,FINI}; body elided in
 * this view -- behavior not visible here. */
1007 int tgt_sec_ctx_handle(struct tgt_session_info *tsi)
/* Dispatch table for security-context opcodes; all route to the common
 * tgt_sec_ctx_handle(). */
1012 struct tgt_handler tgt_sec_ctx_handlers[] = {
1013 TGT_SEC_HDL_VAR(0, SEC_CTX_INIT, tgt_sec_ctx_handle),
1014 TGT_SEC_HDL_VAR(0, SEC_CTX_INIT_CONT, tgt_sec_ctx_handle),
1015 TGT_SEC_HDL_VAR(0, SEC_CTX_FINI, tgt_sec_ctx_handle),
1017 EXPORT_SYMBOL(tgt_sec_ctx_handlers);