Whamcloud - gitweb
16d7201eb83a037d157f02ebe9c1ee2ff05d355f
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2001, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  *
32  * Copyright (c) 2011, 2012, Whamcloud, Inc.
33  */
34 /*
35  * This file is part of Lustre, http://www.lustre.org/
36  * Lustre is a trademark of Sun Microsystems, Inc.
37  *
38  * lustre/ost/ost_handler.c
39  *
40  * Author: Peter J. Braam <braam@clusterfs.com>
41  * Author: Phil Schwan <phil@clusterfs.com>
42  */
43
44 #ifndef EXPORT_SYMTAB
45 # define EXPORT_SYMTAB
46 #endif
47 #define DEBUG_SUBSYSTEM S_OST
48
49 #include <linux/module.h>
50 #include <obd_cksum.h>
51 #include <obd_ost.h>
52 #include <lustre_net.h>
53 #include <lustre_dlm.h>
54 #include <lustre_export.h>
55 #include <lustre_debug.h>
56 #include <linux/init.h>
57 #include <lprocfs_status.h>
58 #include <libcfs/list.h>
59 #include <lustre_quota.h>
60 #include "ost_internal.h"
61
62 static int oss_num_threads;
63 CFS_MODULE_PARM(oss_num_threads, "i", int, 0444,
64                 "number of OSS service threads to start");
65
66 static int ost_num_threads;
67 CFS_MODULE_PARM(ost_num_threads, "i", int, 0444,
68                 "number of OST service threads to start (deprecated)");
69
70 static int oss_num_create_threads;
71 CFS_MODULE_PARM(oss_num_create_threads, "i", int, 0444,
72                 "number of OSS create threads to start");
73
74 /**
75  * Do not return server-side uid/gid to remote client
76  */
77 static void ost_drop_id(struct obd_export *exp, struct obdo *oa)
78 {
79         if (exp_connect_rmtclient(exp)) {
80                 oa->o_uid = -1;
81                 oa->o_gid = -1;
82                 oa->o_valid &= ~(OBD_MD_FLUID | OBD_MD_FLGID);
83         }
84 }
85
86 /**
87  * Validate oa from client.
88  * If the request comes from 2.0 clients, currently only RSVD seq and IDIF
89  * req are valid.
90  *    a. for single MDS  seq = FID_SEQ_OST_MDT0,
91  *    b. for CMD, seq = FID_SEQ_OST_MDT0, FID_SEQ_OST_MDT1 - FID_SEQ_OST_MAX
92  */
93 static int ost_validate_obdo(struct obd_export *exp, struct obdo *oa,
94                              struct obd_ioobj *ioobj)
95 {
96         if (oa != NULL && !(oa->o_valid & OBD_MD_FLGROUP)) {
97                 oa->o_seq = FID_SEQ_OST_MDT0;
98                 if (ioobj)
99                         ioobj->ioo_seq = FID_SEQ_OST_MDT0;
100         /* remove fid_seq_is_rsvd() after FID-on-OST allows SEQ > 9 */
101         } else if (oa == NULL ||
102                    !(fid_seq_is_rsvd(oa->o_seq) || fid_seq_is_idif(oa->o_seq))) {
103                 CERROR("%s: client %s sent invalid object "POSTID"\n",
104                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
105                        oa ? oa->o_id : -1, oa ? oa->o_seq : -1);
106                 return -EPROTO;
107         }
108         obdo_from_ostid(oa, &oa->o_oi);
109         if (ioobj)
110                 ioobj_from_obdo(ioobj, oa);
111         return 0;
112 }
113
114 void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
115 {
116         struct oti_req_ack_lock *ack_lock;
117         int i;
118
119         if (oti == NULL)
120                 return;
121
122         if (req->rq_repmsg) {
123                 __u64 versions[PTLRPC_NUM_VERSIONS] = { 0 };
124                 lustre_msg_set_transno(req->rq_repmsg, oti->oti_transno);
125                 versions[0] = oti->oti_pre_version;
126                 lustre_msg_set_versions(req->rq_repmsg, versions);
127         }
128         req->rq_transno = oti->oti_transno;
129
130         /* XXX 4 == entries in oti_ack_locks??? */
131         for (ack_lock = oti->oti_ack_locks, i = 0; i < 4; i++, ack_lock++) {
132                 if (!ack_lock->mode)
133                         break;
134                 /* XXX not even calling target_send_reply in some cases... */
135                 ptlrpc_save_lock (req, &ack_lock->lock, ack_lock->mode, 0);
136         }
137 }
138
139 static int ost_destroy(struct obd_export *exp, struct ptlrpc_request *req,
140                        struct obd_trans_info *oti)
141 {
142         struct ost_body *body, *repbody;
143         struct lustre_capa *capa = NULL;
144         int rc;
145         ENTRY;
146
147         /* Get the request body */
148         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
149         if (body == NULL)
150                 RETURN(-EFAULT);
151
152         if (body->oa.o_id == 0)
153                 RETURN(-EPROTO);
154
155         rc = ost_validate_obdo(exp, &body->oa, NULL);
156         if (rc)
157                 RETURN(rc);
158
159         /* If there's a DLM request, cancel the locks mentioned in it*/
160         if (req_capsule_field_present(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT)) {
161                 struct ldlm_request *dlm;
162
163                 dlm = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
164                 if (dlm == NULL)
165                         RETURN (-EFAULT);
166                 ldlm_request_cancel(req, dlm, 0);
167         }
168
169         /* If there's a capability, get it */
170         if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
171                 capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
172                 if (capa == NULL) {
173                         CERROR("Missing capability for OST DESTROY");
174                         RETURN (-EFAULT);
175                 }
176         }
177
178         /* Prepare the reply */
179         rc = req_capsule_server_pack(&req->rq_pill);
180         if (rc)
181                 RETURN(rc);
182
183         /* Get the log cancellation cookie */
184         if (body->oa.o_valid & OBD_MD_FLCOOKIE)
185                 oti->oti_logcookies = &body->oa.o_lcookie;
186
187         /* Finish the reply */
188         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
189         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
190
191         /* Do the destroy and set the reply status accordingly  */
192         req->rq_status = obd_destroy(exp, &repbody->oa, NULL, oti, NULL, capa);
193         RETURN(0);
194 }
195
196 /**
197  * Helper function for getting server side [start, start+count] DLM lock
198  * if asked by client.
199  */
200 static int ost_lock_get(struct obd_export *exp, struct obdo *oa,
201                         __u64 start, __u64 count, struct lustre_handle *lh,
202                         int mode, int flags)
203 {
204         struct ldlm_res_id res_id;
205         ldlm_policy_data_t policy;
206         __u64 end = start + count;
207
208         ENTRY;
209
210         LASSERT(!lustre_handle_is_used(lh));
211         /* o_id and o_gr are used for localizing resource, if client miss to set
212          * them, do not trigger ASSERTION. */
213         if (unlikely((oa->o_valid & (OBD_MD_FLID | OBD_MD_FLGROUP)) !=
214                      (OBD_MD_FLID | OBD_MD_FLGROUP)))
215                 RETURN(-EPROTO);
216
217         if (!(oa->o_valid & OBD_MD_FLFLAGS) ||
218             !(oa->o_flags & OBD_FL_SRVLOCK))
219                 RETURN(0);
220
221         osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
222         CDEBUG(D_INODE, "OST-side extent lock.\n");
223
224         policy.l_extent.start = start & CFS_PAGE_MASK;
225
226         /* If ->o_blocks is EOF it means "lock till the end of the
227          * file". Otherwise, it's size of a hole being punched (in bytes) */
228         if (count == OBD_OBJECT_EOF || end < start)
229                 policy.l_extent.end = OBD_OBJECT_EOF;
230         else
231                 policy.l_extent.end = end | ~CFS_PAGE_MASK;
232
233         RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
234                                       LDLM_EXTENT, &policy, mode, &flags,
235                                       ldlm_blocking_ast, ldlm_completion_ast,
236                                       ldlm_glimpse_ast, NULL, 0, NULL, lh));
237 }
238
239 /* Helper function: release lock, if any. */
240 static void ost_lock_put(struct obd_export *exp,
241                          struct lustre_handle *lh, int mode)
242 {
243         ENTRY;
244         if (lustre_handle_is_used(lh))
245                 ldlm_lock_decref(lh, mode);
246         EXIT;
247 }
248
249 static int ost_getattr(struct obd_export *exp, struct ptlrpc_request *req)
250 {
251         struct ost_body *body, *repbody;
252         struct obd_info *oinfo;
253         struct lustre_handle lh = { 0 };
254         struct lustre_capa *capa = NULL;
255         int rc;
256         ENTRY;
257
258         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
259         if (body == NULL)
260                 RETURN(-EFAULT);
261
262         rc = ost_validate_obdo(exp, &body->oa, NULL);
263         if (rc)
264                 RETURN(rc);
265
266         if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
267                 capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
268                 if (capa == NULL) {
269                         CERROR("Missing capability for OST GETATTR");
270                         RETURN(-EFAULT);
271                 }
272         }
273
274         rc = req_capsule_server_pack(&req->rq_pill);
275         if (rc)
276                 RETURN(rc);
277
278         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
279         repbody->oa = body->oa;
280
281         rc = ost_lock_get(exp, &repbody->oa, 0, OBD_OBJECT_EOF, &lh, LCK_PR, 0);
282         if (rc)
283                 RETURN(rc);
284
285         OBD_ALLOC_PTR(oinfo);
286         if (!oinfo)
287                 GOTO(unlock, rc = -ENOMEM);
288         oinfo->oi_oa = &repbody->oa;
289         oinfo->oi_capa = capa;
290
291         req->rq_status = obd_getattr(exp, oinfo);
292
293         OBD_FREE_PTR(oinfo);
294
295         ost_drop_id(exp, &repbody->oa);
296
297 unlock:
298         ost_lock_put(exp, &lh, LCK_PR);
299         RETURN(rc);
300 }
301
302 static int ost_statfs(struct ptlrpc_request *req)
303 {
304         struct obd_statfs *osfs;
305         int rc;
306         ENTRY;
307
308         rc = req_capsule_server_pack(&req->rq_pill);
309         if (rc)
310                 RETURN(rc);
311
312         osfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
313
314         req->rq_status = obd_statfs(req->rq_export->exp_obd, osfs,
315                                     cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS),
316                                     0);
317         if (req->rq_status != 0)
318                 CERROR("ost: statfs failed: rc %d\n", req->rq_status);
319
320         RETURN(0);
321 }
322
323 static int ost_create(struct obd_export *exp, struct ptlrpc_request *req,
324                       struct obd_trans_info *oti)
325 {
326         struct ost_body *body, *repbody;
327         int rc;
328         ENTRY;
329
330         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
331         if (body == NULL)
332                 RETURN(-EFAULT);
333
334         rc = ost_validate_obdo(req->rq_export, &body->oa, NULL);
335         if (rc)
336                 RETURN(rc);
337
338         rc = req_capsule_server_pack(&req->rq_pill);
339         if (rc)
340                 RETURN(rc);
341
342         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
343         repbody->oa = body->oa;
344         oti->oti_logcookies = &body->oa.o_lcookie;
345
346         req->rq_status = obd_create(exp, &repbody->oa, NULL, oti);
347         //obd_log_cancel(conn, NULL, 1, oti->oti_logcookies, 0);
348         RETURN(0);
349 }
350
351 static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req,
352                      struct obd_trans_info *oti)
353 {
354         struct ost_body *body, *repbody;
355         int rc, flags = 0;
356         struct lustre_handle lh = {0,};
357         ENTRY;
358
359         /* check that we do support OBD_CONNECT_TRUNCLOCK. */
360         CLASSERT(OST_CONNECT_SUPPORTED & OBD_CONNECT_TRUNCLOCK);
361
362         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
363         if (body == NULL)
364                 RETURN(-EFAULT);
365
366         rc = ost_validate_obdo(exp, &body->oa, NULL);
367         if (rc)
368                 RETURN(rc);
369
370         if ((body->oa.o_valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) !=
371             (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
372                 RETURN(-EPROTO);
373
374         rc = req_capsule_server_pack(&req->rq_pill);
375         if (rc)
376                 RETURN(rc);
377
378         /* standard truncate optimization: if file body is completely
379          * destroyed, don't send data back to the server. */
380         if (body->oa.o_size == 0)
381                 flags |= LDLM_AST_DISCARD_DATA;
382
383         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
384         repbody->oa = body->oa;
385
386         rc = ost_lock_get(exp, &repbody->oa, repbody->oa.o_size,
387                           repbody->oa.o_blocks, &lh, LCK_PW, flags);
388         if (rc == 0) {
389                 struct obd_info *oinfo;
390                 struct lustre_capa *capa = NULL;
391
392                 if (repbody->oa.o_valid & OBD_MD_FLFLAGS &&
393                     repbody->oa.o_flags == OBD_FL_SRVLOCK)
394                         /*
395                          * If OBD_FL_SRVLOCK is the only bit set in
396                          * ->o_flags, clear OBD_MD_FLFLAGS to avoid falling
397                          * through filter_setattr() to filter_iocontrol().
398                          */
399                         repbody->oa.o_valid &= ~OBD_MD_FLFLAGS;
400
401                 if (repbody->oa.o_valid & OBD_MD_FLOSSCAPA) {
402                         capa = req_capsule_client_get(&req->rq_pill,
403                                                       &RMF_CAPA1);
404                         if (capa == NULL) {
405                                 CERROR("Missing capability for OST PUNCH");
406                                 GOTO(unlock, rc = -EFAULT);
407                         }
408                 }
409
410                 OBD_ALLOC_PTR(oinfo);
411                 if (!oinfo)
412                         GOTO(unlock, rc = -ENOMEM);
413                 oinfo->oi_oa = &repbody->oa;
414                 oinfo->oi_policy.l_extent.start = oinfo->oi_oa->o_size;
415                 oinfo->oi_policy.l_extent.end = oinfo->oi_oa->o_blocks;
416                 oinfo->oi_capa = capa;
417                 oinfo->oi_flags = OBD_FL_PUNCH;
418
419                 req->rq_status = obd_punch(exp, oinfo, oti, NULL);
420                 OBD_FREE_PTR(oinfo);
421 unlock:
422                 ost_lock_put(exp, &lh, LCK_PW);
423         }
424
425         ost_drop_id(exp, &repbody->oa);
426         RETURN(rc);
427 }
428
429 static int ost_sync(struct obd_export *exp, struct ptlrpc_request *req)
430 {
431         struct ost_body *body, *repbody;
432         struct obd_info *oinfo;
433         struct lustre_capa *capa = NULL;
434         int rc;
435         ENTRY;
436
437         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
438         if (body == NULL)
439                 RETURN(-EFAULT);
440
441         rc = ost_validate_obdo(exp, &body->oa, NULL);
442         if (rc)
443                 RETURN(rc);
444
445         if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
446                 capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
447                 if (capa == NULL) {
448                         CERROR("Missing capability for OST SYNC");
449                         RETURN (-EFAULT);
450                 }
451         }
452
453         rc = req_capsule_server_pack(&req->rq_pill);
454         if (rc)
455                 RETURN(rc);
456
457         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
458         repbody->oa = body->oa;
459
460         OBD_ALLOC_PTR(oinfo);
461         if (!oinfo)
462                 RETURN(-ENOMEM);
463
464         oinfo->oi_oa = &repbody->oa;
465         oinfo->oi_capa = capa;
466         req->rq_status = obd_sync(exp, oinfo, repbody->oa.o_size,
467                                   repbody->oa.o_blocks, NULL);
468         OBD_FREE_PTR(oinfo);
469
470         ost_drop_id(exp, &repbody->oa);
471         RETURN(0);
472 }
473
474 static int ost_setattr(struct obd_export *exp, struct ptlrpc_request *req,
475                        struct obd_trans_info *oti)
476 {
477         struct ost_body *body, *repbody;
478         struct obd_info *oinfo;
479         struct lustre_capa *capa = NULL;
480         int rc;
481         ENTRY;
482
483         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
484         if (body == NULL)
485                 RETURN(-EFAULT);
486
487         rc = ost_validate_obdo(req->rq_export, &body->oa, NULL);
488         if (rc)
489                 RETURN(rc);
490
491         rc = req_capsule_server_pack(&req->rq_pill);
492         if (rc)
493                 RETURN(rc);
494
495         if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
496                 capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
497                 if (capa == NULL) {
498                         CERROR("Missing capability for OST SETATTR");
499                         RETURN (-EFAULT);
500                 }
501         }
502
503         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
504         repbody->oa = body->oa;
505
506         OBD_ALLOC_PTR(oinfo);
507         if (!oinfo)
508                 RETURN(-ENOMEM);
509         oinfo->oi_oa = &repbody->oa;
510         oinfo->oi_capa = capa;
511
512         req->rq_status = obd_setattr(exp, oinfo, oti);
513
514         OBD_FREE_PTR(oinfo);
515
516         ost_drop_id(exp, &repbody->oa);
517         RETURN(0);
518 }
519
520 static __u32 ost_checksum_bulk(struct ptlrpc_bulk_desc *desc, int opc,
521                                cksum_type_t cksum_type)
522 {
523         __u32 cksum;
524         int i;
525
526         cksum = init_checksum(cksum_type);
527         for (i = 0; i < desc->bd_iov_count; i++) {
528                 struct page *page = desc->bd_iov[i].kiov_page;
529                 int off = desc->bd_iov[i].kiov_offset & ~CFS_PAGE_MASK;
530                 char *ptr = kmap(page) + off;
531                 int len = desc->bd_iov[i].kiov_len;
532
533                 /* corrupt the data before we compute the checksum, to
534                  * simulate a client->OST data error */
535                 if (i == 0 && opc == OST_WRITE &&
536                     OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_RECEIVE))
537                         memcpy(ptr, "bad3", min(4, len));
538                 cksum = compute_checksum(cksum, ptr, len, cksum_type);
539                 /* corrupt the data after we compute the checksum, to
540                  * simulate an OST->client data error */
541                 if (i == 0 && opc == OST_READ &&
542                     OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_SEND)) {
543                         memcpy(ptr, "bad4", min(4, len));
544                         /* nobody should use corrupted page again */
545                         ClearPageUptodate(page);
546                 }
547                 kunmap(page);
548         }
549
550         return fini_checksum(cksum, cksum_type);
551 }
552
553 static int ost_brw_lock_get(int mode, struct obd_export *exp,
554                             struct obd_ioobj *obj, struct niobuf_remote *nb,
555                             struct lustre_handle *lh)
556 {
557         int flags                 = 0;
558         int nrbufs                = obj->ioo_bufcnt;
559         struct ldlm_res_id res_id;
560         ldlm_policy_data_t policy;
561         int i;
562         ENTRY;
563
564         osc_build_res_name(obj->ioo_id, obj->ioo_seq, &res_id);
565         LASSERT(mode == LCK_PR || mode == LCK_PW);
566         LASSERT(!lustre_handle_is_used(lh));
567
568         if (nrbufs == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
569                 RETURN(0);
570
571         for (i = 1; i < nrbufs; i ++)
572                 if ((nb[0].flags & OBD_BRW_SRVLOCK) !=
573                     (nb[i].flags & OBD_BRW_SRVLOCK))
574                         RETURN(-EFAULT);
575
576         policy.l_extent.start = nb[0].offset & CFS_PAGE_MASK;
577         policy.l_extent.end   = (nb[nrbufs - 1].offset +
578                                  nb[nrbufs - 1].len - 1) | ~CFS_PAGE_MASK;
579
580         RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
581                                       LDLM_EXTENT, &policy, mode, &flags,
582                                       ldlm_blocking_ast, ldlm_completion_ast,
583                                       ldlm_glimpse_ast, NULL, 0, NULL, lh));
584 }
585
586 static void ost_brw_lock_put(int mode,
587                              struct obd_ioobj *obj, struct niobuf_remote *niob,
588                              struct lustre_handle *lh)
589 {
590         ENTRY;
591         LASSERT(mode == LCK_PR || mode == LCK_PW);
592         LASSERT((obj->ioo_bufcnt > 0 && (niob[0].flags & OBD_BRW_SRVLOCK)) ==
593                 lustre_handle_is_used(lh));
594         if (lustre_handle_is_used(lh))
595                 ldlm_lock_decref(lh, mode);
596         EXIT;
597 }
598
599 /* Allocate thread local buffers if needed */
600 static struct ost_thread_local_cache *ost_tls_get(struct ptlrpc_request *r)
601 {
602         struct ost_thread_local_cache *tls =
603                 (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
604
605         /* In normal mode of operation an I/O request is serviced only
606          * by ll_ost_io threads each of them has own tls buffers allocated by
607          * ost_thread_init().
608          * During recovery, an I/O request may be queued until any of the ost
609          * service threads process it. Not necessary it should be one of
610          * ll_ost_io threads. In that case we dynamically allocating tls
611          * buffers for the request service time. */
612         if (unlikely(tls == NULL)) {
613                 LASSERT(r->rq_export->exp_in_recovery);
614                 OBD_ALLOC_PTR(tls);
615                 if (tls != NULL) {
616                         tls->temporary = 1;
617                         r->rq_svc_thread->t_data = tls;
618                 }
619         }
620         return  tls;
621 }
622
623 /* Free thread local buffers if they were allocated only for servicing
624  * this one request */
625 static void ost_tls_put(struct ptlrpc_request *r)
626 {
627         struct ost_thread_local_cache *tls =
628                 (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
629
630         if (unlikely(tls->temporary)) {
631                 OBD_FREE_PTR(tls);
632                 r->rq_svc_thread->t_data = NULL;
633         }
634 }
635
636 static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
637 {
638         struct ptlrpc_bulk_desc *desc = NULL;
639         struct obd_export *exp = req->rq_export;
640         struct niobuf_remote *remote_nb;
641         struct niobuf_local *local_nb;
642         struct obd_ioobj *ioo;
643         struct ost_body *body, *repbody;
644         struct lustre_capa *capa = NULL;
645         struct l_wait_info lwi;
646         struct lustre_handle lockh = { 0 };
647         int niocount, npages, nob = 0, rc, i;
648         int no_reply = 0;
649         struct ost_thread_local_cache *tls;
650         ENTRY;
651
652         req->rq_bulk_read = 1;
653
654         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
655                 GOTO(out, rc = -EIO);
656
657         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, (obd_timeout + 1) / 4);
658
659         /* Check if there is eviction in progress, and if so, wait for it to
660          * finish */
661         if (unlikely(cfs_atomic_read(&exp->exp_obd->obd_evict_inprogress))) {
662                 lwi = LWI_INTR(NULL, NULL); // We do not care how long it takes
663                 rc = l_wait_event(exp->exp_obd->obd_evict_inprogress_waitq,
664                         !cfs_atomic_read(&exp->exp_obd->obd_evict_inprogress),
665                         &lwi);
666         }
667         if (exp->exp_failed)
668                 GOTO(out, rc = -ENOTCONN);
669
670         /* ost_body, ioobj & noibuf_remote are verified and swabbed in
671          * ost_rw_hpreq_check(). */
672         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
673         if (body == NULL)
674                 GOTO(out, rc = -EFAULT);
675
676         /*
677          * A req_capsule_X_get_array(pill, field, ptr_to_element_count) function
678          * would be useful here and wherever we get &RMF_OBD_IOOBJ and
679          * &RMF_NIOBUF_REMOTE.
680          */
681         ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
682         if (ioo == NULL)
683                 GOTO(out, rc = -EFAULT);
684
685         rc = ost_validate_obdo(exp, &body->oa, ioo);
686         if (rc)
687                 RETURN(rc);
688
689         niocount = ioo->ioo_bufcnt;
690         remote_nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
691         if (remote_nb == NULL)
692                 GOTO(out, rc = -EFAULT);
693
694         if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
695                 capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
696                 if (capa == NULL) {
697                         CERROR("Missing capability for OST BRW READ");
698                         GOTO(out, rc = -EFAULT);
699                 }
700         }
701
702         rc = req_capsule_server_pack(&req->rq_pill);
703         if (rc)
704                 GOTO(out, rc);
705
706         tls = ost_tls_get(req);
707         if (tls == NULL)
708                 GOTO(out_bulk, rc = -ENOMEM);
709         local_nb = tls->local;
710
711         rc = ost_brw_lock_get(LCK_PR, exp, ioo, remote_nb, &lockh);
712         if (rc != 0)
713                 GOTO(out_tls, rc);
714
715         /*
716          * If getting the lock took more time than
717          * client was willing to wait, drop it. b=11330
718          */
719         if (cfs_time_current_sec() > req->rq_deadline ||
720             OBD_FAIL_CHECK(OBD_FAIL_OST_DROP_REQ)) {
721                 no_reply = 1;
722                 CERROR("Dropping timed-out read from %s because locking"
723                        "object "LPX64" took %ld seconds (limit was %ld).\n",
724                        libcfs_id2str(req->rq_peer), ioo->ioo_id,
725                        cfs_time_current_sec() - req->rq_arrival_time.tv_sec,
726                        req->rq_deadline - req->rq_arrival_time.tv_sec);
727                 GOTO(out_lock, rc = -ETIMEDOUT);
728         }
729
730         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
731         memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
732
733         npages = OST_THREAD_POOL_SIZE;
734         rc = obd_preprw(OBD_BRW_READ, exp, &repbody->oa, 1, ioo,
735                         remote_nb, &npages, local_nb, oti, capa);
736         if (rc != 0)
737                 GOTO(out_lock, rc);
738
739         desc = ptlrpc_prep_bulk_exp(req, npages,
740                                      BULK_PUT_SOURCE, OST_BULK_PORTAL);
741         if (desc == NULL)
742                 GOTO(out_commitrw, rc = -ENOMEM);
743
744         nob = 0;
745         for (i = 0; i < npages; i++) {
746                 int page_rc = local_nb[i].rc;
747
748                 if (page_rc < 0) {              /* error */
749                         rc = page_rc;
750                         break;
751                 }
752
753                 nob += page_rc;
754                 if (page_rc != 0) {             /* some data! */
755                         LASSERT (local_nb[i].page != NULL);
756                         ptlrpc_prep_bulk_page(desc, local_nb[i].page,
757                                               local_nb[i].offset & ~CFS_PAGE_MASK,
758                                               page_rc);
759                 }
760
761                 if (page_rc != local_nb[i].len) { /* short read */
762                         /* All subsequent pages should be 0 */
763                         while(++i < npages)
764                                 LASSERT(local_nb[i].rc == 0);
765                         break;
766                 }
767         }
768
769         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
770                 cksum_type_t cksum_type =
771                         cksum_type_unpack(repbody->oa.o_valid & OBD_MD_FLFLAGS ?
772                                           repbody->oa.o_flags : 0);
773                 repbody->oa.o_flags = cksum_type_pack(cksum_type);
774                 repbody->oa.o_valid = OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
775                 repbody->oa.o_cksum = ost_checksum_bulk(desc, OST_READ,cksum_type);
776                 CDEBUG(D_PAGE, "checksum at read origin: %x\n",
777                        repbody->oa.o_cksum);
778         } else {
779                 repbody->oa.o_valid = 0;
780         }
781         /* We're finishing using body->oa as an input variable */
782
783         /* Check if client was evicted while we were doing i/o before touching
784            network */
785         if (rc == 0) {
786                 if (likely(!CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2)))
787                         rc = target_bulk_io(exp, desc, &lwi);
788                 no_reply = rc != 0;
789         }
790
791 out_commitrw:
792         /* Must commit after prep above in all cases */
793         rc = obd_commitrw(OBD_BRW_READ, exp, &repbody->oa, 1, ioo,
794                           remote_nb, npages, local_nb, oti, rc);
795
796         if (rc == 0)
797                 ost_drop_id(exp, &repbody->oa);
798
799 out_lock:
800         ost_brw_lock_put(LCK_PR, ioo, remote_nb, &lockh);
801 out_tls:
802         ost_tls_put(req);
803 out_bulk:
804         if (desc && !CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2))
805                 ptlrpc_free_bulk(desc);
806 out:
807         LASSERT(rc <= 0);
808         if (rc == 0) {
809                 req->rq_status = nob;
810                 ptlrpc_lprocfs_brw(req, nob);
811                 target_committed_to_req(req);
812                 ptlrpc_reply(req);
813         } else if (!no_reply) {
814                 /* Only reply if there was no comms problem with bulk */
815                 target_committed_to_req(req);
816                 req->rq_status = rc;
817                 ptlrpc_error(req);
818         } else {
819                 /* reply out callback would free */
820                 ptlrpc_req_drop_rs(req);
821                 CWARN("%s: ignoring bulk IO comm error with %s@%s id %s - "
822                       "client will retry\n",
823                       exp->exp_obd->obd_name,
824                       exp->exp_client_uuid.uuid,
825                       exp->exp_connection->c_remote_uuid.uuid,
826                       libcfs_id2str(req->rq_peer));
827         }
828         /* send a bulk after reply to simulate a network delay or reordering
829          * by a router */
830         if (unlikely(CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2))) {
831                 cfs_waitq_t              waitq;
832                 struct l_wait_info       lwi1;
833
834                 CDEBUG(D_INFO, "reorder BULK\n");
835                 cfs_waitq_init(&waitq);
836
837                 lwi1 = LWI_TIMEOUT_INTR(cfs_time_seconds(3), NULL, NULL, NULL);
838                 l_wait_event(waitq, 0, &lwi1);
839                 rc = target_bulk_io(exp, desc, &lwi);
840                 ptlrpc_free_bulk(desc);
841         }
842
843         RETURN(rc);
844 }
845
846 static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
847 {
848         struct ptlrpc_bulk_desc *desc = NULL;
849         struct obd_export       *exp = req->rq_export;
850         struct niobuf_remote    *remote_nb;
851         struct niobuf_local     *local_nb;
852         struct obd_ioobj        *ioo;
853         struct ost_body         *body, *repbody;
854         struct l_wait_info       lwi;
855         struct lustre_handle     lockh = {0};
856         struct lustre_capa      *capa = NULL;
857         __u32                   *rcs;
858         int objcount, niocount, npages;
859         int rc, i, j;
860         obd_count                client_cksum = 0, server_cksum = 0;
861         cksum_type_t             cksum_type = OBD_CKSUM_CRC32;
862         int                      no_reply = 0, mmap = 0;
863         __u32                    o_uid = 0, o_gid = 0;
864         struct ost_thread_local_cache *tls;
865         ENTRY;
866
867         req->rq_bulk_write = 1;
868
869         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
870                 GOTO(out, rc = -EIO);
871         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK2))
872                 GOTO(out, rc = -EFAULT);
873
874         /* pause before transaction has been started */
875         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, (obd_timeout + 1) / 4);
876
877         /* ost_body, ioobj & noibuf_remote are verified and swabbed in
878          * ost_rw_hpreq_check(). */
879         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
880         if (body == NULL)
881                 GOTO(out, rc = -EFAULT);
882
883         objcount = req_capsule_get_size(&req->rq_pill, &RMF_OBD_IOOBJ,
884                                         RCL_CLIENT) / sizeof(*ioo);
885         ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
886         if (ioo == NULL)
887                 GOTO(out, rc = -EFAULT);
888
889         rc = ost_validate_obdo(exp, &body->oa, ioo);
890         if (rc)
891                 RETURN(rc);
892
893         for (niocount = i = 0; i < objcount; i++)
894                 niocount += ioo[i].ioo_bufcnt;
895
896         /*
897          * It'd be nice to have a capsule function to indicate how many elements
898          * there were in a buffer for an RMF that's declared to be an array.
899          * It's easy enough to compute the number of elements here though.
900          */
901         remote_nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
902         if (remote_nb == NULL || niocount != (req_capsule_get_size(&req->rq_pill,
903             &RMF_NIOBUF_REMOTE, RCL_CLIENT) / sizeof(*remote_nb)))
904                 GOTO(out, rc = -EFAULT);
905
906         if ((remote_nb[0].flags & OBD_BRW_MEMALLOC) &&
907             (exp->exp_connection->c_peer.nid == exp->exp_connection->c_self))
908                 cfs_memory_pressure_set();
909
910         if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
911                 capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
912                 if (capa == NULL) {
913                         CERROR("Missing capability for OST BRW WRITE");
914                         GOTO(out, rc = -EFAULT);
915                 }
916         }
917
918         req_capsule_set_size(&req->rq_pill, &RMF_RCS, RCL_SERVER,
919                              niocount * sizeof(*rcs));
920         rc = req_capsule_server_pack(&req->rq_pill);
921         if (rc != 0)
922                 GOTO(out, rc);
923         CFS_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_PACK, cfs_fail_val);
924         rcs = req_capsule_server_get(&req->rq_pill, &RMF_RCS);
925
926         tls = ost_tls_get(req);
927         if (tls == NULL)
928                 GOTO(out_bulk, rc = -ENOMEM);
929         local_nb = tls->local;
930
931         rc = ost_brw_lock_get(LCK_PW, exp, ioo, remote_nb, &lockh);
932         if (rc != 0)
933                 GOTO(out_tls, rc);
934
935         /*
936          * If getting the lock took more time than
937          * client was willing to wait, drop it. b=11330
938          */
939         if (cfs_time_current_sec() > req->rq_deadline ||
940             OBD_FAIL_CHECK(OBD_FAIL_OST_DROP_REQ)) {
941                 no_reply = 1;
942                 CERROR("Dropping timed-out write from %s because locking "
943                        "object "LPX64" took %ld seconds (limit was %ld).\n",
944                        libcfs_id2str(req->rq_peer), ioo->ioo_id,
945                        cfs_time_current_sec() - req->rq_arrival_time.tv_sec,
946                        req->rq_deadline - req->rq_arrival_time.tv_sec);
947                 GOTO(out_lock, rc = -ETIMEDOUT);
948         }
949
950         /* obd_preprw clobbers oa->valid, so save what we need */
951         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
952                 client_cksum = body->oa.o_cksum;
953                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
954                         cksum_type = cksum_type_unpack(body->oa.o_flags);
955         }
956         if (body->oa.o_valid & OBD_MD_FLFLAGS && body->oa.o_flags & OBD_FL_MMAP)
957                 mmap = 1;
958
959         /* Because we already sync grant info with client when reconnect,
960          * grant info will be cleared for resent req, then fed_grant and
961          * total_grant will not be modified in following preprw_write */
962         if (lustre_msg_get_flags(req->rq_reqmsg) & (MSG_RESENT | MSG_REPLAY)) {
963                 DEBUG_REQ(D_CACHE, req, "clear resent/replay req grant info");
964                 body->oa.o_valid &= ~OBD_MD_FLGRANT;
965         }
966
967         if (exp_connect_rmtclient(exp)) {
968                 o_uid = body->oa.o_uid;
969                 o_gid = body->oa.o_gid;
970         }
971
972         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
973         memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
974
975         npages = OST_THREAD_POOL_SIZE;
976         rc = obd_preprw(OBD_BRW_WRITE, exp, &repbody->oa, objcount,
977                         ioo, remote_nb, &npages, local_nb, oti, capa);
978         if (rc != 0)
979                 GOTO(out_lock, rc);
980
981         desc = ptlrpc_prep_bulk_exp(req, npages,
982                                      BULK_GET_SINK, OST_BULK_PORTAL);
983         if (desc == NULL)
984                 GOTO(skip_transfer, rc = -ENOMEM);
985
986         /* NB Having prepped, we must commit... */
987
988         for (i = 0; i < npages; i++)
989                 ptlrpc_prep_bulk_page(desc, local_nb[i].page,
990                                       local_nb[i].offset & ~CFS_PAGE_MASK,
991                                       local_nb[i].len);
992
993         rc = sptlrpc_svc_prep_bulk(req, desc);
994         if (rc != 0)
995                 GOTO(out_lock, rc);
996
997         rc = target_bulk_io(exp, desc, &lwi);
998         no_reply = rc != 0;
999
1000 skip_transfer:
1001         if (client_cksum != 0 && rc == 0) {
1002                 static int cksum_counter;
1003                 repbody->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1004                 repbody->oa.o_flags &= ~OBD_FL_CKSUM_ALL;
1005                 repbody->oa.o_flags |= cksum_type_pack(cksum_type);
1006                 server_cksum = ost_checksum_bulk(desc, OST_WRITE, cksum_type);
1007                 repbody->oa.o_cksum = server_cksum;
1008                 cksum_counter++;
1009                 if (unlikely(client_cksum != server_cksum)) {
1010                         CDEBUG_LIMIT(mmap ? D_INFO : D_ERROR,
1011                                      "client csum %x, server csum %x\n",
1012                                      client_cksum, server_cksum);
1013                         cksum_counter = 0;
1014                 } else if ((cksum_counter & (-cksum_counter)) == cksum_counter){
1015                         CDEBUG(D_INFO, "Checksum %u from %s OK: %x\n",
1016                                cksum_counter, libcfs_id2str(req->rq_peer),
1017                                server_cksum);
1018                 }
1019         }
1020
1021         /* Must commit after prep above in all cases */
1022         rc = obd_commitrw(OBD_BRW_WRITE, exp, &repbody->oa, objcount, ioo,
1023                           remote_nb, npages, local_nb, oti, rc);
1024         if (rc == -ENOTCONN)
1025                 /* quota acquire process has been given up because
1026                  * either the client has been evicted or the client
1027                  * has timed out the request already */
1028                 no_reply = 1;
1029
1030         if (exp_connect_rmtclient(exp)) {
1031                 repbody->oa.o_uid = o_uid;
1032                 repbody->oa.o_gid = o_gid;
1033         }
1034
1035         /*
1036          * Disable sending mtime back to the client. If the client locked the
1037          * whole object, then it has already updated the mtime on its side,
1038          * otherwise it will have to glimpse anyway (see bug 21489, comment 32)
1039          */
1040         repbody->oa.o_valid &= ~(OBD_MD_FLMTIME | OBD_MD_FLATIME);
1041
1042         if (unlikely(client_cksum != server_cksum && rc == 0 && !mmap)) {
1043                 int  new_cksum = ost_checksum_bulk(desc, OST_WRITE, cksum_type);
1044                 char *msg;
1045                 char *via;
1046                 char *router;
1047
1048                 if (new_cksum == server_cksum)
1049                         msg = "changed in transit before arrival at OST";
1050                 else if (new_cksum == client_cksum)
1051                         msg = "initial checksum before message complete";
1052                 else
1053                         msg = "changed in transit AND after initial checksum";
1054
1055                 if (req->rq_peer.nid == desc->bd_sender) {
1056                         via = router = "";
1057                 } else {
1058                         via = " via ";
1059                         router = libcfs_nid2str(desc->bd_sender);
1060                 }
1061
1062                 LCONSOLE_ERROR_MSG(0x168, "%s: BAD WRITE CHECKSUM: %s from "
1063                                    "%s%s%s inode "DFID" object "
1064                                    LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1065                                    exp->exp_obd->obd_name, msg,
1066                                    libcfs_id2str(req->rq_peer),
1067                                    via, router,
1068                                    body->oa.o_valid & OBD_MD_FLFID ?
1069                                                 body->oa.o_parent_seq : (__u64)0,
1070                                    body->oa.o_valid & OBD_MD_FLFID ?
1071                                                 body->oa.o_parent_oid : 0,
1072                                    body->oa.o_valid & OBD_MD_FLFID ?
1073                                                 body->oa.o_parent_ver : 0,
1074                                    body->oa.o_id,
1075                                    body->oa.o_valid & OBD_MD_FLGROUP ?
1076                                                 body->oa.o_seq : (__u64)0,
1077                                    local_nb[0].offset,
1078                                    local_nb[npages-1].offset +
1079                                    local_nb[npages-1].len - 1 );
1080                 CERROR("client csum %x, original server csum %x, "
1081                        "server csum now %x\n",
1082                        client_cksum, server_cksum, new_cksum);
1083         }
1084
1085         if (rc == 0) {
1086                 int nob = 0;
1087
1088                 /* set per-requested niobuf return codes */
1089                 for (i = j = 0; i < niocount; i++) {
1090                         int len = remote_nb[i].len;
1091
1092                         nob += len;
1093                         rcs[i] = 0;
1094                         do {
1095                                 LASSERT(j < npages);
1096                                 if (local_nb[j].rc < 0)
1097                                         rcs[i] = local_nb[j].rc;
1098                                 len -= local_nb[j].len;
1099                                 j++;
1100                         } while (len > 0);
1101                         LASSERT(len == 0);
1102                 }
1103                 LASSERT(j == npages);
1104                 ptlrpc_lprocfs_brw(req, nob);
1105         }
1106
1107 out_lock:
1108         ost_brw_lock_put(LCK_PW, ioo, remote_nb, &lockh);
1109 out_tls:
1110         ost_tls_put(req);
1111 out_bulk:
1112         if (desc)
1113                 ptlrpc_free_bulk(desc);
1114 out:
1115         if (rc == 0) {
1116                 oti_to_request(oti, req);
1117                 target_committed_to_req(req);
1118                 rc = ptlrpc_reply(req);
1119         } else if (!no_reply) {
1120                 /* Only reply if there was no comms problem with bulk */
1121                 target_committed_to_req(req);
1122                 req->rq_status = rc;
1123                 ptlrpc_error(req);
1124         } else {
1125                 /* reply out callback would free */
1126                 ptlrpc_req_drop_rs(req);
1127                 CWARN("%s: ignoring bulk IO comm error with %s@%s id %s - "
1128                       "client will retry\n",
1129                       exp->exp_obd->obd_name,
1130                       exp->exp_client_uuid.uuid,
1131                       exp->exp_connection->c_remote_uuid.uuid,
1132                       libcfs_id2str(req->rq_peer));
1133         }
1134         cfs_memory_pressure_clr();
1135         RETURN(rc);
1136 }
1137
1138 /**
1139  * Implementation of OST_SET_INFO.
1140  *
1141  * OST_SET_INFO is like ioctl(): heavily overloaded.  Specifically, it takes a
1142  * "key" and a value RPC buffers as arguments, with the value's contents
1143  * interpreted according to the key.
1144  *
1145  * Value types that need swabbing have swabbing done explicitly, either here or
1146  * in functions called from here.  This should be corrected: all swabbing should
1147  * be done in the capsule abstraction, as that will then allow us to move
1148  * swabbing exclusively to the client without having to modify server code
1149  * outside the capsule abstraction's implementation itself.  To correct this
1150  * will require minor changes to the capsule abstraction; see the comments for
1151  * req_capsule_extend() in layout.c.
1152  */
1153 static int ost_set_info(struct obd_export *exp, struct ptlrpc_request *req)
1154 {
1155         struct ost_body *body = NULL, *repbody;
1156         char *key, *val = NULL;
1157         int keylen, vallen, rc = 0;
1158         int is_grant_shrink = 0;
1159         ENTRY;
1160
1161         key = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
1162         if (key == NULL) {
1163                 DEBUG_REQ(D_HA, req, "no set_info key");
1164                 RETURN(-EFAULT);
1165         }
1166         keylen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_KEY,
1167                                       RCL_CLIENT);
1168
1169         vallen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_VAL,
1170                                       RCL_CLIENT);
1171
1172         if ((is_grant_shrink = KEY_IS(KEY_GRANT_SHRINK)))
1173                 /* In this case the value is actually an RMF_OST_BODY, so we
1174                  * transmutate the type of this PTLRPC */
1175                 req_capsule_extend(&req->rq_pill, &RQF_OST_SET_GRANT_INFO);
1176
1177         rc = req_capsule_server_pack(&req->rq_pill);
1178         if (rc)
1179                 RETURN(rc);
1180
1181         if (vallen) {
1182                 if (is_grant_shrink) {
1183                         body = req_capsule_client_get(&req->rq_pill,
1184                                                       &RMF_OST_BODY);
1185                         if (!body)
1186                                 RETURN(-EFAULT);
1187
1188                         repbody = req_capsule_server_get(&req->rq_pill,
1189                                                          &RMF_OST_BODY);
1190                         memcpy(repbody, body, sizeof(*body));
1191                         val = (char*)repbody;
1192                 } else {
1193                         val = req_capsule_client_get(&req->rq_pill,
1194                                                      &RMF_SETINFO_VAL);
1195                 }
1196         }
1197
1198         if (KEY_IS(KEY_EVICT_BY_NID)) {
1199                 if (val && vallen)
1200                         obd_export_evict_by_nid(exp->exp_obd, val);
1201                 GOTO(out, rc = 0);
1202         } else if (KEY_IS(KEY_MDS_CONN) && ptlrpc_req_need_swab(req)) {
1203                 if (vallen < sizeof(__u32))
1204                         RETURN(-EFAULT);
1205                 __swab32s((__u32 *)val);
1206         }
1207
1208         /* OBD will also check if KEY_IS(KEY_GRANT_SHRINK), and will cast val to
1209          * a struct ost_body * value */
1210         rc = obd_set_info_async(exp, keylen, key, vallen, val, NULL);
1211 out:
1212         lustre_msg_set_status(req->rq_repmsg, 0);
1213         RETURN(rc);
1214 }
1215
1216 static int ost_get_info(struct obd_export *exp, struct ptlrpc_request *req)
1217 {
1218         void *key, *reply;
1219         int keylen, replylen, rc = 0;
1220         struct req_capsule *pill = &req->rq_pill;
1221         ENTRY;
1222
1223         /* this common part for get_info rpc */
1224         key = req_capsule_client_get(pill, &RMF_SETINFO_KEY);
1225         if (key == NULL) {
1226                 DEBUG_REQ(D_HA, req, "no get_info key");
1227                 RETURN(-EFAULT);
1228         }
1229         keylen = req_capsule_get_size(pill, &RMF_SETINFO_KEY, RCL_CLIENT);
1230
1231         if (KEY_IS(KEY_FIEMAP)) {
1232                 struct ll_fiemap_info_key *fm_key = key;
1233                 int rc;
1234
1235                 rc = ost_validate_obdo(exp, &fm_key->oa, NULL);
1236                 if (rc)
1237                         RETURN(rc);
1238         }
1239
1240         rc = obd_get_info(exp, keylen, key, &replylen, NULL, NULL);
1241         if (rc)
1242                 RETURN(rc);
1243
1244         req_capsule_set_size(pill, &RMF_GENERIC_DATA,
1245                              RCL_SERVER, replylen);
1246
1247         rc = req_capsule_server_pack(pill);
1248         if (rc)
1249                 RETURN(rc);
1250
1251         reply = req_capsule_server_get(pill, &RMF_GENERIC_DATA);
1252         if (reply == NULL)
1253                 RETURN(-ENOMEM);
1254
1255         /* call again to fill in the reply buffer */
1256         rc = obd_get_info(exp, keylen, key, &replylen, reply, NULL);
1257
1258         lustre_msg_set_status(req->rq_repmsg, 0);
1259         RETURN(rc);
1260 }
1261
1262 #ifdef HAVE_QUOTA_SUPPORT
1263 static int ost_handle_quotactl(struct ptlrpc_request *req)
1264 {
1265         struct obd_quotactl *oqctl, *repoqc;
1266         int rc;
1267         ENTRY;
1268
1269         oqctl = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
1270         if (oqctl == NULL)
1271                 GOTO(out, rc = -EPROTO);
1272
1273         rc = req_capsule_server_pack(&req->rq_pill);
1274         if (rc)
1275                 GOTO(out, rc);
1276
1277         repoqc = req_capsule_server_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
1278         req->rq_status = obd_quotactl(req->rq_export, oqctl);
1279         *repoqc = *oqctl;
1280
1281 out:
1282         RETURN(rc);
1283 }
1284
1285 static int ost_handle_quotacheck(struct ptlrpc_request *req)
1286 {
1287         struct obd_quotactl *oqctl;
1288         int rc;
1289         ENTRY;
1290
1291         oqctl = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
1292         if (oqctl == NULL)
1293                 RETURN(-EPROTO);
1294
1295         rc = req_capsule_server_pack(&req->rq_pill);
1296         if (rc)
1297                 RETURN(-ENOMEM);
1298
1299         req->rq_status = obd_quotacheck(req->rq_export, oqctl);
1300         RETURN(0);
1301 }
1302
1303 static int ost_handle_quota_adjust_qunit(struct ptlrpc_request *req)
1304 {
1305         struct quota_adjust_qunit *oqaq, *repoqa;
1306         struct lustre_quota_ctxt *qctxt;
1307         int rc;
1308         ENTRY;
1309
1310         qctxt = &req->rq_export->exp_obd->u.obt.obt_qctxt;
1311         oqaq = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_ADJUST_QUNIT);
1312         if (oqaq == NULL)
1313                 GOTO(out, rc = -EPROTO);
1314
1315         rc = req_capsule_server_pack(&req->rq_pill);
1316         if (rc)
1317                 GOTO(out, rc);
1318
1319         repoqa = req_capsule_server_get(&req->rq_pill, &RMF_QUOTA_ADJUST_QUNIT);
1320         req->rq_status = obd_quota_adjust_qunit(req->rq_export, oqaq, qctxt, NULL);
1321         *repoqa = *oqaq;
1322
1323  out:
1324         RETURN(rc);
1325 }
1326 #endif
1327
1328 static int ost_llog_handle_connect(struct obd_export *exp,
1329                                    struct ptlrpc_request *req)
1330 {
1331         struct llogd_conn_body *body;
1332         int rc;
1333         ENTRY;
1334
1335         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_CONN_BODY);
1336         rc = obd_llog_connect(exp, body);
1337         RETURN(rc);
1338 }
1339
/*
 * Clear the remote-client, forced-remote-client and OSS-capability bits in
 * reply->ocd_connect_flags, then store the resulting flags in
 * exp->exp_connect_flags under exp->exp_lock.
 *
 * Both arguments are pointer expressions and are parenthesized in the
 * expansion; each is still evaluated more than once, so they must not have
 * side effects.
 */
#define ost_init_sec_none(reply, exp)                                   \
do {                                                                    \
        (reply)->ocd_connect_flags &= ~(OBD_CONNECT_RMT_CLIENT |        \
                                        OBD_CONNECT_RMT_CLIENT_FORCE |  \
                                        OBD_CONNECT_OSS_CAPA);          \
        cfs_spin_lock(&(exp)->exp_lock);                                \
        (exp)->exp_connect_flags = (reply)->ocd_connect_flags;          \
        cfs_spin_unlock(&(exp)->exp_lock);                              \
} while (0)
1349
1350 static int ost_init_sec_level(struct ptlrpc_request *req)
1351 {
1352         struct obd_export *exp = req->rq_export;
1353         struct req_capsule *pill = &req->rq_pill;
1354         struct obd_device *obd = exp->exp_obd;
1355         struct filter_obd *filter = &obd->u.filter;
1356         char *client = libcfs_nid2str(req->rq_peer.nid);
1357         struct obd_connect_data *data, *reply;
1358         int rc = 0, remote;
1359         ENTRY;
1360
1361         data = req_capsule_client_get(pill, &RMF_CONNECT_DATA);
1362         reply = req_capsule_server_get(pill, &RMF_CONNECT_DATA);
1363         if (data == NULL || reply == NULL)
1364                 RETURN(-EFAULT);
1365
1366         /* connection from MDT is always trusted */
1367         if (req->rq_auth_usr_mdt) {
1368                 ost_init_sec_none(reply, exp);
1369                 RETURN(0);
1370         }
1371
1372         /* no GSS support case */
1373         if (!req->rq_auth_gss) {
1374                 if (filter->fo_sec_level > LUSTRE_SEC_NONE) {
1375                         CWARN("client %s -> target %s does not user GSS, "
1376                               "can not run under security level %d.\n",
1377                               client, obd->obd_name, filter->fo_sec_level);
1378                         RETURN(-EACCES);
1379                 } else {
1380                         ost_init_sec_none(reply, exp);
1381                         RETURN(0);
1382                 }
1383         }
1384
1385         /* old version case */
1386         if (unlikely(!(data->ocd_connect_flags & OBD_CONNECT_RMT_CLIENT) ||
1387                      !(data->ocd_connect_flags & OBD_CONNECT_OSS_CAPA))) {
1388                 if (filter->fo_sec_level > LUSTRE_SEC_NONE) {
1389                         CWARN("client %s -> target %s uses old version, "
1390                               "can not run under security level %d.\n",
1391                               client, obd->obd_name, filter->fo_sec_level);
1392                         RETURN(-EACCES);
1393                 } else {
1394                         CWARN("client %s -> target %s uses old version, "
1395                               "run under security level %d.\n",
1396                               client, obd->obd_name, filter->fo_sec_level);
1397                         ost_init_sec_none(reply, exp);
1398                         RETURN(0);
1399                 }
1400         }
1401
1402         remote = data->ocd_connect_flags & OBD_CONNECT_RMT_CLIENT_FORCE;
1403         if (remote) {
1404                 if (!req->rq_auth_remote)
1405                         CDEBUG(D_SEC, "client (local realm) %s -> target %s "
1406                                "asked to be remote.\n", client, obd->obd_name);
1407         } else if (req->rq_auth_remote) {
1408                 remote = 1;
1409                 CDEBUG(D_SEC, "client (remote realm) %s -> target %s is set "
1410                        "as remote by default.\n", client, obd->obd_name);
1411         }
1412
1413         if (remote) {
1414                 if (!filter->fo_fl_oss_capa) {
1415                         CDEBUG(D_SEC, "client %s -> target %s is set as remote,"
1416                                " but OSS capabilities are not enabled: %d.\n",
1417                                client, obd->obd_name, filter->fo_fl_oss_capa);
1418                         RETURN(-EACCES);
1419                 }
1420         }
1421
1422         switch (filter->fo_sec_level) {
1423         case LUSTRE_SEC_NONE:
1424                 if (!remote) {
1425                         ost_init_sec_none(reply, exp);
1426                         break;
1427                 } else {
1428                         CDEBUG(D_SEC, "client %s -> target %s is set as remote, "
1429                                "can not run under security level %d.\n",
1430                                client, obd->obd_name, filter->fo_sec_level);
1431                         RETURN(-EACCES);
1432                 }
1433         case LUSTRE_SEC_REMOTE:
1434                 if (!remote)
1435                         ost_init_sec_none(reply, exp);
1436                 break;
1437         case LUSTRE_SEC_ALL:
1438                 if (!remote) {
1439                         reply->ocd_connect_flags &= ~(OBD_CONNECT_RMT_CLIENT |
1440                                                       OBD_CONNECT_RMT_CLIENT_FORCE);
1441                         if (!filter->fo_fl_oss_capa)
1442                                 reply->ocd_connect_flags &= ~OBD_CONNECT_OSS_CAPA;
1443
1444                         cfs_spin_lock(&exp->exp_lock);
1445                         exp->exp_connect_flags = reply->ocd_connect_flags;
1446                         cfs_spin_unlock(&exp->exp_lock);
1447                 }
1448                 break;
1449         default:
1450                 RETURN(-EINVAL);
1451         }
1452
1453         RETURN(rc);
1454 }
1455
1456 /*
1457  * FIXME
1458  * this should be done in filter_connect()/filter_reconnect(), but
1459  * we can't obtain information like NID, which stored in incoming
1460  * request, thus can't decide what flavor to use. so we do it here.
1461  *
1462  * This hack should be removed after the OST stack be rewritten, just
1463  * like what we are doing in mdt_obd_connect()/mdt_obd_reconnect().
1464  */
1465 static int ost_connect_check_sptlrpc(struct ptlrpc_request *req)
1466 {
1467         struct obd_export     *exp = req->rq_export;
1468         struct filter_obd     *filter = &exp->exp_obd->u.filter;
1469         struct sptlrpc_flavor  flvr;
1470         int                    rc = 0;
1471
1472         if (unlikely(strcmp(exp->exp_obd->obd_type->typ_name,
1473                             LUSTRE_ECHO_NAME) == 0)) {
1474                 exp->exp_flvr.sf_rpc = SPTLRPC_FLVR_ANY;
1475                 return 0;
1476         }
1477
1478         if (exp->exp_flvr.sf_rpc == SPTLRPC_FLVR_INVALID) {
1479                 cfs_read_lock(&filter->fo_sptlrpc_lock);
1480                 sptlrpc_target_choose_flavor(&filter->fo_sptlrpc_rset,
1481                                              req->rq_sp_from,
1482                                              req->rq_peer.nid,
1483                                              &flvr);
1484                 cfs_read_unlock(&filter->fo_sptlrpc_lock);
1485
1486                 cfs_spin_lock(&exp->exp_lock);
1487
1488                 exp->exp_sp_peer = req->rq_sp_from;
1489                 exp->exp_flvr = flvr;
1490
1491                 if (exp->exp_flvr.sf_rpc != SPTLRPC_FLVR_ANY &&
1492                     exp->exp_flvr.sf_rpc != req->rq_flvr.sf_rpc) {
1493                         CERROR("unauthorized rpc flavor %x from %s, "
1494                                "expect %x\n", req->rq_flvr.sf_rpc,
1495                                libcfs_nid2str(req->rq_peer.nid),
1496                                exp->exp_flvr.sf_rpc);
1497                         rc = -EACCES;
1498                 }
1499
1500                 cfs_spin_unlock(&exp->exp_lock);
1501         } else {
1502                 if (exp->exp_sp_peer != req->rq_sp_from) {
1503                         CERROR("RPC source %s doesn't match %s\n",
1504                                sptlrpc_part2name(req->rq_sp_from),
1505                                sptlrpc_part2name(exp->exp_sp_peer));
1506                         rc = -EACCES;
1507                 } else {
1508                         rc = sptlrpc_target_export_check(exp, req);
1509                 }
1510         }
1511
1512         return rc;
1513 }
1514
1515 /* Ensure that data and metadata are synced to the disk when lock is cancelled
1516  * (if requested) */
1517 int ost_blocking_ast(struct ldlm_lock *lock,
1518                              struct ldlm_lock_desc *desc,
1519                              void *data, int flag)
1520 {
1521         __u32 sync_lock_cancel = 0;
1522         __u32 len = sizeof(sync_lock_cancel);
1523         int rc = 0;
1524         ENTRY;
1525
1526         rc = obd_get_info(lock->l_export, sizeof(KEY_SYNC_LOCK_CANCEL),
1527                           KEY_SYNC_LOCK_CANCEL, &len, &sync_lock_cancel, NULL);
1528
1529         if (!rc && flag == LDLM_CB_CANCELING &&
1530             (lock->l_granted_mode & (LCK_PW|LCK_GROUP)) &&
1531             (sync_lock_cancel == ALWAYS_SYNC_ON_CANCEL ||
1532              (sync_lock_cancel == BLOCKING_SYNC_ON_CANCEL &&
1533               lock->l_flags & LDLM_FL_CBPENDING))) {
1534                 struct obd_info *oinfo;
1535                 struct obdo *oa;
1536                 int rc;
1537
1538                 OBD_ALLOC_PTR(oinfo);
1539                 if (!oinfo)
1540                         RETURN(-ENOMEM);
1541                 OBDO_ALLOC(oa);
1542                 if (!oa) {
1543                         OBD_FREE_PTR(oinfo);
1544                         RETURN(-ENOMEM);
1545                 }
1546                 oa->o_id = lock->l_resource->lr_name.name[0];
1547                 oa->o_seq = lock->l_resource->lr_name.name[1];
1548                 oa->o_valid = OBD_MD_FLID|OBD_MD_FLGROUP;
1549                 oinfo->oi_oa = oa;
1550
1551                 rc = obd_sync(lock->l_export, oinfo,
1552                               lock->l_policy_data.l_extent.start,
1553                               lock->l_policy_data.l_extent.end, NULL);
1554                 if (rc)
1555                         CERROR("Error %d syncing data on lock cancel\n", rc);
1556
1557                 OBDO_FREE(oa);
1558                 OBD_FREE_PTR(oinfo);
1559         }
1560
1561         rc = ldlm_server_blocking_ast(lock, desc, data, flag);
1562         RETURN(rc);
1563 }
1564
1565 static int ost_filter_recovery_request(struct ptlrpc_request *req,
1566                                        struct obd_device *obd, int *process)
1567 {
1568         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1569         case OST_CONNECT: /* This will never get here, but for completeness. */
1570         case OST_DISCONNECT:
1571                *process = 1;
1572                RETURN(0);
1573
1574         case OBD_PING:
1575         case OST_CREATE:
1576         case OST_DESTROY:
1577         case OST_PUNCH:
1578         case OST_SETATTR:
1579         case OST_SYNC:
1580         case OST_WRITE:
1581         case OBD_LOG_CANCEL:
1582         case LDLM_ENQUEUE:
1583                 *process = target_queue_recovery_request(req, obd);
1584                 RETURN(0);
1585
1586         default:
1587                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
1588                 *process = -EAGAIN;
1589                 RETURN(0);
1590         }
1591 }
1592
1593 int ost_msg_check_version(struct lustre_msg *msg)
1594 {
1595         int rc;
1596
1597         switch(lustre_msg_get_opc(msg)) {
1598         case OST_CONNECT:
1599         case OST_DISCONNECT:
1600         case OBD_PING:
1601         case SEC_CTX_INIT:
1602         case SEC_CTX_INIT_CONT:
1603         case SEC_CTX_FINI:
1604                 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
1605                 if (rc)
1606                         CERROR("bad opc %u version %08x, expecting %08x\n",
1607                                lustre_msg_get_opc(msg),
1608                                lustre_msg_get_version(msg),
1609                                LUSTRE_OBD_VERSION);
1610                 break;
1611         case OST_CREATE:
1612         case OST_DESTROY:
1613         case OST_GETATTR:
1614         case OST_SETATTR:
1615         case OST_WRITE:
1616         case OST_READ:
1617         case OST_PUNCH:
1618         case OST_STATFS:
1619         case OST_SYNC:
1620         case OST_SET_INFO:
1621         case OST_GET_INFO:
1622 #ifdef HAVE_QUOTA_SUPPORT
1623         case OST_QUOTACHECK:
1624         case OST_QUOTACTL:
1625         case OST_QUOTA_ADJUST_QUNIT:
1626 #endif
1627                 rc = lustre_msg_check_version(msg, LUSTRE_OST_VERSION);
1628                 if (rc)
1629                         CERROR("bad opc %u version %08x, expecting %08x\n",
1630                                lustre_msg_get_opc(msg),
1631                                lustre_msg_get_version(msg),
1632                                LUSTRE_OST_VERSION);
1633                 break;
1634         case LDLM_ENQUEUE:
1635         case LDLM_CONVERT:
1636         case LDLM_CANCEL:
1637         case LDLM_BL_CALLBACK:
1638         case LDLM_CP_CALLBACK:
1639                 rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
1640                 if (rc)
1641                         CERROR("bad opc %u version %08x, expecting %08x\n",
1642                                lustre_msg_get_opc(msg),
1643                                lustre_msg_get_version(msg),
1644                                LUSTRE_DLM_VERSION);
1645                 break;
1646         case LLOG_ORIGIN_CONNECT:
1647         case OBD_LOG_CANCEL:
1648                 rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
1649                 if (rc)
1650                         CERROR("bad opc %u version %08x, expecting %08x\n",
1651                                lustre_msg_get_opc(msg),
1652                                lustre_msg_get_version(msg),
1653                                LUSTRE_LOG_VERSION);
1654                 break;
1655         default:
1656                 CERROR("Unexpected opcode %d\n", lustre_msg_get_opc(msg));
1657                 rc = -ENOTSUPP;
1658         }
1659         return rc;
1660 }
1661
1662 struct ost_prolong_data {
1663         struct ptlrpc_request *opd_req;
1664         struct obd_export     *opd_exp;
1665         struct obdo           *opd_oa;
1666         struct ldlm_res_id     opd_resid;
1667         struct ldlm_extent     opd_extent;
1668         ldlm_mode_t            opd_mode;
1669         unsigned int           opd_locks;
1670         int                    opd_timeout;
1671 };
1672
1673 /* prolong locks for the current service time of the corresponding
1674  * portal (= OST_IO_PORTAL)
1675  */
1676 static inline int prolong_timeout(struct ptlrpc_request *req)
1677 {
1678         struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
1679
1680         if (AT_OFF)
1681                 return obd_timeout / 2;
1682
1683         return max(at_est2timeout(at_get(&svc->srv_at_estimate)), ldlm_timeout);
1684 }
1685
1686 static void ost_prolong_lock_one(struct ost_prolong_data *opd,
1687                                  struct ldlm_lock *lock)
1688 {
1689         LASSERT(lock->l_req_mode == lock->l_granted_mode);
1690         LASSERT(lock->l_export == opd->opd_exp);
1691
1692         /* XXX: never try to grab resource lock here because we're inside
1693          * exp_bl_list_lock; in ldlm_lockd.c to handle waiting list we take
1694          * res lock and then exp_bl_list_lock. */
1695
1696         if (!(lock->l_flags & LDLM_FL_AST_SENT))
1697                 /* ignore locks not being cancelled */
1698                 return;
1699
1700         LDLM_DEBUG(lock,
1701                    "refreshed for req x"LPU64" ext("LPU64"->"LPU64") to %ds.\n",
1702                    opd->opd_req->rq_xid, opd->opd_extent.start,
1703                    opd->opd_extent.end, opd->opd_timeout);
1704
1705         /* OK. this is a possible lock the user holds doing I/O
1706          * let's refresh eviction timer for it */
1707         ldlm_refresh_waiting_lock(lock, opd->opd_timeout);
1708         ++opd->opd_locks;
1709 }
1710
1711 static void ost_prolong_locks(struct ost_prolong_data *data)
1712 {
1713         struct obd_export *exp = data->opd_exp;
1714         struct obdo       *oa  = data->opd_oa;
1715         struct ldlm_lock  *lock;
1716         ENTRY;
1717
1718         if (oa->o_valid & OBD_MD_FLHANDLE) {
1719                 /* mostly a request should be covered by only one lock, try
1720                  * fast path. */
1721                 lock = ldlm_handle2lock(&oa->o_handle);
1722                 if (lock != NULL) {
1723                         /* Fast path to check if the lock covers the whole IO
1724                          * region exclusively. */
1725                         if (lock->l_granted_mode == LCK_PW &&
1726                             ldlm_extent_contain(&lock->l_policy_data.l_extent,
1727                                                 &data->opd_extent)) {
1728                                 /* bingo */
1729                                 ost_prolong_lock_one(data, lock);
1730                                 LDLM_LOCK_PUT(lock);
1731                                 RETURN_EXIT;
1732                         }
1733                         LDLM_LOCK_PUT(lock);
1734                 }
1735         }
1736
1737
1738         cfs_spin_lock_bh(&exp->exp_bl_list_lock);
1739         cfs_list_for_each_entry(lock, &exp->exp_bl_list, l_exp_list) {
1740                 LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
1741                 LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
1742
1743                 if (!ldlm_res_eq(&data->opd_resid, &lock->l_resource->lr_name))
1744                         continue;
1745
1746                 if (!ldlm_extent_overlap(&lock->l_policy_data.l_extent,
1747                                          &data->opd_extent))
1748                         continue;
1749
1750                 ost_prolong_lock_one(data, lock);
1751         }
1752         cfs_spin_unlock_bh(&exp->exp_bl_list_lock);
1753
1754         EXIT;
1755 }
1756
1757 /**
1758  * Returns 1 if the given PTLRPC matches the given LDLM locks, or 0 if it does
1759  * not.
1760  */
1761 static int ost_rw_hpreq_lock_match(struct ptlrpc_request *req,
1762                                    struct ldlm_lock *lock)
1763 {
1764         struct niobuf_remote *nb;
1765         struct obd_ioobj *ioo;
1766         int mode, opc;
1767         struct ldlm_extent ext;
1768         ENTRY;
1769
1770         opc = lustre_msg_get_opc(req->rq_reqmsg);
1771         LASSERT(opc == OST_READ || opc == OST_WRITE);
1772
1773         ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
1774         LASSERT(ioo != NULL);
1775
1776         nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
1777         LASSERT(nb != NULL);
1778
1779         ext.start = nb->offset;
1780         nb += ioo->ioo_bufcnt - 1;
1781         ext.end = nb->offset + nb->len - 1;
1782
1783         LASSERT(lock->l_resource != NULL);
1784         if (!osc_res_name_eq(ioo->ioo_id, ioo->ioo_seq,
1785                              &lock->l_resource->lr_name))
1786                 RETURN(0);
1787
1788         mode = LCK_PW;
1789         if (opc == OST_READ)
1790                 mode |= LCK_PR;
1791         if (!(lock->l_granted_mode & mode))
1792                 RETURN(0);
1793
1794         RETURN(ldlm_extent_overlap(&lock->l_policy_data.l_extent, &ext));
1795 }
1796
1797 /**
1798  * High-priority queue request check for whether the given PTLRPC request (\a
1799  * req) is blocking an LDLM lock cancel.
1800  *
1801  * Returns 1 if the given given PTLRPC request (\a req) is blocking an LDLM lock
1802  * cancel, 0 if it is not, and -EFAULT if the request is malformed.
1803  *
1804  * Only OST_READs, OST_WRITEs and OST_PUNCHes go on the h-p RPC queue.  This
1805  * function looks only at OST_READs and OST_WRITEs.
1806  */
1807 static int ost_rw_hpreq_check(struct ptlrpc_request *req)
1808 {
1809         struct obd_device *obd = req->rq_export->exp_obd;
1810         struct ost_body *body;
1811         struct obd_ioobj *ioo;
1812         struct niobuf_remote *nb;
1813         struct ost_prolong_data opd = { 0 };
1814         int mode, opc;
1815         ENTRY;
1816
1817         /*
1818          * Use LASSERT to do sanity check because malformed RPCs should have
1819          * been filtered out in ost_hpreq_handler().
1820          */
1821         opc = lustre_msg_get_opc(req->rq_reqmsg);
1822         LASSERT(opc == OST_READ || opc == OST_WRITE);
1823
1824         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1825         LASSERT(body != NULL);
1826
1827         ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
1828         LASSERT(ioo != NULL);
1829
1830         nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
1831         LASSERT(nb != NULL);
1832         LASSERT(!(nb->flags & OBD_BRW_SRVLOCK));
1833
1834         osc_build_res_name(ioo->ioo_id, ioo->ioo_seq, &opd.opd_resid);
1835
1836         opd.opd_req = req;
1837         mode = LCK_PW;
1838         if (opc == OST_READ)
1839                 mode |= LCK_PR;
1840         opd.opd_mode = mode;
1841         opd.opd_exp = req->rq_export;
1842         opd.opd_oa  = &body->oa;
1843         opd.opd_extent.start = nb->offset;
1844         nb += ioo->ioo_bufcnt - 1;
1845         opd.opd_extent.end = nb->offset + nb->len - 1;
1846         opd.opd_timeout = prolong_timeout(req);
1847
1848         DEBUG_REQ(D_RPCTRACE, req,
1849                "%s %s: refresh rw locks: " LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
1850                obd->obd_name, cfs_current()->comm,
1851                opd.opd_resid.name[0], opd.opd_resid.name[1],
1852                opd.opd_extent.start, opd.opd_extent.end);
1853
1854         ost_prolong_locks(&opd);
1855
1856         CDEBUG(D_DLMTRACE, "%s: refreshed %u locks timeout for req %p.\n",
1857                obd->obd_name, opd.opd_locks, req);
1858
1859         RETURN(opd.opd_locks);
1860 }
1861
/**
 * hpreq_fini callback for OST_READ/OST_WRITE: run the lock refresh of
 * ost_rw_hpreq_check() once more and discard its result.
 */
static void ost_rw_hpreq_fini(struct ptlrpc_request *req)
{
        int refreshed = ost_rw_hpreq_check(req);

        (void)refreshed;
}
1866
1867 /**
1868  * Like ost_rw_hpreq_lock_match(), but for OST_PUNCH RPCs.
1869  */
1870 static int ost_punch_hpreq_lock_match(struct ptlrpc_request *req,
1871                                       struct ldlm_lock *lock)
1872 {
1873         struct ost_body *body;
1874         ENTRY;
1875
1876         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1877         LASSERT(body != NULL);
1878
1879         if (body->oa.o_valid & OBD_MD_FLHANDLE &&
1880             body->oa.o_handle.cookie == lock->l_handle.h_cookie)
1881                 RETURN(1);
1882
1883         RETURN(0);
1884 }
1885
1886 /**
1887  * Like ost_rw_hpreq_check(), but for OST_PUNCH RPCs.
1888  */
1889 static int ost_punch_hpreq_check(struct ptlrpc_request *req)
1890 {
1891         struct obd_device *obd = req->rq_export->exp_obd;
1892         struct ost_body *body;
1893         struct obdo *oa;
1894         struct ost_prolong_data opd = { 0 };
1895         __u64 start, end;
1896         ENTRY;
1897
1898         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1899         LASSERT(body != NULL);
1900
1901         oa = &body->oa;
1902         LASSERT(!(oa->o_valid & OBD_MD_FLFLAGS) ||
1903                 !(oa->o_flags & OBD_FL_SRVLOCK));
1904
1905         start = oa->o_size;
1906         end = start + oa->o_blocks;
1907
1908         opd.opd_req = req;
1909         opd.opd_mode = LCK_PW;
1910         opd.opd_exp = req->rq_export;
1911         opd.opd_oa  = oa;
1912         opd.opd_extent.start = start;
1913         opd.opd_extent.end   = end;
1914         if (oa->o_blocks == OBD_OBJECT_EOF)
1915                 opd.opd_extent.end = OBD_OBJECT_EOF;
1916         opd.opd_timeout = prolong_timeout(req);
1917
1918         osc_build_res_name(oa->o_id, oa->o_seq, &opd.opd_resid);
1919
1920         CDEBUG(D_DLMTRACE,
1921                "%s: refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
1922                obd->obd_name,
1923                opd.opd_resid.name[0], opd.opd_resid.name[1],
1924                opd.opd_extent.start, opd.opd_extent.end);
1925
1926         ost_prolong_locks(&opd);
1927
1928         CDEBUG(D_DLMTRACE, "%s: refreshed %u locks timeout for req %p.\n",
1929                obd->obd_name, opd.opd_locks, req);
1930
1931         RETURN(opd.opd_locks > 0);
1932 }
1933
/**
 * hpreq_fini callback for OST_PUNCH: run the lock refresh of
 * ost_punch_hpreq_check() once more and discard its result.
 */
static void ost_punch_hpreq_fini(struct ptlrpc_request *req)
{
        int refreshed = ost_punch_hpreq_check(req);

        (void)refreshed;
}
1938
1939 struct ptlrpc_hpreq_ops ost_hpreq_rw = {
1940         .hpreq_lock_match = ost_rw_hpreq_lock_match,
1941         .hpreq_check      = ost_rw_hpreq_check,
1942         .hpreq_fini       = ost_rw_hpreq_fini
1943 };
1944
1945 struct ptlrpc_hpreq_ops ost_hpreq_punch = {
1946         .hpreq_lock_match = ost_punch_hpreq_lock_match,
1947         .hpreq_check      = ost_punch_hpreq_check,
1948         .hpreq_fini       = ost_punch_hpreq_fini
1949 };
1950
1951 /** Assign high priority operations to the request if needed. */
1952 static int ost_hpreq_handler(struct ptlrpc_request *req)
1953 {
1954         ENTRY;
1955         if (req->rq_export) {
1956                 int opc = lustre_msg_get_opc(req->rq_reqmsg);
1957                 struct ost_body *body;
1958
1959                 if (opc == OST_READ || opc == OST_WRITE) {
1960                         struct niobuf_remote *nb;
1961                         struct obd_ioobj *ioo;
1962                         int objcount, niocount;
1963                         int rc;
1964                         int i;
1965
1966                         /* RPCs on the H-P queue can be inspected before
1967                          * ost_handler() initializes their pills, so we
1968                          * initialize that here.  Capsule initialization is
1969                          * idempotent, as is setting the pill's format (provided
1970                          * it doesn't change).
1971                          */
1972                         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
1973                         if (opc == OST_READ)
1974                                 req_capsule_set(&req->rq_pill,
1975                                                 &RQF_OST_BRW_READ);
1976                         else
1977                                 req_capsule_set(&req->rq_pill,
1978                                                 &RQF_OST_BRW_WRITE);
1979
1980                         body = req_capsule_client_get(&req->rq_pill,
1981                                                       &RMF_OST_BODY);
1982                         if (body == NULL) {
1983                                 CERROR("Missing/short ost_body\n");
1984                                 RETURN(-EFAULT);
1985                         }
1986
1987                         objcount = req_capsule_get_size(&req->rq_pill,
1988                                                         &RMF_OBD_IOOBJ,
1989                                                         RCL_CLIENT) /
1990                                                         sizeof(*ioo);
1991                         if (objcount == 0) {
1992                                 CERROR("Missing/short ioobj\n");
1993                                 RETURN(-EFAULT);
1994                         }
1995                         if (objcount > 1) {
1996                                 CERROR("too many ioobjs (%d)\n", objcount);
1997                                 RETURN(-EFAULT);
1998                         }
1999
2000                         ioo = req_capsule_client_get(&req->rq_pill,
2001                                                      &RMF_OBD_IOOBJ);
2002                         if (ioo == NULL) {
2003                                 CERROR("Missing/short ioobj\n");
2004                                 RETURN(-EFAULT);
2005                         }
2006
2007                         rc = ost_validate_obdo(req->rq_export, &body->oa, ioo);
2008                         if (rc) {
2009                                 CERROR("invalid object ids\n");
2010                                 RETURN(rc);
2011                         }
2012
2013                         for (niocount = i = 0; i < objcount; i++) {
2014                                 if (ioo[i].ioo_bufcnt == 0) {
2015                                         CERROR("ioo[%d] has zero bufcnt\n", i);
2016                                         RETURN(-EFAULT);
2017                                 }
2018                                 niocount += ioo[i].ioo_bufcnt;
2019                         }
2020                         if (niocount > PTLRPC_MAX_BRW_PAGES) {
2021                                 DEBUG_REQ(D_RPCTRACE, req,
2022                                           "bulk has too many pages (%d)",
2023                                           niocount);
2024                                 RETURN(-EFAULT);
2025                         }
2026
2027                         nb = req_capsule_client_get(&req->rq_pill,
2028                                                     &RMF_NIOBUF_REMOTE);
2029                         if (nb == NULL) {
2030                                 CERROR("Missing/short niobuf\n");
2031                                 RETURN(-EFAULT);
2032                         }
2033
2034                         if (niocount == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
2035                                 req->rq_ops = &ost_hpreq_rw;
2036                 } else if (opc == OST_PUNCH) {
2037                         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
2038                         req_capsule_set(&req->rq_pill, &RQF_OST_PUNCH);
2039
2040                         body = req_capsule_client_get(&req->rq_pill,
2041                                                       &RMF_OST_BODY);
2042                         if (body == NULL) {
2043                                 CERROR("Missing/short ost_body\n");
2044                                 RETURN(-EFAULT);
2045                         }
2046
2047                         if (!(body->oa.o_valid & OBD_MD_FLFLAGS) ||
2048                             !(body->oa.o_flags & OBD_FL_SRVLOCK))
2049                                 req->rq_ops = &ost_hpreq_punch;
2050                 }
2051         }
2052         RETURN(0);
2053 }
2054
2055 /* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
2056 int ost_handle(struct ptlrpc_request *req)
2057 {
2058         struct obd_trans_info trans_info = { 0, };
2059         struct obd_trans_info *oti = &trans_info;
2060         int should_process, fail = OBD_FAIL_OST_ALL_REPLY_NET, rc = 0;
2061         struct obd_device *obd = NULL;
2062         ENTRY;
2063
2064         LASSERT(current->journal_info == NULL);
2065
2066         /* primordial rpcs don't affect server recovery */
2067         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
2068         case SEC_CTX_INIT:
2069         case SEC_CTX_INIT_CONT:
2070         case SEC_CTX_FINI:
2071                 GOTO(out, rc = 0);
2072         }
2073
2074         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
2075
2076         if (lustre_msg_get_opc(req->rq_reqmsg) != OST_CONNECT) {
2077                 if (!class_connected_export(req->rq_export)) {
2078                         CDEBUG(D_HA,"operation %d on unconnected OST from %s\n",
2079                                lustre_msg_get_opc(req->rq_reqmsg),
2080                                libcfs_id2str(req->rq_peer));
2081                         req->rq_status = -ENOTCONN;
2082                         GOTO(out, rc = -ENOTCONN);
2083                 }
2084
2085                 obd = req->rq_export->exp_obd;
2086
2087                 /* Check for aborted recovery. */
2088                 if (obd->obd_recovering) {
2089                         rc = ost_filter_recovery_request(req, obd,
2090                                                          &should_process);
2091                         if (rc || !should_process)
2092                                 RETURN(rc);
2093                         else if (should_process < 0) {
2094                                 req->rq_status = should_process;
2095                                 rc = ptlrpc_error(req);
2096                                 RETURN(rc);
2097                         }
2098                 }
2099         }
2100
2101         oti_init(oti, req);
2102
2103         rc = ost_msg_check_version(req->rq_reqmsg);
2104         if (rc)
2105                 RETURN(rc);
2106
2107         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
2108         case OST_CONNECT: {
2109                 CDEBUG(D_INODE, "connect\n");
2110                 req_capsule_set(&req->rq_pill, &RQF_OST_CONNECT);
2111                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_CONNECT_NET))
2112                         RETURN(0);
2113                 rc = target_handle_connect(req);
2114                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_CONNECT_NET2))
2115                         RETURN(0);
2116                 if (!rc) {
2117                         rc = ost_init_sec_level(req);
2118                         if (!rc)
2119                                 rc = ost_connect_check_sptlrpc(req);
2120                 }
2121                 break;
2122         }
2123         case OST_DISCONNECT:
2124                 CDEBUG(D_INODE, "disconnect\n");
2125                 req_capsule_set(&req->rq_pill, &RQF_OST_DISCONNECT);
2126                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_DISCONNECT_NET))
2127                         RETURN(0);
2128                 rc = target_handle_disconnect(req);
2129                 break;
2130         case OST_CREATE:
2131                 CDEBUG(D_INODE, "create\n");
2132                 req_capsule_set(&req->rq_pill, &RQF_OST_CREATE);
2133                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_CREATE_NET))
2134                         RETURN(0);
2135                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
2136                         GOTO(out, rc = -EROFS);
2137                 rc = ost_create(req->rq_export, req, oti);
2138                 break;
2139         case OST_DESTROY:
2140                 CDEBUG(D_INODE, "destroy\n");
2141                 req_capsule_set(&req->rq_pill, &RQF_OST_DESTROY);
2142                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_DESTROY_NET))
2143                         RETURN(0);
2144                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
2145                         GOTO(out, rc = -EROFS);
2146                 rc = ost_destroy(req->rq_export, req, oti);
2147                 break;
2148         case OST_GETATTR:
2149                 CDEBUG(D_INODE, "getattr\n");
2150                 req_capsule_set(&req->rq_pill, &RQF_OST_GETATTR);
2151                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_GETATTR_NET))
2152                         RETURN(0);
2153                 rc = ost_getattr(req->rq_export, req);
2154                 break;
2155         case OST_SETATTR:
2156                 CDEBUG(D_INODE, "setattr\n");
2157                 req_capsule_set(&req->rq_pill, &RQF_OST_SETATTR);
2158                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_SETATTR_NET))
2159                         RETURN(0);
2160                 rc = ost_setattr(req->rq_export, req, oti);
2161                 break;
2162         case OST_WRITE:
2163                 req_capsule_set(&req->rq_pill, &RQF_OST_BRW_WRITE);
2164                 CDEBUG(D_INODE, "write\n");
2165                 /* req->rq_request_portal would be nice, if it was set */
2166                 if (req->rq_rqbd->rqbd_service->srv_req_portal !=OST_IO_PORTAL){
2167                         CERROR("%s: deny write request from %s to portal %u\n",
2168                                req->rq_export->exp_obd->obd_name,
2169                                obd_export_nid2str(req->rq_export),
2170                                req->rq_rqbd->rqbd_service->srv_req_portal);
2171                         GOTO(out, rc = -EPROTO);
2172                 }
2173                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_NET))
2174                         RETURN(0);
2175                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOSPC))
2176                         GOTO(out, rc = -ENOSPC);
2177                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
2178                         GOTO(out, rc = -EROFS);
2179                 rc = ost_brw_write(req, oti);
2180                 LASSERT(current->journal_info == NULL);
2181                 /* ost_brw_write sends its own replies */
2182                 RETURN(rc);
2183         case OST_READ:
2184                 req_capsule_set(&req->rq_pill, &RQF_OST_BRW_READ);
2185                 CDEBUG(D_INODE, "read\n");
2186                 /* req->rq_request_portal would be nice, if it was set */
2187                 if (req->rq_rqbd->rqbd_service->srv_req_portal !=OST_IO_PORTAL){
2188                         CERROR("%s: deny read request from %s to portal %u\n",
2189                                req->rq_export->exp_obd->obd_name,
2190                                obd_export_nid2str(req->rq_export),
2191                                req->rq_rqbd->rqbd_service->srv_req_portal);
2192                         GOTO(out, rc = -EPROTO);
2193                 }
2194                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_NET))
2195                         RETURN(0);
2196                 rc = ost_brw_read(req, oti);
2197                 LASSERT(current->journal_info == NULL);
2198                 /* ost_brw_read sends its own replies */
2199                 RETURN(rc);
2200         case OST_PUNCH:
2201                 CDEBUG(D_INODE, "punch\n");
2202                 req_capsule_set(&req->rq_pill, &RQF_OST_PUNCH);
2203                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_PUNCH_NET))
2204                         RETURN(0);
2205                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
2206                         GOTO(out, rc = -EROFS);
2207                 rc = ost_punch(req->rq_export, req, oti);
2208                 break;
2209         case OST_STATFS:
2210                 CDEBUG(D_INODE, "statfs\n");
2211                 req_capsule_set(&req->rq_pill, &RQF_OST_STATFS);
2212                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_STATFS_NET))
2213                         RETURN(0);
2214                 rc = ost_statfs(req);
2215                 break;
2216         case OST_SYNC:
2217                 CDEBUG(D_INODE, "sync\n");
2218                 req_capsule_set(&req->rq_pill, &RQF_OST_SYNC);
2219                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_SYNC_NET))
2220                         RETURN(0);
2221                 rc = ost_sync(req->rq_export, req);
2222                 break;
2223         case OST_SET_INFO:
2224                 DEBUG_REQ(D_INODE, req, "set_info");
2225                 req_capsule_set(&req->rq_pill, &RQF_OBD_SET_INFO);
2226                 rc = ost_set_info(req->rq_export, req);
2227                 break;
2228         case OST_GET_INFO:
2229                 DEBUG_REQ(D_INODE, req, "get_info");
2230                 req_capsule_set(&req->rq_pill, &RQF_OST_GET_INFO_GENERIC);
2231                 rc = ost_get_info(req->rq_export, req);
2232                 break;
2233 #ifdef HAVE_QUOTA_SUPPORT
2234         case OST_QUOTACHECK:
2235                 CDEBUG(D_INODE, "quotacheck\n");
2236                 req_capsule_set(&req->rq_pill, &RQF_OST_QUOTACHECK);
2237                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_QUOTACHECK_NET))
2238                         RETURN(0);
2239                 rc = ost_handle_quotacheck(req);
2240                 break;
2241         case OST_QUOTACTL:
2242                 CDEBUG(D_INODE, "quotactl\n");
2243                 req_capsule_set(&req->rq_pill, &RQF_OST_QUOTACTL);
2244                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_QUOTACTL_NET))
2245                         RETURN(0);
2246                 rc = ost_handle_quotactl(req);
2247                 break;
2248         case OST_QUOTA_ADJUST_QUNIT:
2249                 CDEBUG(D_INODE, "quota_adjust_qunit\n");
2250                 req_capsule_set(&req->rq_pill, &RQF_OST_QUOTA_ADJUST_QUNIT);
2251                 rc = ost_handle_quota_adjust_qunit(req);
2252                 break;
2253 #endif
2254         case OBD_PING:
2255                 DEBUG_REQ(D_INODE, req, "ping");
2256                 req_capsule_set(&req->rq_pill, &RQF_OBD_PING);
2257                 rc = target_handle_ping(req);
2258                 break;
2259         /* FIXME - just reply status */
2260         case LLOG_ORIGIN_CONNECT:
2261                 DEBUG_REQ(D_INODE, req, "log connect");
2262                 req_capsule_set(&req->rq_pill, &RQF_LLOG_ORIGIN_CONNECT);
2263                 rc = ost_llog_handle_connect(req->rq_export, req);
2264                 req->rq_status = rc;
2265                 rc = req_capsule_server_pack(&req->rq_pill);
2266                 if (rc)
2267                         RETURN(rc);
2268                 RETURN(ptlrpc_reply(req));
2269         case OBD_LOG_CANCEL:
2270                 CDEBUG(D_INODE, "log cancel\n");
2271                 req_capsule_set(&req->rq_pill, &RQF_LOG_CANCEL);
2272                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_NET))
2273                         RETURN(0);
2274                 rc = llog_origin_handle_cancel(req);
2275                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_REP))
2276                         RETURN(0);
2277                 req->rq_status = rc;
2278                 rc = req_capsule_server_pack(&req->rq_pill);
2279                 if (rc)
2280                         RETURN(rc);
2281                 RETURN(ptlrpc_reply(req));
2282         case LDLM_ENQUEUE:
2283                 CDEBUG(D_INODE, "enqueue\n");
2284                 req_capsule_set(&req->rq_pill, &RQF_LDLM_ENQUEUE);
2285                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE))
2286                         RETURN(0);
2287                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
2288                                          ost_blocking_ast,
2289                                          ldlm_server_glimpse_ast);
2290                 fail = OBD_FAIL_OST_LDLM_REPLY_NET;
2291                 break;
2292         case LDLM_CONVERT:
2293                 CDEBUG(D_INODE, "convert\n");
2294                 req_capsule_set(&req->rq_pill, &RQF_LDLM_CONVERT);
2295                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CONVERT))
2296                         RETURN(0);
2297                 rc = ldlm_handle_convert(req);
2298                 break;
2299         case LDLM_CANCEL:
2300                 CDEBUG(D_INODE, "cancel\n");
2301                 req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL);
2302                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL))
2303                         RETURN(0);
2304                 rc = ldlm_handle_cancel(req);
2305                 break;
2306         case LDLM_BL_CALLBACK:
2307         case LDLM_CP_CALLBACK:
2308                 CDEBUG(D_INODE, "callback\n");
2309                 CERROR("callbacks should not happen on OST\n");
2310                 /* fall through */
2311         default:
2312                 CERROR("Unexpected opcode %d\n",
2313                        lustre_msg_get_opc(req->rq_reqmsg));
2314                 req->rq_status = -ENOTSUPP;
2315                 rc = ptlrpc_error(req);
2316                 RETURN(rc);
2317         }
2318
2319         LASSERT(current->journal_info == NULL);
2320
2321         EXIT;
2322         /* If we're DISCONNECTing, the export_data is already freed */
2323         if (!rc && lustre_msg_get_opc(req->rq_reqmsg) != OST_DISCONNECT)
2324                 target_committed_to_req(req);
2325
2326 out:
2327         if (!rc)
2328                 oti_to_request(oti, req);
2329
2330         target_send_reply(req, rc, fail);
2331         return 0;
2332 }
2333 EXPORT_SYMBOL(ost_handle);
2334 /*
2335  * free per-thread pool created by ost_thread_init().
2336  */
2337 static void ost_thread_done(struct ptlrpc_thread *thread)
2338 {
2339         struct ost_thread_local_cache *tls; /* TLS stands for Thread-Local
2340                                              * Storage */
2341
2342         ENTRY;
2343
2344         LASSERT(thread != NULL);
2345
2346         /*
2347          * be prepared to handle partially-initialized pools (because this is
2348          * called from ost_thread_init() for cleanup.
2349          */
2350         tls = thread->t_data;
2351         if (tls != NULL) {
2352                 OBD_FREE_PTR(tls);
2353                 thread->t_data = NULL;
2354         }
2355         EXIT;
2356 }
2357
2358 /*
2359  * initialize per-thread page pool (bug 5137).
2360  */
2361 static int ost_thread_init(struct ptlrpc_thread *thread)
2362 {
2363         struct ost_thread_local_cache *tls;
2364
2365         ENTRY;
2366
2367         LASSERT(thread != NULL);
2368         LASSERT(thread->t_data == NULL);
2369         LASSERTF(thread->t_id <= OSS_THREADS_MAX, "%u\n", thread->t_id);
2370
2371         OBD_ALLOC_PTR(tls);
2372         if (tls == NULL)
2373                 RETURN(-ENOMEM);
2374         thread->t_data = tls;
2375         RETURN(0);
2376 }
2377
2378 #define OST_WATCHDOG_TIMEOUT (obd_timeout * 1000)
2379
2380 /* Sigh - really, this is an OSS, the _server_, not the _target_ */
2381 static int ost_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
2382 {
2383         struct ost_obd *ost = &obd->u.ost;
2384         struct lprocfs_static_vars lvars;
2385         int oss_min_threads;
2386         int oss_max_threads;
2387         int oss_min_create_threads;
2388         int oss_max_create_threads;
2389         int rc;
2390         ENTRY;
2391
2392         rc = cfs_cleanup_group_info();
2393         if (rc)
2394                 RETURN(rc);
2395
2396         lprocfs_ost_init_vars(&lvars);
2397         lprocfs_obd_setup(obd, lvars.obd_vars);
2398
2399         cfs_mutex_init(&ost->ost_health_mutex);
2400
2401         if (oss_num_threads) {
2402                 /* If oss_num_threads is set, it is the min and the max. */
2403                 if (oss_num_threads > OSS_THREADS_MAX)
2404                         oss_num_threads = OSS_THREADS_MAX;
2405                 if (oss_num_threads < OSS_THREADS_MIN)
2406                         oss_num_threads = OSS_THREADS_MIN;
2407                 oss_max_threads = oss_min_threads = oss_num_threads;
2408         } else {
2409                 /* Base min threads on memory and cpus */
2410                 oss_min_threads =
2411                         cfs_num_possible_cpus() * CFS_NUM_CACHEPAGES >>
2412                         (27 - CFS_PAGE_SHIFT);
2413                 if (oss_min_threads < OSS_THREADS_MIN)
2414                         oss_min_threads = OSS_THREADS_MIN;
2415                 /* Insure a 4x range for dynamic threads */
2416                 if (oss_min_threads > OSS_THREADS_MAX / 4)
2417                         oss_min_threads = OSS_THREADS_MAX / 4;
2418                 oss_max_threads = min(OSS_THREADS_MAX, oss_min_threads * 4 + 1);
2419         }
2420
2421         ost->ost_service =
2422                 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
2423                                 OST_MAXREPSIZE, OST_REQUEST_PORTAL,
2424                                 OSC_REPLY_PORTAL, OSS_SERVICE_WATCHDOG_FACTOR,
2425                                 ost_handle, LUSTRE_OSS_NAME,
2426                                 obd->obd_proc_entry, target_print_req,
2427                                 oss_min_threads, oss_max_threads,
2428                                 "ll_ost", LCT_DT_THREAD, NULL);
2429         if (ost->ost_service == NULL) {
2430                 CERROR("failed to start service\n");
2431                 GOTO(out_lprocfs, rc = -ENOMEM);
2432         }
2433
2434         rc = ptlrpc_start_threads(ost->ost_service);
2435         if (rc)
2436                 GOTO(out_service, rc = -EINVAL);
2437
2438         if (oss_num_create_threads) {
2439                 if (oss_num_create_threads > OSS_MAX_CREATE_THREADS)
2440                         oss_num_create_threads = OSS_MAX_CREATE_THREADS;
2441                 if (oss_num_create_threads < OSS_MIN_CREATE_THREADS)
2442                         oss_num_create_threads = OSS_MIN_CREATE_THREADS;
2443                 oss_min_create_threads = oss_max_create_threads =
2444                         oss_num_create_threads;
2445         } else {
2446                 oss_min_create_threads = OSS_MIN_CREATE_THREADS;
2447                 oss_max_create_threads = OSS_MAX_CREATE_THREADS;
2448         }
2449
2450         ost->ost_create_service =
2451                 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
2452                                 OST_MAXREPSIZE, OST_CREATE_PORTAL,
2453                                 OSC_REPLY_PORTAL, OSS_SERVICE_WATCHDOG_FACTOR,
2454                                 ost_handle, "ost_create",
2455                                 obd->obd_proc_entry, target_print_req,
2456                                 oss_min_create_threads, oss_max_create_threads,
2457                                 "ll_ost_creat", LCT_DT_THREAD, NULL);
2458         if (ost->ost_create_service == NULL) {
2459                 CERROR("failed to start OST create service\n");
2460                 GOTO(out_service, rc = -ENOMEM);
2461         }
2462
2463         rc = ptlrpc_start_threads(ost->ost_create_service);
2464         if (rc)
2465                 GOTO(out_create, rc = -EINVAL);
2466
2467         ost->ost_io_service =
2468                 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
2469                                 OST_MAXREPSIZE, OST_IO_PORTAL,
2470                                 OSC_REPLY_PORTAL, OSS_SERVICE_WATCHDOG_FACTOR,
2471                                 ost_handle, "ost_io",
2472                                 obd->obd_proc_entry, target_print_req,
2473                                 oss_min_threads, oss_max_threads,
2474                                 "ll_ost_io", LCT_DT_THREAD, ost_hpreq_handler);
2475         if (ost->ost_io_service == NULL) {
2476                 CERROR("failed to start OST I/O service\n");
2477                 GOTO(out_create, rc = -ENOMEM);
2478         }
2479
2480         ost->ost_io_service->srv_init = ost_thread_init;
2481         ost->ost_io_service->srv_done = ost_thread_done;
2482         ost->ost_io_service->srv_cpu_affinity = 1;
2483         rc = ptlrpc_start_threads(ost->ost_io_service);
2484         if (rc)
2485                 GOTO(out_io, rc = -EINVAL);
2486
2487         ping_evictor_start();
2488
2489         RETURN(0);
2490
2491 out_io:
2492         ptlrpc_unregister_service(ost->ost_io_service);
2493         ost->ost_io_service = NULL;
2494 out_create:
2495         ptlrpc_unregister_service(ost->ost_create_service);
2496         ost->ost_create_service = NULL;
2497 out_service:
2498         ptlrpc_unregister_service(ost->ost_service);
2499         ost->ost_service = NULL;
2500 out_lprocfs:
2501         lprocfs_obd_cleanup(obd);
2502         RETURN(rc);
2503 }
2504
2505 static int ost_cleanup(struct obd_device *obd)
2506 {
2507         struct ost_obd *ost = &obd->u.ost;
2508         int err = 0;
2509         ENTRY;
2510
2511         ping_evictor_stop();
2512
2513         /* there is no recovery for OST OBD, all recovery is controlled by
2514          * obdfilter OBD */
2515         LASSERT(obd->obd_recovering == 0);
2516         cfs_mutex_lock(&ost->ost_health_mutex);
2517         ptlrpc_unregister_service(ost->ost_service);
2518         ptlrpc_unregister_service(ost->ost_create_service);
2519         ptlrpc_unregister_service(ost->ost_io_service);
2520         ost->ost_service = NULL;
2521         ost->ost_create_service = NULL;
2522         cfs_mutex_unlock(&ost->ost_health_mutex);
2523
2524         lprocfs_obd_cleanup(obd);
2525
2526         RETURN(err);
2527 }
2528
2529 static int ost_health_check(struct obd_device *obd)
2530 {
2531         struct ost_obd *ost = &obd->u.ost;
2532         int rc = 0;
2533
2534         cfs_mutex_lock(&ost->ost_health_mutex);
2535         rc |= ptlrpc_service_health_check(ost->ost_service);
2536         rc |= ptlrpc_service_health_check(ost->ost_create_service);
2537         rc |= ptlrpc_service_health_check(ost->ost_io_service);
2538         cfs_mutex_unlock(&ost->ost_health_mutex);
2539
2540         /*
2541          * health_check to return 0 on healthy
2542          * and 1 on unhealthy.
2543          */
2544         if( rc != 0)
2545                 rc = 1;
2546
2547         return rc;
2548 }
2549
2550 struct ost_thread_local_cache *ost_tls(struct ptlrpc_request *r)
2551 {
2552         return (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
2553 }
2554
2555 /* use obd ops to offer management infrastructure */
2556 static struct obd_ops ost_obd_ops = {
2557         .o_owner        = THIS_MODULE,
2558         .o_setup        = ost_setup,
2559         .o_cleanup      = ost_cleanup,
2560         .o_health_check = ost_health_check,
2561 };
2562
2563
2564 static int __init ost_init(void)
2565 {
2566         struct lprocfs_static_vars lvars;
2567         int rc;
2568         ENTRY;
2569
2570         lprocfs_ost_init_vars(&lvars);
2571         rc = class_register_type(&ost_obd_ops, NULL, lvars.module_vars,
2572                                  LUSTRE_OSS_NAME, NULL);
2573
2574         if (ost_num_threads != 0 && oss_num_threads == 0) {
2575                 LCONSOLE_INFO("ost_num_threads module parameter is deprecated, "
2576                               "use oss_num_threads instead or unset both for "
2577                               "dynamic thread startup\n");
2578                 oss_num_threads = ost_num_threads;
2579         }
2580
2581         RETURN(rc);
2582 }
2583
2584 static void /*__exit*/ ost_exit(void)
2585 {
2586         class_unregister_type(LUSTRE_OSS_NAME);
2587 }
2588
2589 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2590 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
2591 MODULE_LICENSE("GPL");
2592
2593 module_init(ost_init);
2594 module_exit(ost_exit);