4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; if not, write to the
18 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19 * Boston, MA 021110-1307, USA
24 * Copyright (c) 2011, 2012, Intel Corporation.
27 * lustre/target/tgt_handler.c
29 * Lustre Unified Target request handler code
31 * Author: Brian Behlendorf <behlendorf1@llnl.gov>
32 * Author: Mikhail Pershin <mike.pershin@intel.com>
35 #define DEBUG_SUBSYSTEM S_CLASS
38 #include <obd_class.h>
40 #include "tgt_internal.h"
42 char *tgt_name(struct lu_target *tgt)
44 LASSERT(tgt->lut_obd != NULL);
45 return tgt->lut_obd->obd_name;
47 EXPORT_SYMBOL(tgt_name);
/*
 * Unpack the client request and prepare the reply buffer for a handler.
 *
 * When the request format carries RMF_MDT_BODY on the client side, the body
 * is fetched from the capsule.  For handlers flagged HABEO_REFERO the
 * server-side RMF_MDT_MD and RMF_LOGCOOKIES fields are sized where the
 * format has them (RMF_MDT_MD gets body->eadatasize, or 0 when there is no
 * body), and req_capsule_server_pack() is called to pack the reply.
 *
 * tsi   - session info; only tsi->tsi_pill is used by the visible code
 * flags - handler flags; only HABEO_REFERO is tested by the visible code
 *
 * NOTE(review): the declaration of rc, the size argument of the
 * RMF_LOGCOOKIES call and the return statement are not visible in this
 * listing; presumably the result of req_capsule_server_pack() is what is
 * returned - confirm against the complete file.
 */
49 static int tgt_unpack_req_pack_rep(struct tgt_session_info *tsi, __u32 flags)
51 struct req_capsule *pill = tsi->tsi_pill;
52 const struct mdt_body *body = NULL;
/* the MDT body is optional: it is only read when the format declares it */
57 if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_CLIENT)) {
58 body = req_capsule_client_get(pill, &RMF_MDT_BODY);
63 if (flags & HABEO_REFERO) {
65 if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
66 req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
67 body ? body->eadatasize : 0);
68 if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
69 req_capsule_set_size(pill, &RMF_LOGCOOKIES,
72 rc = req_capsule_server_pack(pill);
78 * Invoke handler for this request opc. Also do necessary preprocessing
79 * (according to handler ->th_flags), and post-processing (setting of
80 * ->last_{xid,committed}).
/*
 * Run one request through its handler and send the reply.
 *
 * tsi - session info of the serving thread
 * h   - handler selected for the request opcode
 * req - request being served
 *
 * Steps that are visible in the body, in order:
 *  - assertions: the handler has an action (th_act), its th_opc equals the
 *    opcode of the request message, and current->journal_info is NULL;
 *  - when the handler has a request format (th_fmt), the format is set on
 *    the capsule and tgt_unpack_req_pack_rep() is called;
 *  - handlers flagged MUTABOR are checked against OBD_CONNECT_RDONLY in the
 *    connection flags;
 *  - handlers flagged HABEO_CLAVIS get the RMF_DLM_REQ buffer looked up,
 *    tested for an inodebits policy of 0, and stored in tsi->tsi_dlm_req;
 *  - if rc is still 0 the request is processed; a result that is not
 *    serious while no reply state exists (and a reply is expected) is
 *    logged as an error;
 *  - rc is split with is_serious() / clear_serious();
 *  - target_committed_to_req() is called when rc is 0 and the request has
 *    an export, then the reply is sent with target_send_reply() using
 *    tsi->tsi_reply_fail_id.
 *
 * NOTE(review): the local declarations, the call of the handler action, the
 * bodies of several branches and the return statement are not visible in
 * this listing, so the outcome of each check is deliberately not stated.
 */
82 static int tgt_handle_request0(struct tgt_session_info *tsi,
83 struct tgt_handler *h,
84 struct ptlrpc_request *req)
92 LASSERT(h->th_act != NULL);
93 LASSERT(h->th_opc == lustre_msg_get_opc(req->rq_reqmsg));
94 LASSERT(current->journal_info == NULL);
98 LASSERT(ergo(flags & (HABEO_CORPUS | HABEO_REFERO),
100 if (h->th_fmt != NULL) {
101 req_capsule_set(tsi->tsi_pill, h->th_fmt);
102 rc = tgt_unpack_req_pack_rep(tsi, flags);
/* MUTABOR handlers are tested against a connection flagged read-only */
105 if (rc == 0 && flags & MUTABOR &&
106 tgt_conn_flags(tsi) & OBD_CONNECT_RDONLY)
/* HABEO_CLAVIS: fetch the DLM request buffer; a format is required for it */
109 if (rc == 0 && flags & HABEO_CLAVIS) {
110 struct ldlm_request *dlm_req;
112 LASSERT(h->th_fmt != NULL);
114 dlm_req = req_capsule_client_get(tsi->tsi_pill, &RMF_DLM_REQ);
115 if (dlm_req != NULL) {
116 if (unlikely(dlm_req->lock_desc.l_resource.lr_type ==
118 dlm_req->lock_desc.l_policy_data.\
119 l_inodebits.bits == 0)) {
121 * Lock without inodebits makes no sense and
122 * will oops later in ldlm. If client miss to
123 * set such bits, do not trigger ASSERTION.
125 * For liblustre flock case, it maybe zero.
129 tsi->tsi_dlm_req = dlm_req;
136 if (likely(rc == 0)) {
138 * Process request, there can be two types of rc:
139 * 1) errors with msg unpack/pack, other failures outside the
140 * operation itself. This is counted as serious errors;
141 * 2) errors during fs operation, should be placed in rq_status
145 if (!is_serious(rc) &&
146 !req->rq_no_reply && req->rq_reply_state == NULL) {
147 DEBUG_REQ(D_ERROR, req, "%s \"handler\" %s did not "
148 "pack reply and returned 0 error\n",
149 tgt_name(tsi->tsi_tgt), h->th_name);
/* separate the "serious" marker from the plain result code */
152 serious = is_serious(rc);
153 rc = clear_serious(rc);
161 * ELDLM_* codes which > 0 should be in rq_status only as well as
162 * all non-serious errors.
164 if (rc > 0 || !serious)
167 LASSERT(current->journal_info == NULL);
170 * If we're DISCONNECTing, the export_data is already freed
172 * WAS if (likely(... && h->mh_opc != MDS_DISCONNECT))
174 if (likely(rc == 0 && req->rq_export))
175 target_committed_to_req(req);
177 target_send_reply(req, rc, tsi->tsi_reply_fail_id);
/*
 * Decide, by request opcode, what happens to a request that arrives while
 * the target is recovering.
 *
 * req     - incoming request; its opcode drives the switch
 * obd     - device passed on to target_queue_recovery_request()
 * process - output; the visible code stores the result of
 *           target_queue_recovery_request() in it
 *
 * The visible case labels include MDS_DONE_WRITING and MDS_SYNC.  One path
 * hands the request to target_queue_recovery_request(); another logs
 * "not permitted during recovery".
 *
 * NOTE(review): most case labels, the mapping of opcodes to the two paths
 * and the return values are not visible in this listing.
 */
181 static int tgt_filter_recovery_request(struct ptlrpc_request *req,
182 struct obd_device *obd, int *process)
184 switch (lustre_msg_get_opc(req->rq_reqmsg)) {
190 case MDS_DONE_WRITING:
191 case MDS_SYNC: /* used in unmounting */
197 *process = target_queue_recovery_request(req, obd);
201 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
208 * Handle recovery. Return:
209 * +1: continue request processing;
210 * -ve: abort immediately with the given error code;
211 * 0: send reply with error code in req->rq_status;
/*
 * Recovery-related checks applied to a request before its handler runs.
 *
 * req           - incoming request
 * reply_fail_id - passed to target_send_reply() when the export is not
 *                 connected
 *
 * Checks visible in the body, in order:
 *  - a switch on the opcode that has a case for SEC_CTX_INIT_CONT;
 *  - an export for which class_connected_export() is false is logged,
 *    rq_status is set to -ENOTCONN and an -ENOTCONN reply is sent;
 *  - the obd_replayable flag of the export's obd is tested;
 *  - a request for which req_xid_is_last() is true must carry MSG_RESENT
 *    or MSG_REPLAY, otherwise a warning is logged and rq_status is set to
 *    -ENOTCONN;
 *  - while the obd is recovering, tgt_filter_recovery_request() is
 *    consulted; a negative should_process is copied into rq_status and
 *    answered through ptlrpc_error().
 *
 * NOTE(review): the return statements and several branch bodies are not
 * visible in this listing, so the value returned on each path is not
 * stated here.
 */
213 int tgt_handle_recovery(struct ptlrpc_request *req, int reply_fail_id)
217 switch (lustre_msg_get_opc(req->rq_reqmsg)) {
219 case SEC_CTX_INIT_CONT:
/* the "?" fallback covers a request that has no export at all */
224 if (unlikely(!class_connected_export(req->rq_export))) {
225 CERROR("%s: operation %d on unconnected export from %s\n",
226 req->rq_export != NULL ?
227 req->rq_export->exp_obd->obd_name : "?",
228 lustre_msg_get_opc(req->rq_reqmsg),
229 libcfs_id2str(req->rq_peer));
230 req->rq_status = -ENOTCONN;
231 target_send_reply(req, -ENOTCONN, reply_fail_id);
235 if (!req->rq_export->exp_obd->obd_replayable)
238 /* sanity check: if the xid matches, the request must be marked as a
239 * resent or replayed */
240 if (req_xid_is_last(req)) {
241 if (!(lustre_msg_get_flags(req->rq_reqmsg) &
242 (MSG_RESENT | MSG_REPLAY))) {
243 DEBUG_REQ(D_WARNING, req, "rq_xid "LPU64" matches "
244 "last_xid, expected REPLAY or RESENT flag "
246 lustre_msg_get_flags(req->rq_reqmsg));
247 req->rq_status = -ENOTCONN;
251 /* else: note the opposite is not always true; a RESENT req after a
252 * failover will usually not match the last_xid, since it was likely
253 * never committed. A REPLAYed request will almost never match the
254 * last xid, however it could for a committed, but still retained,
257 /* Check for aborted recovery... */
258 if (unlikely(req->rq_export->exp_obd->obd_recovering)) {
262 DEBUG_REQ(D_INFO, req, "Got new replay");
263 rc = tgt_filter_recovery_request(req, req->rq_export->exp_obd,
265 if (rc != 0 || !should_process)
267 else if (should_process < 0) {
268 req->rq_status = should_process;
269 rc = ptlrpc_error(req);
/*
 * Entry point for requests served by a unified target.
 *
 * req - request to serve; the env of its service thread supplies the
 *       session info
 *
 * Steps visible in the body, in order:
 *  - the request capsule is initialised and attached to the session info,
 *    and tsi_dlm_req is reset to NULL;
 *  - MDS_CONNECT, OST_CONNECT and MGS_CONNECT use the RQF_CONNECT format and
 *    are passed to target_handle_connect();
 *  - a request without an export is logged and answered with rq_status
 *    -EFAULT;
 *  - the target's slice table (lut_slice) is searched for the slice whose
 *    [tos_opc_start, tos_opc_end) range holds the opcode; a missing slice
 *    or a handler entry with th_opc == 0 is answered with rq_status
 *    -ENOTSUPP;
 *  - the message version is checked against h->th_version, with rq_status
 *    -EINVAL on the visible error path;
 *  - tgt_handle_recovery() is called and, when it returns 1, the request
 *    goes to tgt_handle_request0();
 *  - the capsule is released with req_capsule_fini() and tsi->tsi_pill is
 *    reset to NULL.
 *
 * NOTE(review): the jumps between these steps, the fail-injection action
 * after CFS_FAIL_CHECK_ORSET() and the return statement are not visible in
 * this listing.
 */
276 int tgt_request_handle(struct ptlrpc_request *req)
278 struct tgt_session_info *tsi = tgt_ses_info(req->rq_svc_thread->t_env);
279 struct lustre_msg *msg = req->rq_reqmsg;
280 struct tgt_handler *h;
281 struct tgt_opc_slice *s;
282 struct lu_target *tgt;
283 int request_fail_id = 0;
284 __u32 opc = lustre_msg_get_opc(msg);
/* bind the request capsule and env to this thread's session info */
289 req_capsule_init(&req->rq_pill, req, RCL_SERVER);
290 tsi->tsi_pill = &req->rq_pill;
291 tsi->tsi_env = req->rq_svc_thread->t_env;
292 tsi->tsi_dlm_req = NULL;
294 /* if request has export then get handlers slice from corresponding
295 * target, otherwise that should be connect operation */
296 if (opc == MDS_CONNECT || opc == OST_CONNECT ||
297 opc == MGS_CONNECT) {
298 req_capsule_set(&req->rq_pill, &RQF_CONNECT);
299 rc = target_handle_connect(req);
301 rc = ptlrpc_error(req);
306 /* this should be assertion actually, but keep it reporting error
307 * for unified target development time */
308 if (req->rq_export == NULL) {
309 CERROR("Request with no export from %s, opcode %u\n",
310 libcfs_nid2str(req->rq_peer.nid), opc);
311 req->rq_status = -EFAULT;
312 rc = ptlrpc_error(req);
317 tsi->tsi_tgt = tgt = class_exp2tgt(req->rq_export);
318 tsi->tsi_exp = req->rq_export;
320 request_fail_id = tgt->lut_request_fail_id;
321 tsi->tsi_reply_fail_id = tgt->lut_reply_fail_id;
/* the slice table ends with an entry whose tos_hs is NULL */
323 for (s = tgt->lut_slice; s->tos_hs != NULL; s++)
324 if (s->tos_opc_start <= opc && opc < s->tos_opc_end)
327 /* opcode was not found in slice */
328 if (unlikely(s->tos_hs == NULL)) {
329 CERROR("%s: no handlers for opcode 0x%x\n", tgt_name(tgt), opc);
330 req->rq_status = -ENOTSUPP;
331 rc = ptlrpc_error(req);
335 if (CFS_FAIL_CHECK_ORSET(request_fail_id, CFS_FAIL_ONCE))
338 LASSERT(current->journal_info == NULL);
340 LASSERT(opc >= s->tos_opc_start && opc < s->tos_opc_end);
/* handlers of a slice are indexed by the opcode offset from tos_opc_start */
341 h = s->tos_hs + (opc - s->tos_opc_start);
342 if (unlikely(h->th_opc == 0)) {
343 CERROR("%s: unsupported opcode 0x%x\n", tgt_name(tgt), opc);
344 req->rq_status = -ENOTSUPP;
345 rc = ptlrpc_error(req);
349 rc = lustre_msg_check_version(msg, h->th_version);
351 DEBUG_REQ(D_ERROR, req, "%s: drop mal-formed request, version"
352 " %08x, expecting %08x\n", tgt_name(tgt),
353 lustre_msg_get_version(msg), h->th_version);
354 req->rq_status = -EINVAL;
355 rc = ptlrpc_error(req);
359 rc = tgt_handle_recovery(req, tsi->tsi_reply_fail_id);
360 if (likely(rc == 1)) {
361 LASSERTF(h->th_opc == opc, "opcode mismatch %d != %d\n",
363 rc = tgt_handle_request0(tsi, h, req);
/* detach the capsule from the session info once the request is done */
369 req_capsule_fini(tsi->tsi_pill);
370 tsi->tsi_pill = NULL;
373 EXPORT_SYMBOL(tgt_request_handle);
375 void tgt_counter_incr(struct obd_export *exp, int opcode)
377 lprocfs_counter_incr(exp->exp_obd->obd_stats, opcode);
378 if (exp->exp_nid_stats && exp->exp_nid_stats->nid_stats != NULL)
379 lprocfs_counter_incr(exp->exp_nid_stats->nid_stats, opcode);
381 EXPORT_SYMBOL(tgt_counter_incr);
384 * Unified target generic handlers.
390 static inline void tgt_init_sec_none(struct obd_connect_data *reply)
392 reply->ocd_connect_flags &= ~(OBD_CONNECT_RMT_CLIENT |
393 OBD_CONNECT_RMT_CLIENT_FORCE |
394 OBD_CONNECT_MDS_CAPA |
395 OBD_CONNECT_OSS_CAPA);
/*
 * Adjust the security-related connect flags of a connect reply according to
 * how the client authenticated and to the security level of the target.
 *
 * req - connect request; its capsule provides the client connect data
 *       ("data") and the server connect data ("reply")
 *
 * Cases visible in the body, in order:
 *  - either RMF_CONNECT_DATA buffer missing: tested first;
 *  - rq_auth_usr_mdt set: tgt_init_sec_none() is applied to the reply;
 *  - rq_auth_gss clear: a warning is logged when lut_sec_level is above
 *    LUSTRE_SEC_NONE, and tgt_init_sec_none() appears in the same block;
 *  - the client lacks one of OBD_CONNECT_RMT_CLIENT, OBD_CONNECT_MDS_CAPA
 *    or OBD_CONNECT_OSS_CAPA: warnings are logged and tgt_init_sec_none()
 *    appears in the same block;
 *  - "remote" is seeded from OBD_CONNECT_RMT_CLIENT_FORCE in the client
 *    flags and debug messages depend on rq_auth_remote;
 *  - lut_oss_capa and an rq_auth_uid equal to INVALID_UID are tested;
 *  - a switch on lut_sec_level with cases for LUSTRE_SEC_NONE and
 *    LUSTRE_SEC_REMOTE;
 *  - the last visible statements clear OBD_CONNECT_RMT_CLIENT and
 *    OBD_CONNECT_RMT_CLIENT_FORCE, then OBD_CONNECT_OSS_CAPA when
 *    lut_oss_capa is unset and OBD_CONNECT_MDS_CAPA when lut_mds_capa is
 *    unset.
 *
 * NOTE(review): the return statements, the else branches and some case
 * labels are not visible in this listing, so which of the statements above
 * are alternatives of each other, and what is returned, must be confirmed
 * against the complete file.
 */
398 static int tgt_init_sec_level(struct ptlrpc_request *req)
400 struct lu_target *tgt = class_exp2tgt(req->rq_export);
401 char *client = libcfs_nid2str(req->rq_peer.nid);
402 struct obd_connect_data *data, *reply;
408 data = req_capsule_client_get(&req->rq_pill, &RMF_CONNECT_DATA);
409 reply = req_capsule_server_get(&req->rq_pill, &RMF_CONNECT_DATA);
410 if (data == NULL || reply == NULL)
413 /* connection from MDT is always trusted */
414 if (req->rq_auth_usr_mdt) {
415 tgt_init_sec_none(reply);
419 /* no GSS support case */
420 if (!req->rq_auth_gss) {
421 if (tgt->lut_sec_level > LUSTRE_SEC_NONE) {
422 CWARN("client %s -> target %s does not use GSS, "
423 "can not run under security level %d.\n",
424 client, tgt_name(tgt), tgt->lut_sec_level);
427 tgt_init_sec_none(reply);
432 /* old version case */
433 if (unlikely(!(data->ocd_connect_flags & OBD_CONNECT_RMT_CLIENT) ||
434 !(data->ocd_connect_flags & OBD_CONNECT_MDS_CAPA) ||
435 !(data->ocd_connect_flags & OBD_CONNECT_OSS_CAPA))) {
436 if (tgt->lut_sec_level > LUSTRE_SEC_NONE) {
437 CWARN("client %s -> target %s uses old version, "
438 "can not run under security level %d.\n",
439 client, tgt_name(tgt), tgt->lut_sec_level);
442 CWARN("client %s -> target %s uses old version, "
443 "run under security level %d.\n",
444 client, tgt_name(tgt), tgt->lut_sec_level);
445 tgt_init_sec_none(reply);
/* the client can ask for remote mode with OBD_CONNECT_RMT_CLIENT_FORCE */
450 remote = data->ocd_connect_flags & OBD_CONNECT_RMT_CLIENT_FORCE;
452 if (!req->rq_auth_remote)
453 CDEBUG(D_SEC, "client (local realm) %s -> target %s "
454 "asked to be remote.\n", client, tgt_name(tgt));
455 } else if (req->rq_auth_remote) {
457 CDEBUG(D_SEC, "client (remote realm) %s -> target %s is set "
458 "as remote by default.\n", client, tgt_name(tgt));
462 if (!tgt->lut_oss_capa) {
464 "client %s -> target %s is set as remote,"
465 " but OSS capabilities are not enabled: %d.\n",
466 client, tgt_name(tgt), tgt->lut_oss_capa);
470 if (req->rq_auth_uid == INVALID_UID) {
471 CDEBUG(D_SEC, "client %s -> target %s: user is not "
472 "authenticated!\n", client, tgt_name(tgt));
/* the remaining flag handling depends on the target's security level */
478 switch (tgt->lut_sec_level) {
479 case LUSTRE_SEC_NONE:
482 "client %s -> target %s is set as remote, "
483 "can not run under security level %d.\n",
484 client, tgt_name(tgt), tgt->lut_sec_level);
487 tgt_init_sec_none(reply);
489 case LUSTRE_SEC_REMOTE:
491 tgt_init_sec_none(reply);
496 reply->ocd_connect_flags &= ~(OBD_CONNECT_RMT_CLIENT |
497 OBD_CONNECT_RMT_CLIENT_FORCE);
498 if (!tgt->lut_oss_capa)
499 reply->ocd_connect_flags &= ~OBD_CONNECT_OSS_CAPA;
500 if (!tgt->lut_mds_capa)
501 reply->ocd_connect_flags &= ~OBD_CONNECT_MDS_CAPA;
/*
 * Check the RPC security flavor of a connect request against an export.
 *
 * req - connect request; rq_sp_from, rq_flvr and rq_peer are read
 * exp - export being connected; exp_flvr and exp_sp_peer may be updated
 *
 * Cases visible in the body, in order:
 *  - the target must have lut_obd and lut_slice set (asserted);
 *  - an export whose obd type name equals LUSTRE_ECHO_NAME gets flavor
 *    SPTLRPC_FLVR_ANY;
 *  - an export whose flavor is SPTLRPC_FLVR_INVALID has one chosen from
 *    lut_sptlrpc_rset under the read side of lut_sptlrpc_lock; then, under
 *    exp_lock, rq_sp_from and the chosen flavor are recorded on the export,
 *    and a flavor that is neither SPTLRPC_FLVR_ANY nor equal to the
 *    request's flavor is logged as unauthorized;
 *  - a request whose rq_sp_from differs from exp_sp_peer is logged;
 *  - sptlrpc_target_export_check() is called and its result kept in rc.
 *
 * NOTE(review): the error codes assigned on the logged paths, the else
 * branches and the return statement are not visible in this listing.
 */
510 int tgt_connect_check_sptlrpc(struct ptlrpc_request *req, struct obd_export *exp)
512 struct lu_target *tgt = class_exp2tgt(exp);
513 struct sptlrpc_flavor flvr;
517 LASSERT(tgt->lut_obd);
518 LASSERT(tgt->lut_slice);
520 /* always allow ECHO client */
521 if (unlikely(strcmp(exp->exp_obd->obd_type->typ_name,
522 LUSTRE_ECHO_NAME) == 0)) {
523 exp->exp_flvr.sf_rpc = SPTLRPC_FLVR_ANY;
/* no flavor recorded on the export yet: pick one from the target rule set */
527 if (exp->exp_flvr.sf_rpc == SPTLRPC_FLVR_INVALID) {
528 read_lock(&tgt->lut_sptlrpc_lock);
529 sptlrpc_target_choose_flavor(&tgt->lut_sptlrpc_rset,
533 read_unlock(&tgt->lut_sptlrpc_lock);
/* peer part and flavor are written to the export under exp_lock */
535 spin_lock(&exp->exp_lock);
536 exp->exp_sp_peer = req->rq_sp_from;
537 exp->exp_flvr = flvr;
538 if (exp->exp_flvr.sf_rpc != SPTLRPC_FLVR_ANY &&
539 exp->exp_flvr.sf_rpc != req->rq_flvr.sf_rpc) {
540 CERROR("%s: unauthorized rpc flavor %x from %s, "
541 "expect %x\n", tgt_name(tgt),
543 libcfs_nid2str(req->rq_peer.nid),
544 exp->exp_flvr.sf_rpc);
547 spin_unlock(&exp->exp_lock);
549 if (exp->exp_sp_peer != req->rq_sp_from) {
550 CERROR("%s: RPC source %s doesn't match %s\n",
552 sptlrpc_part2name(req->rq_sp_from),
553 sptlrpc_part2name(exp->exp_sp_peer));
556 rc = sptlrpc_target_export_check(exp, req);
/*
 * Unified target connect handler.
 *
 * tsi - session info; tsi_exp and tsi_pill are used
 *
 * Runs tgt_init_sec_level() and tgt_connect_check_sptlrpc() on the request,
 * then copies reply->ocd_connect_flags into the export's connect flags
 * while holding the export's exp_lock.  The body also contains an
 * obd_disconnect() call on a reference taken with class_export_get().
 *
 * NOTE(review): the checks of rc after each call, the label in front of
 * obd_disconnect() and the return statements are not visible in this
 * listing; presumably obd_disconnect() belongs to the failure path -
 * confirm against the complete file.
 */
563 int tgt_connect(struct tgt_session_info *tsi)
565 struct ptlrpc_request *req = tgt_ses_req(tsi);
566 struct obd_connect_data *reply;
571 rc = tgt_init_sec_level(req);
575 /* XXX: better to call this check right after getting new export but
576 * before last_rcvd slot allocation to avoid server load upon insecure
577 * connects. This is to be fixed after unifiyng all targets.
579 rc = tgt_connect_check_sptlrpc(req, tsi->tsi_exp);
583 /* To avoid exposing partially initialized connection flags, changes up
584 * to this point have been staged in reply->ocd_connect_flags. Now that
585 * connection handling has completed successfully, atomically update
586 * the connect flags in the shared export data structure. LU-1623 */
587 reply = req_capsule_server_get(tsi->tsi_pill, &RMF_CONNECT_DATA);
588 spin_lock(&tsi->tsi_exp->exp_lock);
589 *exp_connect_flags_ptr(tsi->tsi_exp) = reply->ocd_connect_flags;
590 spin_unlock(&tsi->tsi_exp->exp_lock);
594 obd_disconnect(class_export_get(tsi->tsi_exp));
597 EXPORT_SYMBOL(tgt_connect);
/*
 * Unified target disconnect handler.
 *
 * tsi - session info; its request is passed to target_handle_disconnect()
 *
 * The visible error path returns the result of target_handle_disconnect()
 * wrapped by err_serious().
 *
 * NOTE(review): the declaration of rc, the test guarding the error return
 * and the success return are not visible in this listing.
 */
599 int tgt_disconnect(struct tgt_session_info *tsi)
605 rc = target_handle_disconnect(tgt_ses_req(tsi));
607 RETURN(err_serious(rc));
611 EXPORT_SYMBOL(tgt_disconnect);
614 * Unified target OBD handlers
/*
 * Unified target ping handler.
 *
 * tsi - session info; its request is passed to target_handle_ping()
 *
 * The visible error path returns the result of target_handle_ping()
 * wrapped by err_serious().
 *
 * NOTE(review): the declaration of rc, the test guarding the error return
 * and the success return are not visible in this listing.
 */
616 int tgt_obd_ping(struct tgt_session_info *tsi)
622 rc = target_handle_ping(tgt_ses_req(tsi));
624 RETURN(err_serious(rc));
628 EXPORT_SYMBOL(tgt_obd_ping);
630 int tgt_obd_log_cancel(struct tgt_session_info *tsi)
632 return err_serious(-EOPNOTSUPP);
634 EXPORT_SYMBOL(tgt_obd_log_cancel);
636 int tgt_obd_qc_callback(struct tgt_session_info *tsi)
638 return err_serious(-EOPNOTSUPP);
640 EXPORT_SYMBOL(tgt_obd_qc_callback);
643 * Unified target DLM handlers.
/*
 * LDLM callback suite of the unified target; tgt_enqueue() passes its
 * address to ldlm_handle_enqueue0().  The visible members install
 * ldlm_server_completion_ast and ldlm_server_blocking_ast.
 *
 * NOTE(review): the closing line of the initializer is not visible in this
 * listing, so whether further members are set cannot be confirmed here.
 */
645 struct ldlm_callback_suite tgt_dlm_cbs = {
646 .lcs_completion = ldlm_server_completion_ast,
647 .lcs_blocking = ldlm_server_blocking_ast,
/*
 * Unified target lock enqueue handler.
 *
 * tsi - session info; tsi_dlm_req must already be set (asserted) and
 *       tsi_exp supplies the namespace of the export's obd
 *
 * Calls ldlm_handle_enqueue0() with the obd namespace, the request, the DLM
 * request from the session and the tgt_dlm_cbs callback suite.  Two returns
 * are visible: the result wrapped by err_serious(), and req->rq_status.
 *
 * NOTE(review): the declaration of rc and the test that selects between the
 * two returns are not visible in this listing.
 */
650 int tgt_enqueue(struct tgt_session_info *tsi)
652 struct ptlrpc_request *req = tgt_ses_req(tsi);
657 * tsi->tsi_dlm_req was already swapped and (if necessary) converted,
658 * tsi->tsi_dlm_cbs was set by the *_req_handle() function.
660 LASSERT(tsi->tsi_dlm_req != NULL);
662 rc = ldlm_handle_enqueue0(tsi->tsi_exp->exp_obd->obd_namespace, req,
663 tsi->tsi_dlm_req, &tgt_dlm_cbs);
665 RETURN(err_serious(rc));
667 RETURN(req->rq_status);
669 EXPORT_SYMBOL(tgt_enqueue);
/*
 * Unified target lock convert handler.
 *
 * tsi - session info; tsi_dlm_req must already be set (asserted)
 *
 * Calls ldlm_handle_convert0() with the request and the DLM request from
 * the session.  Two returns are visible: the result wrapped by
 * err_serious(), and req->rq_status.
 *
 * NOTE(review): the declaration of rc and the test that selects between the
 * two returns are not visible in this listing.
 */
671 int tgt_convert(struct tgt_session_info *tsi)
673 struct ptlrpc_request *req = tgt_ses_req(tsi);
677 LASSERT(tsi->tsi_dlm_req);
678 rc = ldlm_handle_convert0(req, tsi->tsi_dlm_req);
680 RETURN(err_serious(rc));
682 RETURN(req->rq_status);
684 EXPORT_SYMBOL(tgt_convert);
686 int tgt_bl_callback(struct tgt_session_info *tsi)
688 return err_serious(-EOPNOTSUPP);
690 EXPORT_SYMBOL(tgt_bl_callback);
692 int tgt_cp_callback(struct tgt_session_info *tsi)
694 return err_serious(-EOPNOTSUPP);
696 EXPORT_SYMBOL(tgt_cp_callback);
699 * Unified target LLOG handlers.
/*
 * Unified target LLOG open handler: passes the session's request to
 * llog_origin_handle_open() and keeps the result in rc.
 *
 * tsi - session info; only its request is used
 *
 * NOTE(review): the declaration of rc and the return statement are not
 * visible in this listing; presumably rc is returned - confirm.
 */
701 int tgt_llog_open(struct tgt_session_info *tsi)
707 rc = llog_origin_handle_open(tgt_ses_req(tsi));
711 EXPORT_SYMBOL(tgt_llog_open);
/*
 * Unified target LLOG close handler: passes the session's request to
 * llog_origin_handle_close() and keeps the result in rc.
 *
 * tsi - session info; only its request is used
 *
 * NOTE(review): the declaration of rc and the return statement are not
 * visible in this listing; presumably rc is returned - confirm.
 */
713 int tgt_llog_close(struct tgt_session_info *tsi)
719 rc = llog_origin_handle_close(tgt_ses_req(tsi));
723 EXPORT_SYMBOL(tgt_llog_close);
/*
 * Unified target LLOG destroy handler: passes the session's request to
 * llog_origin_handle_destroy() and keeps the result in rc.
 *
 * tsi - session info; only its request is used
 *
 * NOTE(review): the declaration of rc and the return statement are not
 * visible in this listing; presumably rc is returned - confirm.
 */
726 int tgt_llog_destroy(struct tgt_session_info *tsi)
732 rc = llog_origin_handle_destroy(tgt_ses_req(tsi));
736 EXPORT_SYMBOL(tgt_llog_destroy);
/*
 * Unified target LLOG read-header handler: passes the session's request to
 * llog_origin_handle_read_header() and keeps the result in rc.
 *
 * tsi - session info; only its request is used
 *
 * NOTE(review): the declaration of rc and the return statement are not
 * visible in this listing; presumably rc is returned - confirm.
 */
738 int tgt_llog_read_header(struct tgt_session_info *tsi)
744 rc = llog_origin_handle_read_header(tgt_ses_req(tsi));
748 EXPORT_SYMBOL(tgt_llog_read_header);
/*
 * Unified target LLOG next-block handler: passes the session's request to
 * llog_origin_handle_next_block() and keeps the result in rc.
 *
 * tsi - session info; only its request is used
 *
 * NOTE(review): the declaration of rc and the return statement are not
 * visible in this listing; presumably rc is returned - confirm.
 */
750 int tgt_llog_next_block(struct tgt_session_info *tsi)
756 rc = llog_origin_handle_next_block(tgt_ses_req(tsi));
760 EXPORT_SYMBOL(tgt_llog_next_block);
/*
 * Unified target LLOG previous-block handler: passes the session's request
 * to llog_origin_handle_prev_block() and keeps the result in rc.
 *
 * tsi - session info; only its request is used
 *
 * NOTE(review): the declaration of rc and the return statement are not
 * visible in this listing; presumably rc is returned - confirm.
 */
762 int tgt_llog_prev_block(struct tgt_session_info *tsi)
768 rc = llog_origin_handle_prev_block(tgt_ses_req(tsi));
772 EXPORT_SYMBOL(tgt_llog_prev_block);