/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 021110-1307, USA
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2011, 2012, Intel Corporation.
 */
/*
 * lustre/target/tgt_handler.c
 *
 * Lustre Unified Target request handler code
 *
 * Author: Brian Behlendorf <behlendorf1@llnl.gov>
 * Author: Mikhail Pershin <mike.pershin@intel.com>
 */

#define DEBUG_SUBSYSTEM S_CLASS

#include <obd.h>
#include <obd_class.h>

#include "tgt_internal.h"

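/* Return the name of the OBD device backing this target. */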
char *tgt_name(struct lu_target *tgt)
{
        LASSERT(tgt->lut_obd != NULL);
        return tgt->lut_obd->obd_name;
}
EXPORT_SYMBOL(tgt_name);

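/*
 * Unpack the incoming request body and, if the handler flags request it
 * (HABEO_REFERO), size and pack the reply buffers.  The MDT_MD reply buffer
 * is sized from the client-supplied eadatasize and the LOGCOOKIES buffer is
 * packed empty.
 */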
static int tgt_unpack_req_pack_rep(struct tgt_session_info *tsi, __u32 flags)
{
        struct req_capsule      *pill = tsi->tsi_pill;
        const struct mdt_body   *body = NULL;
        int                      rc = 0;

        ENTRY;

        if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_CLIENT)) {
                body = req_capsule_client_get(pill, &RMF_MDT_BODY);
                if (body == NULL)
                        RETURN(-EFAULT);
        }

        if (flags & HABEO_REFERO) {
                /* Pack reply */
                if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
                        req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
                                             body ? body->eadatasize : 0);
                if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
                        req_capsule_set_size(pill, &RMF_LOGCOOKIES,
                                             RCL_SERVER, 0);

                rc = req_capsule_server_pack(pill);
        }
        RETURN(rc);
}

/*
 * Invoke handler for this request opc. Also do necessary preprocessing
 * (according to handler ->th_flags), and post-processing (setting of
 * ->last_{xid,committed}).
 */
static int tgt_handle_request0(struct tgt_session_info *tsi,
                               struct tgt_handler *h,
                               struct ptlrpc_request *req)
{
        int      serious = 0;
        int      rc;
        __u32    flags;

        ENTRY;

        LASSERT(h->th_act != NULL);
        LASSERT(h->th_opc == lustre_msg_get_opc(req->rq_reqmsg));
        LASSERT(current->journal_info == NULL);

        rc = 0;
        flags = h->th_flags;
        LASSERT(ergo(flags & (HABEO_CORPUS | HABEO_REFERO),
                     h->th_fmt != NULL));
        if (h->th_fmt != NULL) {
                req_capsule_set(tsi->tsi_pill, h->th_fmt);
                rc = tgt_unpack_req_pack_rep(tsi, flags);
        }

        if (rc == 0 && flags & MUTABOR &&
            tgt_conn_flags(tsi) & OBD_CONNECT_RDONLY)
                rc = -EROFS;

        if (rc == 0 && flags & HABEO_CLAVIS) {
                struct ldlm_request *dlm_req;

                LASSERT(h->th_fmt != NULL);

                dlm_req = req_capsule_client_get(tsi->tsi_pill, &RMF_DLM_REQ);
                if (dlm_req != NULL) {
                        if (unlikely(dlm_req->lock_desc.l_resource.lr_type ==
                                     LDLM_IBITS &&
                                     dlm_req->lock_desc.l_policy_data.\
                                     l_inodebits.bits == 0)) {
                                /*
                                 * A lock without inodebits makes no sense and
                                 * will oops later in ldlm. If a client fails
                                 * to set such bits, return -EPROTO instead of
                                 * triggering an assertion.
                                 *
                                 * For the liblustre flock case, it may be
                                 * zero.
                                 */
                                rc = -EPROTO;
                        } else {
                                tsi->tsi_dlm_req = dlm_req;
                        }
                } else {
                        rc = -EFAULT;
                }
        }

        if (likely(rc == 0)) {
                /*
                 * Process the request; rc can be of two kinds:
                 * 1) errors from msg unpack/pack or other failures outside
                 * the operation itself, which are counted as serious errors;
                 * 2) errors from the fs operation itself, which belong in
                 * rq_status only.
                 */
                rc = h->th_act(tsi);
                if (!is_serious(rc) &&
                    !req->rq_no_reply && req->rq_reply_state == NULL) {
                        DEBUG_REQ(D_ERROR, req, "%s \"handler\" %s did not "
                                  "pack reply and returned 0 error\n",
                                  tgt_name(tsi->tsi_tgt), h->th_name);
                        LBUG();
                }
                serious = is_serious(rc);
                rc = clear_serious(rc);
        } else {
                serious = 1;
        }

        req->rq_status = rc;

        /*
         * ELDLM_* codes greater than 0 belong in rq_status only, as do
         * all non-serious errors.
         */
        if (rc > 0 || !serious)
                rc = 0;

        LASSERT(current->journal_info == NULL);

        /*
         * If we're DISCONNECTing, the export_data is already freed
         *
         * WAS if (likely(... && h->mh_opc != MDS_DISCONNECT))
         */
        if (likely(rc == 0 && req->rq_export))
                target_committed_to_req(req);

        target_send_reply(req, rc, tsi->tsi_reply_fail_id);
        RETURN(0);
}

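/*
 * Decide whether a request arriving while the target is recovering may be
 * processed.  Disconnects are always allowed, a known set of opcodes is
 * queued through target_queue_recovery_request(), and anything else is
 * rejected with -EAGAIN in *process.
 */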
static int tgt_filter_recovery_request(struct ptlrpc_request *req,
                                       struct obd_device *obd, int *process)
{
        switch (lustre_msg_get_opc(req->rq_reqmsg)) {
        case MDS_DISCONNECT:
        case OST_DISCONNECT:
                *process = 1;
                RETURN(0);
        case MDS_CLOSE:
        case MDS_DONE_WRITING:
        case MDS_SYNC: /* used in unmounting */
        case OBD_PING:
        case MDS_REINT:
        case SEQ_QUERY:
        case FLD_QUERY:
        case LDLM_ENQUEUE:
                *process = target_queue_recovery_request(req, obd);
                RETURN(0);

        default:
                DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
                *process = -EAGAIN;
                RETURN(0);
        }
}

/*
 * Handle recovery. Return:
 *        +1: continue request processing;
 *       -ve: abort immediately with the given error code;
 *         0: send reply with error code in req->rq_status;
 */
int tgt_handle_recovery(struct ptlrpc_request *req, int reply_fail_id)
{
        ENTRY;

        switch (lustre_msg_get_opc(req->rq_reqmsg)) {
        case SEC_CTX_INIT:
        case SEC_CTX_INIT_CONT:
        case SEC_CTX_FINI:
                RETURN(+1);
        }

        if (unlikely(!class_connected_export(req->rq_export))) {
                CERROR("%s: operation %d on unconnected export from %s\n",
                       req->rq_export != NULL ?
                       req->rq_export->exp_obd->obd_name : "?",
                       lustre_msg_get_opc(req->rq_reqmsg),
                       libcfs_id2str(req->rq_peer));
                req->rq_status = -ENOTCONN;
                target_send_reply(req, -ENOTCONN, reply_fail_id);
                RETURN(0);
        }

        if (!req->rq_export->exp_obd->obd_replayable)
                RETURN(+1);

        /* sanity check: if the xid matches, the request must be marked as a
         * resent or replayed */
        if (req_xid_is_last(req)) {
                if (!(lustre_msg_get_flags(req->rq_reqmsg) &
                      (MSG_RESENT | MSG_REPLAY))) {
                        DEBUG_REQ(D_WARNING, req, "rq_xid "LPU64" matches "
                                  "last_xid, expected REPLAY or RESENT flag "
                                  "(%x)", req->rq_xid,
                                  lustre_msg_get_flags(req->rq_reqmsg));
                        req->rq_status = -ENOTCONN;
                        RETURN(-ENOTCONN);
                }
        }
        /* else: note the opposite is not always true; a RESENT req after a
         * failover will usually not match the last_xid, since it was likely
         * never committed. A REPLAYed request will almost never match the
         * last xid, however it could for a committed, but still retained,
         * open. */

        /* Check for aborted recovery... */
        if (unlikely(req->rq_export->exp_obd->obd_recovering)) {
                int rc;
                int should_process;

                DEBUG_REQ(D_INFO, req, "Got new replay");
                rc = tgt_filter_recovery_request(req, req->rq_export->exp_obd,
                                                 &should_process);
                if (rc != 0 || !should_process)
                        RETURN(rc);
                else if (should_process < 0) {
                        req->rq_status = should_process;
                        rc = ptlrpc_error(req);
                        RETURN(rc);
                }
        }
        RETURN(+1);
}

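/*
 * Main entry point for incoming target requests: set up the session info,
 * handle connect requests, look up the handler for the opcode in the
 * target's handler slices, run the recovery checks, and finally dispatch
 * the request through tgt_handle_request0().
 */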
int tgt_request_handle(struct ptlrpc_request *req)
{
        struct tgt_session_info *tsi = tgt_ses_info(req->rq_svc_thread->t_env);
        struct lustre_msg       *msg = req->rq_reqmsg;
        struct tgt_handler      *h;
        struct tgt_opc_slice    *s;
        struct lu_target        *tgt;
        int                      request_fail_id = 0;
        __u32                    opc = lustre_msg_get_opc(msg);
        int                      rc;

        ENTRY;

        req_capsule_init(&req->rq_pill, req, RCL_SERVER);
        tsi->tsi_pill = &req->rq_pill;
        tsi->tsi_env = req->rq_svc_thread->t_env;
        tsi->tsi_dlm_req = NULL;

        /* If the request has an export, get the handler slice from the
         * corresponding target; otherwise it should be a connect operation. */
        if (opc == MDS_CONNECT || opc == OST_CONNECT ||
            opc == MGS_CONNECT) {
                req_capsule_set(&req->rq_pill, &RQF_CONNECT);
                rc = target_handle_connect(req);
                if (rc != 0) {
                        rc = ptlrpc_error(req);
                        GOTO(out, rc);
                }
        }

        /* This should really be an assertion, but keep reporting an error
         * while the unified target is still under development. */
        if (req->rq_export == NULL) {
                CERROR("Request with no export from %s, opcode %u\n",
                       libcfs_nid2str(req->rq_peer.nid), opc);
                req->rq_status = -EFAULT;
                rc = ptlrpc_error(req);
                GOTO(out, rc);
        }


        tsi->tsi_tgt = tgt = class_exp2tgt(req->rq_export);
        tsi->tsi_exp = req->rq_export;

        request_fail_id = tgt->lut_request_fail_id;
        tsi->tsi_reply_fail_id = tgt->lut_reply_fail_id;

        for (s = tgt->lut_slice; s->tos_hs != NULL; s++)
                if (s->tos_opc_start <= opc && opc < s->tos_opc_end)
                        break;

        /* opcode was not found in slice */
        if (unlikely(s->tos_hs == NULL)) {
                CERROR("%s: no handlers for opcode 0x%x\n", tgt_name(tgt), opc);
                req->rq_status = -ENOTSUPP;
                rc = ptlrpc_error(req);
                GOTO(out, rc);
        }

        if (CFS_FAIL_CHECK_ORSET(request_fail_id, CFS_FAIL_ONCE))
                GOTO(out, rc = 0);

        LASSERT(current->journal_info == NULL);

        LASSERT(opc >= s->tos_opc_start && opc < s->tos_opc_end);
        h = s->tos_hs + (opc - s->tos_opc_start);
        if (unlikely(h->th_opc == 0)) {
                CERROR("%s: unsupported opcode 0x%x\n", tgt_name(tgt), opc);
                req->rq_status = -ENOTSUPP;
                rc = ptlrpc_error(req);
                GOTO(out, rc);
        }

        rc = lustre_msg_check_version(msg, h->th_version);
        if (unlikely(rc)) {
                DEBUG_REQ(D_ERROR, req, "%s: drop mal-formed request, version"
                          " %08x, expecting %08x\n", tgt_name(tgt),
                          lustre_msg_get_version(msg), h->th_version);
                req->rq_status = -EINVAL;
                rc = ptlrpc_error(req);
                GOTO(out, rc);
        }

        rc = tgt_handle_recovery(req, tsi->tsi_reply_fail_id);
        if (likely(rc == 1)) {
                LASSERTF(h->th_opc == opc, "opcode mismatch %d != %d\n",
                         h->th_opc, opc);
                rc = tgt_handle_request0(tsi, h, req);
                if (rc)
                        GOTO(out, rc);
        }
        EXIT;
out:
        req_capsule_fini(tsi->tsi_pill);
        tsi->tsi_pill = NULL;
        return rc;
}
EXPORT_SYMBOL(tgt_request_handle);

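/*
 * For illustration only (not part of the original code): the dispatch above
 * expects each target to publish an array of tgt_opc_slice entries, each
 * covering a contiguous opcode range and pointing at a tgt_handler table.
 * A minimal sketch, assuming a hypothetical request format name and version
 * constant, might look like:
 *
 *	static struct tgt_handler obd_handlers[] = {
 *		{ .th_name    = "obd_ping",
 *		  .th_opc     = OBD_PING,
 *		  .th_fmt     = &RQF_OBD_PING,	\/\* assumed format name \*\/
 *		  .th_flags   = 0,
 *		  .th_version = LUSTRE_OBD_VERSION,
 *		  .th_act     = tgt_obd_ping },
 *	};
 *
 *	static struct tgt_opc_slice my_slice[] = {
 *		{ .tos_opc_start = OBD_PING,
 *		  .tos_opc_end   = OBD_PING + 1,
 *		  .tos_hs        = obd_handlers },
 *		{ .tos_hs = NULL }
 *	};
 *
 * tgt_request_handle() walks lut_slice until opc falls inside
 * [tos_opc_start, tos_opc_end) and then indexes tos_hs by
 * (opc - tos_opc_start).
 */

/*
 * Bump the request statistics for @opcode on both the per-device stats and,
 * when present, the per-NID stats of the export's client.
 */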
void tgt_counter_incr(struct obd_export *exp, int opcode)
{
        lprocfs_counter_incr(exp->exp_obd->obd_stats, opcode);
        if (exp->exp_nid_stats && exp->exp_nid_stats->nid_stats != NULL)
                lprocfs_counter_incr(exp->exp_nid_stats->nid_stats, opcode);
}
EXPORT_SYMBOL(tgt_counter_incr);

/*
 * Unified target generic handlers.
 */

/*
 * Security functions
 */
static inline void tgt_init_sec_none(struct obd_connect_data *reply)
{
        reply->ocd_connect_flags &= ~(OBD_CONNECT_RMT_CLIENT |
                                      OBD_CONNECT_RMT_CLIENT_FORCE |
                                      OBD_CONNECT_MDS_CAPA |
                                      OBD_CONNECT_OSS_CAPA);
}

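/*
 * Validate the security level of an incoming connection against the target's
 * lut_sec_level setting: trust MDT connections unconditionally, reject
 * non-GSS or old clients when a stricter level is configured, classify the
 * client as local or remote, and adjust the connect flags in the reply
 * accordingly.
 */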
static int tgt_init_sec_level(struct ptlrpc_request *req)
{
        struct lu_target        *tgt = class_exp2tgt(req->rq_export);
        char                    *client = libcfs_nid2str(req->rq_peer.nid);
        struct obd_connect_data *data, *reply;
        int                      rc = 0;
        bool                     remote;

        ENTRY;

        data = req_capsule_client_get(&req->rq_pill, &RMF_CONNECT_DATA);
        reply = req_capsule_server_get(&req->rq_pill, &RMF_CONNECT_DATA);
        if (data == NULL || reply == NULL)
                RETURN(-EFAULT);

        /* connection from MDT is always trusted */
        if (req->rq_auth_usr_mdt) {
                tgt_init_sec_none(reply);
                RETURN(0);
        }

        /* no GSS support case */
        if (!req->rq_auth_gss) {
                if (tgt->lut_sec_level > LUSTRE_SEC_NONE) {
                        CWARN("client %s -> target %s does not use GSS, "
                              "can not run under security level %d.\n",
                              client, tgt_name(tgt), tgt->lut_sec_level);
                        RETURN(-EACCES);
                } else {
                        tgt_init_sec_none(reply);
                        RETURN(0);
                }
        }

        /* old version case */
        if (unlikely(!(data->ocd_connect_flags & OBD_CONNECT_RMT_CLIENT) ||
                     !(data->ocd_connect_flags & OBD_CONNECT_MDS_CAPA) ||
                     !(data->ocd_connect_flags & OBD_CONNECT_OSS_CAPA))) {
                if (tgt->lut_sec_level > LUSTRE_SEC_NONE) {
                        CWARN("client %s -> target %s uses old version, "
                              "can not run under security level %d.\n",
                              client, tgt_name(tgt), tgt->lut_sec_level);
                        RETURN(-EACCES);
                } else {
                        CWARN("client %s -> target %s uses old version, "
                              "run under security level %d.\n",
                              client, tgt_name(tgt), tgt->lut_sec_level);
                        tgt_init_sec_none(reply);
                        RETURN(0);
                }
        }

        remote = data->ocd_connect_flags & OBD_CONNECT_RMT_CLIENT_FORCE;
        if (remote) {
                if (!req->rq_auth_remote)
                        CDEBUG(D_SEC, "client (local realm) %s -> target %s "
                               "asked to be remote.\n", client, tgt_name(tgt));
        } else if (req->rq_auth_remote) {
                remote = true;
                CDEBUG(D_SEC, "client (remote realm) %s -> target %s is set "
                       "as remote by default.\n", client, tgt_name(tgt));
        }

        if (remote) {
                if (!tgt->lut_oss_capa) {
                        CDEBUG(D_SEC,
                               "client %s -> target %s is set as remote,"
                               " but OSS capabilities are not enabled: %d.\n",
                               client, tgt_name(tgt), tgt->lut_oss_capa);
                        RETURN(-EACCES);
                }
        } else {
                if (req->rq_auth_uid == INVALID_UID) {
                        CDEBUG(D_SEC, "client %s -> target %s: user is not "
                               "authenticated!\n", client, tgt_name(tgt));
                        RETURN(-EACCES);
                }
        }


        switch (tgt->lut_sec_level) {
        case LUSTRE_SEC_NONE:
                if (remote) {
                        CDEBUG(D_SEC,
                               "client %s -> target %s is set as remote, "
                               "can not run under security level %d.\n",
                               client, tgt_name(tgt), tgt->lut_sec_level);
                        RETURN(-EACCES);
                }
                tgt_init_sec_none(reply);
                break;
        case LUSTRE_SEC_REMOTE:
                if (!remote)
                        tgt_init_sec_none(reply);
                break;
        case LUSTRE_SEC_ALL:
                if (remote)
                        break;
                reply->ocd_connect_flags &= ~(OBD_CONNECT_RMT_CLIENT |
                                              OBD_CONNECT_RMT_CLIENT_FORCE);
                if (!tgt->lut_oss_capa)
                        reply->ocd_connect_flags &= ~OBD_CONNECT_OSS_CAPA;
                if (!tgt->lut_mds_capa)
                        reply->ocd_connect_flags &= ~OBD_CONNECT_MDS_CAPA;
                break;
        default:
                RETURN(-EINVAL);
        }

        RETURN(rc);
}

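/*
 * Check the sptlrpc flavor of a connect request.  Echo clients are always
 * allowed; for a new export the flavor is chosen from the target's sptlrpc
 * rule set and recorded, while an existing export must keep using the peer
 * and flavor it was established with.
 */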
int tgt_connect_check_sptlrpc(struct ptlrpc_request *req, struct obd_export *exp)
{
        struct lu_target        *tgt = class_exp2tgt(exp);
        struct sptlrpc_flavor    flvr;
        int                      rc = 0;

        LASSERT(tgt);
        LASSERT(tgt->lut_obd);
        LASSERT(tgt->lut_slice);

        /* always allow ECHO client */
        if (unlikely(strcmp(exp->exp_obd->obd_type->typ_name,
                            LUSTRE_ECHO_NAME) == 0)) {
                exp->exp_flvr.sf_rpc = SPTLRPC_FLVR_ANY;
                return 0;
        }

        if (exp->exp_flvr.sf_rpc == SPTLRPC_FLVR_INVALID) {
                read_lock(&tgt->lut_sptlrpc_lock);
                sptlrpc_target_choose_flavor(&tgt->lut_sptlrpc_rset,
                                             req->rq_sp_from,
                                             req->rq_peer.nid,
                                             &flvr);
                read_unlock(&tgt->lut_sptlrpc_lock);

                spin_lock(&exp->exp_lock);
                exp->exp_sp_peer = req->rq_sp_from;
                exp->exp_flvr = flvr;
                if (exp->exp_flvr.sf_rpc != SPTLRPC_FLVR_ANY &&
                    exp->exp_flvr.sf_rpc != req->rq_flvr.sf_rpc) {
                        CERROR("%s: unauthorized rpc flavor %x from %s, "
                               "expect %x\n", tgt_name(tgt),
                               req->rq_flvr.sf_rpc,
                               libcfs_nid2str(req->rq_peer.nid),
                               exp->exp_flvr.sf_rpc);
                        rc = -EACCES;
                }
                spin_unlock(&exp->exp_lock);
        } else {
                if (exp->exp_sp_peer != req->rq_sp_from) {
                        CERROR("%s: RPC source %s doesn't match %s\n",
                               tgt_name(tgt),
                               sptlrpc_part2name(req->rq_sp_from),
                               sptlrpc_part2name(exp->exp_sp_peer));
                        rc = -EACCES;
                } else {
                        rc = sptlrpc_target_export_check(exp, req);
                }
        }

        return rc;
}

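/*
 * Generic connect handler: verify the security level and sptlrpc flavor of
 * the new connection, then publish the negotiated connect flags from the
 * reply into the export.  On failure the freshly created export is
 * disconnected again.
 */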
int tgt_connect(struct tgt_session_info *tsi)
{
        struct ptlrpc_request   *req = tgt_ses_req(tsi);
        struct obd_connect_data *reply;
        int                      rc;

        ENTRY;

        rc = tgt_init_sec_level(req);
        if (rc != 0)
                GOTO(out, rc);

        /* XXX: it would be better to call this check right after getting the
         * new export, but before last_rcvd slot allocation, to avoid server
         * load upon insecure connects. This is to be fixed after unifying all
         * targets.
         */
        rc = tgt_connect_check_sptlrpc(req, tsi->tsi_exp);
        if (rc)
                GOTO(out, rc);

        /* To avoid exposing partially initialized connection flags, changes up
         * to this point have been staged in reply->ocd_connect_flags. Now that
         * connection handling has completed successfully, atomically update
         * the connect flags in the shared export data structure. LU-1623 */
        reply = req_capsule_server_get(tsi->tsi_pill, &RMF_CONNECT_DATA);
        spin_lock(&tsi->tsi_exp->exp_lock);
        *exp_connect_flags_ptr(tsi->tsi_exp) = reply->ocd_connect_flags;
        spin_unlock(&tsi->tsi_exp->exp_lock);

        RETURN(0);
out:
        obd_disconnect(class_export_get(tsi->tsi_exp));
        return rc;
}
EXPORT_SYMBOL(tgt_connect);

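/*
 * Generic disconnect handler; failures from target_handle_disconnect() are
 * treated as serious errors so they are returned directly rather than in
 * rq_status.
 */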
int tgt_disconnect(struct tgt_session_info *tsi)
{
        int rc;

        ENTRY;

        rc = target_handle_disconnect(tgt_ses_req(tsi));
        if (rc)
                RETURN(err_serious(rc));

        RETURN(rc);
}
EXPORT_SYMBOL(tgt_disconnect);

/*
 * Unified target OBD handlers
 */
int tgt_obd_ping(struct tgt_session_info *tsi)
{
        int rc;

        ENTRY;

        rc = target_handle_ping(tgt_ses_req(tsi));
        if (rc)
                RETURN(err_serious(rc));

        RETURN(rc);
}
EXPORT_SYMBOL(tgt_obd_ping);

int tgt_obd_log_cancel(struct tgt_session_info *tsi)
{
        return err_serious(-EOPNOTSUPP);
}
EXPORT_SYMBOL(tgt_obd_log_cancel);

int tgt_obd_qc_callback(struct tgt_session_info *tsi)
{
        return err_serious(-EOPNOTSUPP);
}
EXPORT_SYMBOL(tgt_obd_qc_callback);

/*
 * Unified target DLM handlers.
 */
struct ldlm_callback_suite tgt_dlm_cbs = {
        .lcs_completion = ldlm_server_completion_ast,
        .lcs_blocking = ldlm_server_blocking_ast,
};

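/*
 * Generic LDLM_ENQUEUE handler: the DLM request was already unpacked and
 * stored in tsi->tsi_dlm_req by tgt_handle_request0() (HABEO_CLAVIS), so it
 * can be passed straight to ldlm_handle_enqueue0() on the target's namespace.
 */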
int tgt_enqueue(struct tgt_session_info *tsi)
{
        struct ptlrpc_request *req = tgt_ses_req(tsi);
        int rc;

        ENTRY;
        /*
         * tsi->tsi_dlm_req was already swapped and (if necessary) converted,
         * tsi->tsi_dlm_cbs was set by the *_req_handle() function.
         */
        LASSERT(tsi->tsi_dlm_req != NULL);

        rc = ldlm_handle_enqueue0(tsi->tsi_exp->exp_obd->obd_namespace, req,
                                  tsi->tsi_dlm_req, &tgt_dlm_cbs);
        if (rc)
                RETURN(err_serious(rc));

        RETURN(req->rq_status);
}
EXPORT_SYMBOL(tgt_enqueue);

int tgt_convert(struct tgt_session_info *tsi)
{
        struct ptlrpc_request *req = tgt_ses_req(tsi);
        int rc;

        ENTRY;
        LASSERT(tsi->tsi_dlm_req);
        rc = ldlm_handle_convert0(req, tsi->tsi_dlm_req);
        if (rc)
                RETURN(err_serious(rc));

        RETURN(req->rq_status);
}
EXPORT_SYMBOL(tgt_convert);

int tgt_bl_callback(struct tgt_session_info *tsi)
{
        return err_serious(-EOPNOTSUPP);
}
EXPORT_SYMBOL(tgt_bl_callback);

int tgt_cp_callback(struct tgt_session_info *tsi)
{
        return err_serious(-EOPNOTSUPP);
}
EXPORT_SYMBOL(tgt_cp_callback);

/*
 * Unified target LLOG handlers.
 */
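/*
 * The llog handlers below are thin wrappers: each one simply forwards the
 * request to the corresponding llog_origin_handle_*() function and returns
 * its result.
 */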
int tgt_llog_open(struct tgt_session_info *tsi)
{
        int rc;

        ENTRY;

        rc = llog_origin_handle_open(tgt_ses_req(tsi));

        RETURN(rc);
}
EXPORT_SYMBOL(tgt_llog_open);

int tgt_llog_close(struct tgt_session_info *tsi)
{
        int rc;

        ENTRY;

        rc = llog_origin_handle_close(tgt_ses_req(tsi));

        RETURN(rc);
}
EXPORT_SYMBOL(tgt_llog_close);

int tgt_llog_destroy(struct tgt_session_info *tsi)
{
        int rc;

        ENTRY;

        rc = llog_origin_handle_destroy(tgt_ses_req(tsi));

        RETURN(rc);
}
EXPORT_SYMBOL(tgt_llog_destroy);

int tgt_llog_read_header(struct tgt_session_info *tsi)
{
        int rc;

        ENTRY;

        rc = llog_origin_handle_read_header(tgt_ses_req(tsi));

        RETURN(rc);
}
EXPORT_SYMBOL(tgt_llog_read_header);

int tgt_llog_next_block(struct tgt_session_info *tsi)
{
        int rc;

        ENTRY;

        rc = llog_origin_handle_next_block(tgt_ses_req(tsi));

        RETURN(rc);
}
EXPORT_SYMBOL(tgt_llog_next_block);

int tgt_llog_prev_block(struct tgt_session_info *tsi)
{
        int rc;

        ENTRY;

        rc = llog_origin_handle_prev_block(tgt_ses_req(tsi));

        RETURN(rc);
}
EXPORT_SYMBOL(tgt_llog_prev_block);