Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ost/ost_handler.c
37  *
38  * Author: Peter J. Braam <braam@clusterfs.com>
39  * Author: Phil Schwan <phil@clusterfs.com>
40  */
41
42 #ifndef EXPORT_SYMTAB
43 # define EXPORT_SYMTAB
44 #endif
45 #define DEBUG_SUBSYSTEM S_OST
46
47 #include <linux/module.h>
48 #include <obd_cksum.h>
49 #include <obd_ost.h>
50 #include <lustre_net.h>
51 #include <lustre_dlm.h>
52 #include <lustre_export.h>
53 #include <lustre_debug.h>
54 #include <linux/init.h>
55 #include <lprocfs_status.h>
56 #include <libcfs/list.h>
57 #include <lustre_quota.h>
58 #include "ost_internal.h"
59
60 static int oss_num_threads;
61 CFS_MODULE_PARM(oss_num_threads, "i", int, 0444,
62                 "number of OSS service threads to start");
63
64 static int ost_num_threads;
65 CFS_MODULE_PARM(ost_num_threads, "i", int, 0444,
66                 "number of OST service threads to start (deprecated)");
67
68 static int oss_num_create_threads;
69 CFS_MODULE_PARM(oss_num_create_threads, "i", int, 0444,
70                 "number of OSS create threads to start");
71
72 /**
73  * Do not return server-side uid/gid to remote client
74  */
75 static void ost_drop_id(struct obd_export *exp, struct  obdo *oa)
76 {
77         if (exp_connect_rmtclient(exp)) {
78                 oa->o_uid = -1;
79                 oa->o_gid = -1;
80                 oa->o_valid &= ~(OBD_MD_FLUID | OBD_MD_FLGID);
81         }
82 }
83
84 void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
85 {
86         struct oti_req_ack_lock *ack_lock;
87         int i;
88
89         if (oti == NULL)
90                 return;
91
92         if (req->rq_repmsg)
93                 lustre_msg_set_transno(req->rq_repmsg, oti->oti_transno);
94         req->rq_transno = oti->oti_transno;
95
96         /* XXX 4 == entries in oti_ack_locks??? */
97         for (ack_lock = oti->oti_ack_locks, i = 0; i < 4; i++, ack_lock++) {
98                 if (!ack_lock->mode)
99                         break;
100                 /* XXX not even calling target_send_reply in some cases... */
101                 ptlrpc_save_lock (req, &ack_lock->lock, ack_lock->mode, 0);
102         }
103 }
104
105 static int ost_destroy(struct obd_export *exp, struct ptlrpc_request *req,
106                        struct obd_trans_info *oti)
107 {
108         struct ost_body *body, *repbody;
109         struct lustre_capa *capa = NULL;
110         int rc;
111         ENTRY;
112
113         /* Get the request body */
114         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
115         if (body == NULL)
116                 RETURN(-EFAULT);
117
118         if (body->oa.o_id == 0)
119                 RETURN(-EPROTO);
120
121         /* If there's a DLM request, cancel the locks mentioned in it*/
122         if (req_capsule_field_present(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT)) {
123                 struct ldlm_request *dlm;
124
125                 dlm = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
126                 if (dlm == NULL)
127                         RETURN (-EFAULT);
128                 ldlm_request_cancel(req, dlm, 0);
129         }
130
131         /* If there's a capability, get it */
132         if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
133                 capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
134                 if (capa == NULL) {
135                         CERROR("Missing capability for OST DESTROY");
136                         RETURN (-EFAULT);
137                 }
138         }
139
140         /* Prepare the reply */
141         rc = req_capsule_server_pack(&req->rq_pill);
142         if (rc)
143                 RETURN(rc);
144
145         /* Get the log cancellation cookie */
146         if (body->oa.o_valid & OBD_MD_FLCOOKIE)
147                 oti->oti_logcookies = &body->oa.o_lcookie;
148
149         /* Finish the reply */
150         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
151         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
152
153         /* Do the destroy and set the reply status accordingly  */
154         req->rq_status = obd_destroy(exp, &body->oa, NULL, oti, NULL, capa);
155         RETURN(0);
156 }
157
158 static int ost_getattr(struct obd_export *exp, struct ptlrpc_request *req)
159 {
160         struct ost_body *body, *repbody;
161         struct obd_info oinfo = { { { 0 } } };
162         int rc;
163         ENTRY;
164
165         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
166         if (body == NULL)
167                 RETURN(-EFAULT);
168
169         rc = req_capsule_server_pack(&req->rq_pill);
170         if (rc)
171                 RETURN(rc);
172
173         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
174         repbody->oa = body->oa;
175
176         oinfo.oi_oa = &repbody->oa;
177         if (oinfo.oi_oa->o_valid & OBD_MD_FLOSSCAPA) {
178                 oinfo.oi_capa = req_capsule_client_get(&req->rq_pill,
179                                                        &RMF_CAPA1);
180                 if (oinfo.oi_capa == NULL) {
181                         CERROR("Missing capability for OST GETATTR");
182                         RETURN (-EFAULT);
183                 }
184         }
185
186         req->rq_status = obd_getattr(exp, &oinfo);
187         ost_drop_id(exp, &repbody->oa);
188         RETURN(0);
189 }
190
191 static int ost_statfs(struct ptlrpc_request *req)
192 {
193         struct obd_statfs *osfs;
194         int rc;
195         ENTRY;
196
197         rc = req_capsule_server_pack(&req->rq_pill);
198         if (rc)
199                 RETURN(rc);
200
201         osfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
202
203         req->rq_status = obd_statfs(req->rq_export->exp_obd, osfs,
204                                     cfs_time_current_64() - HZ, 0);
205         if (req->rq_status != 0)
206                 CERROR("ost: statfs failed: rc %d\n", req->rq_status);
207
208         RETURN(0);
209 }
210
211 static int ost_create(struct obd_export *exp, struct ptlrpc_request *req,
212                       struct obd_trans_info *oti)
213 {
214         struct ost_body *body, *repbody;
215         int rc;
216         ENTRY;
217
218         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
219         if (body == NULL)
220                 RETURN(-EFAULT);
221
222         rc = req_capsule_server_pack(&req->rq_pill);
223         if (rc)
224                 RETURN(rc);
225
226         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
227         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
228         oti->oti_logcookies = &repbody->oa.o_lcookie;
229
230         req->rq_status = obd_create(exp, &repbody->oa, NULL, oti);
231         //obd_log_cancel(conn, NULL, 1, oti->oti_logcookies, 0);
232         RETURN(0);
233 }
234
235 /**
236  * Helper function for ost_punch(): if asked by client, acquire [size, EOF]
237  * lock on the file being truncated.
238  */
239 static int ost_punch_lock_get(struct obd_export *exp, struct obdo *oa,
240                               struct lustre_handle *lh)
241 {
242         int flags;
243         struct ldlm_res_id res_id;
244         ldlm_policy_data_t policy;
245         __u64 start;
246         __u64 finis;
247
248         ENTRY;
249
250         osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
251         LASSERT(!lustre_handle_is_used(lh));
252
253         if (!(oa->o_valid & OBD_MD_FLFLAGS) ||
254             !(oa->o_flags & OBD_FL_TRUNCLOCK))
255                 RETURN(0);
256
257         CDEBUG(D_INODE, "OST-side truncate lock.\n");
258
259         start = oa->o_size;
260         finis = start + oa->o_blocks;
261
262         /*
263          * standard truncate optimization: if file body is completely
264          * destroyed, don't send data back to the server.
265          */
266         flags = (start == 0) ? LDLM_AST_DISCARD_DATA : 0;
267
268         policy.l_extent.start = start & CFS_PAGE_MASK;
269
270         /*
271          * If ->o_blocks is EOF it means "lock till the end of the
272          * file". Otherwise, it's size of a hole being punched (in bytes)
273          */
274         if (oa->o_blocks == OBD_OBJECT_EOF || finis < start)
275                 policy.l_extent.end = OBD_OBJECT_EOF;
276         else
277                 policy.l_extent.end = finis | ~CFS_PAGE_MASK;
278
279         RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
280                                       LDLM_EXTENT, &policy, LCK_PW, &flags,
281                                       ldlm_blocking_ast, ldlm_completion_ast,
282                                       ldlm_glimpse_ast, NULL, 0, NULL, lh));
283 }
284
285 /**
286  * Helper function for ost_punch(): release lock acquired by
287  * ost_punch_lock_get(), if any.
288  */
289 static void ost_punch_lock_put(struct obd_export *exp, struct obdo *oa,
290                                struct lustre_handle *lh)
291 {
292         ENTRY;
293         if (lustre_handle_is_used(lh))
294                 ldlm_lock_decref(lh, LCK_PW);
295         EXIT;
296 }
297
298 static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req,
299                      struct obd_trans_info *oti)
300 {
301         struct obd_info oinfo = { { { 0 } } };
302         struct ost_body *body, *repbody;
303         int rc;
304         struct lustre_handle lh = {0,};
305         ENTRY;
306
307         /* check that we do support OBD_CONNECT_TRUNCLOCK. */
308         CLASSERT(OST_CONNECT_SUPPORTED & OBD_CONNECT_TRUNCLOCK);
309
310         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
311         if (body == NULL)
312                 RETURN(-EFAULT);
313
314         oinfo.oi_oa = &body->oa;
315         oinfo.oi_policy.l_extent.start = oinfo.oi_oa->o_size;
316         oinfo.oi_policy.l_extent.end = oinfo.oi_oa->o_blocks;
317
318         if ((oinfo.oi_oa->o_valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) !=
319             (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
320                 RETURN(-EPROTO);
321
322         rc = req_capsule_server_pack(&req->rq_pill);
323         if (rc)
324                 RETURN(rc);
325
326         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
327         rc = ost_punch_lock_get(exp, oinfo.oi_oa, &lh);
328         if (rc == 0) {
329                 if (oinfo.oi_oa->o_valid & OBD_MD_FLFLAGS &&
330                     oinfo.oi_oa->o_flags == OBD_FL_TRUNCLOCK)
331                         /*
332                          * If OBD_FL_TRUNCLOCK is the only bit set in
333                          * ->o_flags, clear OBD_MD_FLFLAGS to avoid falling
334                          * through filter_setattr() to filter_iocontrol().
335                          */
336                         oinfo.oi_oa->o_valid &= ~OBD_MD_FLFLAGS;
337
338                 if (oinfo.oi_oa->o_valid & OBD_MD_FLOSSCAPA) {
339                         oinfo.oi_capa = req_capsule_client_get(&req->rq_pill,
340                                                                &RMF_CAPA1);
341                         if (oinfo.oi_capa == NULL) {
342                                 CERROR("Missing capability for OST PUNCH");
343                                 RETURN (-EFAULT);
344                         }
345                 }
346                 req->rq_status = obd_punch(exp, &oinfo, oti, NULL);
347                 ost_punch_lock_put(exp, oinfo.oi_oa, &lh);
348         }
349         repbody->oa = *oinfo.oi_oa;
350         ost_drop_id(exp, &repbody->oa);
351         RETURN(rc);
352 }
353
354 static int ost_sync(struct obd_export *exp, struct ptlrpc_request *req)
355 {
356         struct ost_body *body, *repbody;
357         struct lustre_capa *capa = NULL;
358         int rc;
359         ENTRY;
360
361         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
362         if (body == NULL)
363                 RETURN(-EFAULT);
364
365         if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
366                 capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
367                 if (capa == NULL) {
368                         CERROR("Missing capability for OST SYNC");
369                         RETURN (-EFAULT);
370                 }
371         }
372
373         rc = req_capsule_server_pack(&req->rq_pill);
374         if (rc)
375                 RETURN(rc);
376
377         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
378         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
379         req->rq_status = obd_sync(exp, &repbody->oa, NULL, repbody->oa.o_size,
380                                   repbody->oa.o_blocks, capa);
381         ost_drop_id(exp, &repbody->oa);
382         RETURN(0);
383 }
384
385 static int ost_setattr(struct obd_export *exp, struct ptlrpc_request *req,
386                        struct obd_trans_info *oti)
387 {
388         struct ost_body *body, *repbody;
389         int rc;
390         struct obd_info oinfo = { { { 0 } } };
391         ENTRY;
392
393         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
394         if (body == NULL)
395                 RETURN(-EFAULT);
396
397         rc = req_capsule_server_pack(&req->rq_pill);
398         if (rc)
399                 RETURN(rc);
400
401         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
402         repbody->oa = body->oa;
403
404         oinfo.oi_oa = &repbody->oa;
405         if (oinfo.oi_oa->o_valid & OBD_MD_FLOSSCAPA) {
406                 oinfo.oi_capa = req_capsule_client_get(&req->rq_pill,
407                                                        &RMF_CAPA1);
408                 if (oinfo.oi_capa == NULL) {
409                         CERROR("Missing capability for OST SETATTR");
410                         RETURN (-EFAULT);
411                 }
412         }
413         req->rq_status = obd_setattr(exp, &oinfo, oti);
414         ost_drop_id(exp, &repbody->oa);
415         RETURN(0);
416 }
417
418 static int ost_bulk_timeout(void *data)
419 {
420         ENTRY;
421         /* We don't fail the connection here, because having the export
422          * killed makes the (vital) call to commitrw very sad.
423          */
424         RETURN(1);
425 }
426
427 static __u32 ost_checksum_bulk(struct ptlrpc_bulk_desc *desc, int opc,
428                                cksum_type_t cksum_type)
429 {
430         __u32 cksum;
431         int i;
432
433         cksum = init_checksum(cksum_type);
434         for (i = 0; i < desc->bd_iov_count; i++) {
435                 struct page *page = desc->bd_iov[i].kiov_page;
436                 int off = desc->bd_iov[i].kiov_offset & ~CFS_PAGE_MASK;
437                 char *ptr = kmap(page) + off;
438                 int len = desc->bd_iov[i].kiov_len;
439
440                 /* corrupt the data before we compute the checksum, to
441                  * simulate a client->OST data error */
442                 if (i == 0 && opc == OST_WRITE &&
443                     OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_RECEIVE))
444                         memcpy(ptr, "bad3", min(4, len));
445                 cksum = compute_checksum(cksum, ptr, len, cksum_type);
446                 /* corrupt the data after we compute the checksum, to
447                  * simulate an OST->client data error */
448                 if (i == 0 && opc == OST_READ &&
449                     OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_SEND)) {
450                         memcpy(ptr, "bad4", min(4, len));
451                         /* nobody should use corrupted page again */
452                         ClearPageUptodate(page);
453                 }
454                 kunmap(page);
455         }
456
457         return cksum;
458 }
459
460 static int ost_brw_lock_get(int mode, struct obd_export *exp,
461                             struct obd_ioobj *obj, struct niobuf_remote *nb,
462                             struct lustre_handle *lh)
463 {
464         int flags                 = 0;
465         int nrbufs                = obj->ioo_bufcnt;
466         struct ldlm_res_id res_id;
467         ldlm_policy_data_t policy;
468         int i;
469         ENTRY;
470
471         osc_build_res_name(obj->ioo_id, obj->ioo_gr, &res_id);
472         LASSERT(mode == LCK_PR || mode == LCK_PW);
473         LASSERT(!lustre_handle_is_used(lh));
474
475         if (nrbufs == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
476                 RETURN(0);
477
478         for (i = 1; i < nrbufs; i ++)
479                 if ((nb[0].flags & OBD_BRW_SRVLOCK) !=
480                     (nb[i].flags & OBD_BRW_SRVLOCK))
481                         RETURN(-EFAULT);
482
483         policy.l_extent.start = nb[0].offset & CFS_PAGE_MASK;
484         policy.l_extent.end   = (nb[nrbufs - 1].offset +
485                                  nb[nrbufs - 1].len - 1) | ~CFS_PAGE_MASK;
486
487         RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
488                                       LDLM_EXTENT, &policy, mode, &flags,
489                                       ldlm_blocking_ast, ldlm_completion_ast,
490                                       ldlm_glimpse_ast, NULL, 0, NULL, lh));
491 }
492
493 static void ost_brw_lock_put(int mode,
494                              struct obd_ioobj *obj, struct niobuf_remote *niob,
495                              struct lustre_handle *lh)
496 {
497         ENTRY;
498         LASSERT(mode == LCK_PR || mode == LCK_PW);
499         LASSERT((obj->ioo_bufcnt > 0 && (niob[0].flags & OBD_BRW_SRVLOCK)) ==
500                 lustre_handle_is_used(lh));
501         if (lustre_handle_is_used(lh))
502                 ldlm_lock_decref(lh, mode);
503         EXIT;
504 }
505
506 struct ost_prolong_data {
507         struct obd_export *opd_exp;
508         ldlm_policy_data_t opd_policy;
509         struct obdo *opd_oa;
510         ldlm_mode_t opd_mode;
511         int opd_lock_match;
512         int opd_timeout;
513 };
514
515 static int ost_prolong_locks_iter(struct ldlm_lock *lock, void *data)
516 {
517         struct ost_prolong_data *opd = data;
518
519         LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
520
521         if (lock->l_req_mode != lock->l_granted_mode) {
522                 /* scan granted locks only */
523                 return LDLM_ITER_STOP;
524         }
525
526         if (lock->l_export != opd->opd_exp) {
527                 /* prolong locks only for given client */
528                 return LDLM_ITER_CONTINUE;
529         }
530
531         if (!(lock->l_granted_mode & opd->opd_mode)) {
532                 /* we aren't interesting in all type of locks */
533                 return LDLM_ITER_CONTINUE;
534         }
535
536         if (lock->l_policy_data.l_extent.end < opd->opd_policy.l_extent.start ||
537             lock->l_policy_data.l_extent.start > opd->opd_policy.l_extent.end) {
538                 /* the request doesn't cross the lock, skip it */
539                 return LDLM_ITER_CONTINUE;
540         }
541
542         /* Fill the obdo with the matched lock handle.
543          * XXX: it is possible in some cases the IO RPC is covered by several
544          * locks, even for the write case, so it may need to be a lock list. */
545         if (opd->opd_oa && !(opd->opd_oa->o_valid & OBD_MD_FLHANDLE)) {
546                 opd->opd_oa->o_handle.cookie = lock->l_handle.h_cookie;
547                 opd->opd_oa->o_valid |= OBD_MD_FLHANDLE;
548         }
549
550         if (!(lock->l_flags & LDLM_FL_AST_SENT)) {
551                 /* ignore locks not being cancelled */
552                 return LDLM_ITER_CONTINUE;
553         }
554
555         CDEBUG(D_DLMTRACE,"refresh lock: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
556                lock->l_resource->lr_name.name[0],
557                lock->l_resource->lr_name.name[1],
558                opd->opd_policy.l_extent.start, opd->opd_policy.l_extent.end);
559         /* OK. this is a possible lock the user holds doing I/O
560          * let's refresh eviction timer for it */
561         ldlm_refresh_waiting_lock(lock, opd->opd_timeout);
562         opd->opd_lock_match = 1;
563
564         return LDLM_ITER_CONTINUE;
565 }
566
567 static int ost_rw_prolong_locks(struct ptlrpc_request *req, struct obd_ioobj *obj,
568                                 struct niobuf_remote *nb, struct obdo *oa,
569                                 ldlm_mode_t mode)
570 {
571         struct ldlm_res_id res_id;
572         int nrbufs = obj->ioo_bufcnt;
573         struct ost_prolong_data opd = { 0 };
574         ENTRY;
575
576         osc_build_res_name(obj->ioo_id, obj->ioo_gr, &res_id);
577
578         opd.opd_mode = mode;
579         opd.opd_exp = req->rq_export;
580         opd.opd_policy.l_extent.start = nb[0].offset & CFS_PAGE_MASK;
581         opd.opd_policy.l_extent.end = (nb[nrbufs - 1].offset +
582                                        nb[nrbufs - 1].len - 1) | ~CFS_PAGE_MASK;
583
584         /* prolong locks for the current service time of the corresponding
585          * portal (= OST_IO_PORTAL) */
586         opd.opd_timeout = AT_OFF ? obd_timeout / 2:
587                           max(at_est2timeout(at_get(&req->rq_rqbd->
588                               rqbd_service->srv_at_estimate)), ldlm_timeout);
589
590         CDEBUG(D_INFO,"refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
591                res_id.name[0], res_id.name[1], opd.opd_policy.l_extent.start,
592                opd.opd_policy.l_extent.end);
593
594         if (oa->o_valid & OBD_MD_FLHANDLE) {
595                 struct ldlm_lock *lock;
596
597                 lock = ldlm_handle2lock(&oa->o_handle);
598                 if (lock != NULL) {
599                         ost_prolong_locks_iter(lock, &opd);
600                         if (opd.opd_lock_match) {
601                                 LDLM_LOCK_PUT(lock);
602                                 RETURN(1);
603                         }
604
605                         /* Check if the lock covers the whole IO region,
606                          * otherwise iterate through the resource. */
607                         if (lock->l_policy_data.l_extent.end >=
608                             opd.opd_policy.l_extent.end &&
609                             lock->l_policy_data.l_extent.start <=
610                             opd.opd_policy.l_extent.start) {
611                                 LDLM_LOCK_PUT(lock);
612                                 RETURN(0);
613                         }
614                         LDLM_LOCK_PUT(lock);
615                 }
616         }
617
618         opd.opd_oa = oa;
619         ldlm_resource_iterate(req->rq_export->exp_obd->obd_namespace, &res_id,
620                               ost_prolong_locks_iter, &opd);
621         RETURN(opd.opd_lock_match);
622 }
623
624 static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
625 {
626         struct ptlrpc_bulk_desc *desc = NULL;
627         struct obd_export *exp = req->rq_export;
628         struct niobuf_remote *remote_nb;
629         struct niobuf_local *local_nb;
630         struct obd_ioobj *ioo;
631         struct ost_body *body, *repbody;
632         struct lustre_capa *capa = NULL;
633         struct l_wait_info lwi;
634         struct lustre_handle lockh = { 0 };
635         int niocount, npages, nob = 0, rc, i;
636         int no_reply = 0;
637         ENTRY;
638
639         req->rq_bulk_read = 1;
640
641         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
642                 GOTO(out, rc = -EIO);
643
644         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, (obd_timeout + 1) / 4);
645
646         /* Check if there is eviction in progress, and if so, wait for it to
647          * finish */
648         if (unlikely(atomic_read(&exp->exp_obd->obd_evict_inprogress))) {
649                 lwi = LWI_INTR(NULL, NULL); // We do not care how long it takes
650                 rc = l_wait_event(exp->exp_obd->obd_evict_inprogress_waitq,
651                         !atomic_read(&exp->exp_obd->obd_evict_inprogress),
652                         &lwi);
653         }
654         if (exp->exp_failed)
655                 GOTO(out, rc = -ENOTCONN);
656
657         /* ost_body, ioobj & noibuf_remote are verified and swabbed in
658          * ost_rw_hpreq_check(). */
659         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
660         if (body == NULL)
661                 GOTO(out, rc = -EFAULT);
662
663         /*
664          * A req_capsule_X_get_array(pill, field, ptr_to_element_count) function
665          * would be useful here and wherever we get &RMF_OBD_IOOBJ and
666          * &RMF_NIOBUF_REMOTE.
667          */
668         ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
669         if (ioo == NULL)
670                 GOTO(out, rc = -EFAULT);
671
672         niocount = ioo->ioo_bufcnt;
673         remote_nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
674         if (remote_nb == NULL)
675                 GOTO(out, rc = -EFAULT);
676
677         if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
678                 capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
679                 if (capa == NULL) {
680                         CERROR("Missing capability for OST BRW READ");
681                         GOTO(out, rc = -EFAULT);
682                 }
683         }
684
685         req_capsule_set_size(&req->rq_pill, &RMF_RCS, RCL_SERVER, 0);
686         rc = req_capsule_server_pack(&req->rq_pill);
687         if (rc)
688                 GOTO(out, rc);
689
690         /*
691          * Per-thread array of struct niobuf_{local,remote}'s was allocated by
692          * ost_thread_init().
693          */
694         local_nb = ost_tls(req)->local;
695
696         rc = ost_brw_lock_get(LCK_PR, exp, ioo, remote_nb, &lockh);
697         if (rc != 0)
698                 GOTO(out_bulk, rc);
699
700         /*
701          * If getting the lock took more time than
702          * client was willing to wait, drop it. b=11330
703          */
704         if (cfs_time_current_sec() > req->rq_deadline ||
705             OBD_FAIL_CHECK(OBD_FAIL_OST_DROP_REQ)) {
706                 no_reply = 1;
707                 CERROR("Dropping timed-out read from %s because locking"
708                        "object "LPX64" took %ld seconds (limit was %ld).\n",
709                        libcfs_id2str(req->rq_peer), ioo->ioo_id,
710                        cfs_time_current_sec() - req->rq_arrival_time.tv_sec,
711                        req->rq_deadline - req->rq_arrival_time.tv_sec);
712                 GOTO(out_lock, rc = -ETIMEDOUT);
713         }
714
715         npages = OST_THREAD_POOL_SIZE;
716         rc = obd_preprw(OBD_BRW_READ, exp, &body->oa, 1, ioo,
717                         remote_nb, &npages, local_nb, oti, capa);
718         if (rc != 0)
719                 GOTO(out_lock, rc);
720
721         desc = ptlrpc_prep_bulk_exp(req, npages,
722                                      BULK_PUT_SOURCE, OST_BULK_PORTAL);
723         if (desc == NULL)
724                 GOTO(out_lock, rc = -ENOMEM);
725
726         if (!lustre_handle_is_used(&lockh))
727                 /* no needs to try to prolong lock if server is asked
728                  * to handle locking (= OBD_BRW_SRVLOCK) */
729                 ost_rw_prolong_locks(req, ioo, remote_nb, &body->oa,
730                                      LCK_PW | LCK_PR);
731
732         nob = 0;
733         for (i = 0; i < npages; i++) {
734                 int page_rc = local_nb[i].rc;
735
736                 if (page_rc < 0) {              /* error */
737                         rc = page_rc;
738                         break;
739                 }
740
741                 nob += page_rc;
742                 if (page_rc != 0) {             /* some data! */
743                         LASSERT (local_nb[i].page != NULL);
744                         ptlrpc_prep_bulk_page(desc, local_nb[i].page,
745                                               local_nb[i].offset & ~CFS_PAGE_MASK,
746                                               page_rc);
747                 }
748
749                 if (page_rc != local_nb[i].len) { /* short read */
750                         /* All subsequent pages should be 0 */
751                         while(++i < npages)
752                                 LASSERT(local_nb[i].rc == 0);
753                         break;
754                 }
755         }
756
757         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
758                 cksum_type_t cksum_type = OBD_CKSUM_CRC32;
759
760                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
761                         cksum_type = cksum_type_unpack(body->oa.o_flags);
762                 body->oa.o_flags = cksum_type_pack(cksum_type);
763                 body->oa.o_valid = OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
764                 body->oa.o_cksum = ost_checksum_bulk(desc, OST_READ, cksum_type);
765                 CDEBUG(D_PAGE,"checksum at read origin: %x\n",body->oa.o_cksum);
766         } else {
767                 body->oa.o_valid = 0;
768         }
769         /* We're finishing using body->oa as an input variable */
770
771         /* Check if client was evicted while we were doing i/o before touching
772            network */
773         if (rc == 0) {
774                 /* Check if there is eviction in progress, and if so, wait for
775                  * it to finish */
776                 if (unlikely(atomic_read(&exp->exp_obd->
777                                                 obd_evict_inprogress))) {
778                         lwi = LWI_INTR(NULL, NULL);
779                         rc = l_wait_event(exp->exp_obd->
780                                                 obd_evict_inprogress_waitq,
781                                           !atomic_read(&exp->exp_obd->
782                                                         obd_evict_inprogress),
783                                           &lwi);
784                 }
785                 /* Check if client was evicted or tried to reconnect already */
786                 if (exp->exp_failed || exp->exp_abort_active_req)
787                         rc = -ENOTCONN;
788                 else {
789                         rc = sptlrpc_svc_wrap_bulk(req, desc);
790                         if (rc == 0)
791                                 rc = ptlrpc_start_bulk_transfer(desc);
792                 }
793
794                 if (rc == 0) {
795                         time_t start = cfs_time_current_sec();
796                         do {
797                                 long timeoutl = req->rq_deadline -
798                                         cfs_time_current_sec();
799                                 cfs_duration_t timeout = timeoutl <= 0 ?
800                                         CFS_TICK : cfs_time_seconds(timeoutl);
801                                 lwi = LWI_TIMEOUT_INTERVAL(timeout,
802                                                            cfs_time_seconds(1),
803                                                            ost_bulk_timeout,
804                                                            desc);
805                                 rc = l_wait_event(desc->bd_waitq,
806                                                   !ptlrpc_server_bulk_active(desc) ||
807                                                   exp->exp_failed ||
808                                                   exp->exp_abort_active_req,
809                                                   &lwi);
810                                 LASSERT(rc == 0 || rc == -ETIMEDOUT);
811                                 /* Wait again if we changed deadline */
812                         } while ((rc == -ETIMEDOUT) &&
813                                  (req->rq_deadline > cfs_time_current_sec()));
814
815                         if (rc == -ETIMEDOUT) {
816                                 DEBUG_REQ(D_ERROR, req,
817                                           "timeout on bulk PUT after %ld%+lds",
818                                           req->rq_deadline - start,
819                                           cfs_time_current_sec() -
820                                           req->rq_deadline);
821                                 ptlrpc_abort_bulk(desc);
822                         } else if (exp->exp_failed) {
823                                 DEBUG_REQ(D_ERROR, req, "Eviction on bulk PUT");
824                                 rc = -ENOTCONN;
825                                 ptlrpc_abort_bulk(desc);
826                         } else if (exp->exp_abort_active_req) {
827                                 DEBUG_REQ(D_ERROR, req, "Reconnect on bulk PUT");
828                                 /* we don't reply anyway */
829                                 rc = -ETIMEDOUT;
830                                 ptlrpc_abort_bulk(desc);
831                         } else if (!desc->bd_success ||
832                                    desc->bd_nob_transferred != desc->bd_nob) {
833                                 DEBUG_REQ(D_ERROR, req, "%s bulk PUT %d(%d)",
834                                           desc->bd_success ?
835                                           "truncated" : "network error on",
836                                           desc->bd_nob_transferred,
837                                           desc->bd_nob);
838                                 /* XXX should this be a different errno? */
839                                 rc = -ETIMEDOUT;
840                         }
841                 } else {
842                         DEBUG_REQ(D_ERROR, req, "bulk PUT failed: rc %d", rc);
843                 }
844                 no_reply = rc != 0;
845         }
846
847         /* Must commit after prep above in all cases */
848         rc = obd_commitrw(OBD_BRW_READ, exp, &body->oa, 1, ioo,
849                           remote_nb, npages, local_nb, oti, rc);
850
851         if (rc == 0) {
852                 repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
853                 memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
854                 ost_drop_id(exp, &repbody->oa);
855         }
856
857 out_lock:
858         ost_brw_lock_put(LCK_PR, ioo, remote_nb, &lockh);
859 out_bulk:
860         if (desc)
861                 ptlrpc_free_bulk(desc);
862 out:
863         LASSERT(rc <= 0);
864         if (rc == 0) {
865                 req->rq_status = nob;
866                 ptlrpc_lprocfs_brw(req, nob);
867                 target_committed_to_req(req);
868                 ptlrpc_reply(req);
869         } else if (!no_reply) {
870                 /* Only reply if there was no comms problem with bulk */
871                 target_committed_to_req(req);
872                 req->rq_status = rc;
873                 ptlrpc_error(req);
874         } else {
875                 /* reply out callback would free */
876                 ptlrpc_req_drop_rs(req);
877                 CWARN("%s: ignoring bulk IO comm error with %s@%s id %s - "
878                       "client will retry\n",
879                       exp->exp_obd->obd_name,
880                       exp->exp_client_uuid.uuid,
881                       exp->exp_connection->c_remote_uuid.uuid,
882                       libcfs_id2str(req->rq_peer));
883         }
884
885         RETURN(rc);
886 }
887
888 static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
889 {
890         struct ptlrpc_bulk_desc *desc = NULL;
891         struct obd_export       *exp = req->rq_export;
892         struct niobuf_remote    *remote_nb;
893         struct niobuf_local     *local_nb;
894         struct obd_ioobj        *ioo;
895         struct ost_body         *body, *repbody;
896         struct l_wait_info       lwi;
897         struct lustre_handle     lockh = {0};
898         struct lustre_capa      *capa = NULL;
899         __u32                   *rcs;
900         int objcount, niocount, npages;
901         int rc, i, j;
902         obd_count                client_cksum = 0, server_cksum = 0;
903         cksum_type_t             cksum_type = OBD_CKSUM_CRC32;
904         int                      no_reply = 0;
905         __u32                    o_uid = 0, o_gid = 0;
906         ENTRY;
907
908         req->rq_bulk_write = 1;
909
910         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
911                 GOTO(out, rc = -EIO);
912         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK2))
913                 GOTO(out, rc = -EFAULT);
914
915         /* pause before transaction has been started */
916         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, (obd_timeout + 1) / 4);
917
918         /* Check if there is eviction in progress, and if so, wait for it to
919          * finish */
920         if (unlikely(atomic_read(&exp->exp_obd->obd_evict_inprogress))) {
921                 lwi = LWI_INTR(NULL, NULL); // We do not care how long it takes
922                 rc = l_wait_event(exp->exp_obd->obd_evict_inprogress_waitq,
923                         !atomic_read(&exp->exp_obd->obd_evict_inprogress),
924                         &lwi);
925         }
926         if (exp->exp_failed)
927                 GOTO(out, rc = -ENOTCONN);
928
929         /* ost_body, ioobj & noibuf_remote are verified and swabbed in
930          * ost_rw_hpreq_check(). */
931         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
932         if (body == NULL)
933                 GOTO(out, rc = -EFAULT);
934
935         if ((body->oa.o_flags & OBD_BRW_MEMALLOC) &&
936             (exp->exp_connection->c_peer.nid == exp->exp_connection->c_self))
937                 libcfs_memory_pressure_set();
938
939         objcount = req_capsule_get_size(&req->rq_pill, &RMF_OBD_IOOBJ,
940                                         RCL_CLIENT) / sizeof(*ioo);
941         ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
942         if (ioo == NULL)
943                 GOTO(out, rc = -EFAULT);
944         for (niocount = i = 0; i < objcount; i++)
945                 niocount += ioo[i].ioo_bufcnt;
946
947         /*
948          * It'd be nice to have a capsule function to indicate how many elements
949          * there were in a buffer for an RMF that's declared to be an array.
950          * It's easy enough to compute the number of elements here though.
951          */
952         remote_nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
953         if (remote_nb == NULL || niocount != (req_capsule_get_size(&req->rq_pill,
954             &RMF_NIOBUF_REMOTE, RCL_CLIENT) / sizeof(*remote_nb)))
955                 GOTO(out, rc = -EFAULT);
956
957         if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
958                 capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
959                 if (capa == NULL) {
960                         CERROR("Missing capability for OST BRW WRITE");
961                         GOTO(out, rc = -EFAULT);
962                 }
963         }
964
965         req_capsule_set_size(&req->rq_pill, &RMF_RCS, RCL_SERVER,
966                              niocount * sizeof(*rcs));
967         rc = req_capsule_server_pack(&req->rq_pill);
968         if (rc != 0)
969                 GOTO(out, rc);
970         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_PACK, obd_fail_val);
971         rcs = req_capsule_server_get(&req->rq_pill, &RMF_RCS);
972
973         /*
974          * Per-thread array of struct niobuf_{local,remote}'s was allocated by
975          * ost_thread_init().
976          */
977         local_nb = ost_tls(req)->local;
978
979         rc = ost_brw_lock_get(LCK_PW, exp, ioo, remote_nb, &lockh);
980         if (rc != 0)
981                 GOTO(out_bulk, rc);
982
983         /*
984          * If getting the lock took more time than
985          * client was willing to wait, drop it. b=11330
986          */
987         if (cfs_time_current_sec() > req->rq_deadline ||
988             OBD_FAIL_CHECK(OBD_FAIL_OST_DROP_REQ)) {
989                 no_reply = 1;
990                 CERROR("Dropping timed-out write from %s because locking "
991                        "object "LPX64" took %ld seconds (limit was %ld).\n",
992                        libcfs_id2str(req->rq_peer), ioo->ioo_id,
993                        cfs_time_current_sec() - req->rq_arrival_time.tv_sec,
994                        req->rq_deadline - req->rq_arrival_time.tv_sec);
995                 GOTO(out_lock, rc = -ETIMEDOUT);
996         }
997
998         if (!lustre_handle_is_used(&lockh))
999                 /* no needs to try to prolong lock if server is asked
1000                  * to handle locking (= OBD_BRW_SRVLOCK) */
1001                 ost_rw_prolong_locks(req, ioo, remote_nb,&body->oa,  LCK_PW);
1002
1003         /* obd_preprw clobbers oa->valid, so save what we need */
1004         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1005                 client_cksum = body->oa.o_cksum;
1006                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1007                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1008         }
1009
1010         /* Because we already sync grant info with client when reconnect,
1011          * grant info will be cleared for resent req, then fed_grant and
1012          * total_grant will not be modified in following preprw_write */
1013         if (lustre_msg_get_flags(req->rq_reqmsg) & (MSG_RESENT | MSG_REPLAY)) {
1014                 DEBUG_REQ(D_CACHE, req, "clear resent/replay req grant info");
1015                 body->oa.o_valid &= ~OBD_MD_FLGRANT;
1016         }
1017
1018         if (exp_connect_rmtclient(exp)) {
1019                 o_uid = body->oa.o_uid;
1020                 o_gid = body->oa.o_gid;
1021         }
1022         npages = OST_THREAD_POOL_SIZE;
1023         rc = obd_preprw(OBD_BRW_WRITE, exp, &body->oa, objcount,
1024                         ioo, remote_nb, &npages, local_nb, oti, capa);
1025         if (rc != 0)
1026                 GOTO(out_lock, rc);
1027
1028         desc = ptlrpc_prep_bulk_exp(req, npages,
1029                                      BULK_GET_SINK, OST_BULK_PORTAL);
1030         if (desc == NULL)
1031                 GOTO(out_lock, rc = -ENOMEM);
1032
1033         /* NB Having prepped, we must commit... */
1034
1035         for (i = 0; i < npages; i++)
1036                 ptlrpc_prep_bulk_page(desc, local_nb[i].page,
1037                                       local_nb[i].offset & ~CFS_PAGE_MASK,
1038                                       local_nb[i].len);
1039
1040         rc = sptlrpc_svc_prep_bulk(req, desc);
1041         if (rc != 0)
1042                 GOTO(out_lock, rc);
1043
1044         /* Check if client was evicted or tried to reconnect while we
1045          * were doing i/o before touching network */
1046         if (desc->bd_export->exp_failed ||
1047             desc->bd_export->exp_abort_active_req)
1048                 rc = -ENOTCONN;
1049         else
1050                 rc = ptlrpc_start_bulk_transfer(desc);
1051         if (rc == 0) {
1052                 time_t start = cfs_time_current_sec();
1053                 do {
1054                         long timeoutl = req->rq_deadline -
1055                                 cfs_time_current_sec();
1056                         cfs_duration_t timeout = timeoutl <= 0 ?
1057                                 CFS_TICK : cfs_time_seconds(timeoutl);
1058                         lwi = LWI_TIMEOUT_INTERVAL(timeout, cfs_time_seconds(1),
1059                                                    ost_bulk_timeout, desc);
1060                         rc = l_wait_event(desc->bd_waitq,
1061                                           !ptlrpc_server_bulk_active(desc) ||
1062                                           desc->bd_export->exp_failed ||
1063                                           desc->bd_export->exp_abort_active_req,
1064                                           &lwi);
1065                         LASSERT(rc == 0 || rc == -ETIMEDOUT);
1066                         /* Wait again if we changed deadline */
1067                 } while ((rc == -ETIMEDOUT) &&
1068                          (req->rq_deadline > cfs_time_current_sec()));
1069
1070                 if (rc == -ETIMEDOUT) {
1071                         DEBUG_REQ(D_ERROR, req,
1072                                   "timeout on bulk GET after %ld%+lds",
1073                                   req->rq_deadline - start,
1074                                   cfs_time_current_sec() -
1075                                   req->rq_deadline);
1076                         ptlrpc_abort_bulk(desc);
1077                 } else if (desc->bd_export->exp_failed) {
1078                         DEBUG_REQ(D_ERROR, req, "Eviction on bulk GET");
1079                         rc = -ENOTCONN;
1080                         ptlrpc_abort_bulk(desc);
1081                 } else if (desc->bd_export->exp_abort_active_req) {
1082                         DEBUG_REQ(D_ERROR, req, "Reconnect on bulk GET");
1083                         /* we don't reply anyway */
1084                         rc = -ETIMEDOUT;
1085                         ptlrpc_abort_bulk(desc);
1086                 } else if (!desc->bd_success) {
1087                         DEBUG_REQ(D_ERROR, req, "network error on bulk GET");
1088                         /* XXX should this be a different errno? */
1089                         rc = -ETIMEDOUT;
1090                 } else {
1091                         rc = sptlrpc_svc_unwrap_bulk(req, desc);
1092                 }
1093         } else {
1094                 DEBUG_REQ(D_ERROR, req, "ptlrpc_bulk_get failed: rc %d", rc);
1095         }
1096         no_reply = rc != 0;
1097
1098         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1099         memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
1100
1101         if (unlikely(client_cksum != 0 && rc == 0)) {
1102                 static int cksum_counter;
1103                 repbody->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1104                 repbody->oa.o_flags &= ~OBD_FL_CKSUM_ALL;
1105                 repbody->oa.o_flags |= cksum_type_pack(cksum_type);
1106                 server_cksum = ost_checksum_bulk(desc, OST_WRITE, cksum_type);
1107                 repbody->oa.o_cksum = server_cksum;
1108                 cksum_counter++;
1109                 if (unlikely(client_cksum != server_cksum)) {
1110                         CERROR("client csum %x, server csum %x\n",
1111                                client_cksum, server_cksum);
1112                         cksum_counter = 0;
1113                 } else if ((cksum_counter & (-cksum_counter)) == cksum_counter){
1114                         CDEBUG(D_INFO, "Checksum %u from %s OK: %x\n",
1115                                cksum_counter, libcfs_id2str(req->rq_peer),
1116                                server_cksum);
1117                 }
1118         }
1119
1120         /* Must commit after prep above in all cases */
1121         rc = obd_commitrw(OBD_BRW_WRITE, exp, &repbody->oa, objcount, ioo,
1122                           remote_nb, npages, local_nb, oti, rc);
1123         if (rc == -ENOTCONN)
1124                 /* quota acquire process has been given up because
1125                  * either the client has been evicted or the client
1126                  * has timed out the request already */
1127                 no_reply = 1;
1128
1129         if (exp_connect_rmtclient(exp)) {
1130                 repbody->oa.o_uid = o_uid;
1131                 repbody->oa.o_gid = o_gid;
1132         }
1133
1134         if (unlikely(client_cksum != server_cksum && rc == 0)) {
1135                 int  new_cksum = ost_checksum_bulk(desc, OST_WRITE, cksum_type);
1136                 char *msg;
1137                 char *via;
1138                 char *router;
1139
1140                 if (new_cksum == server_cksum)
1141                         msg = "changed in transit before arrival at OST";
1142                 else if (new_cksum == client_cksum)
1143                         msg = "initial checksum before message complete";
1144                 else
1145                         msg = "changed in transit AND after initial checksum";
1146
1147                 if (req->rq_peer.nid == desc->bd_sender) {
1148                         via = router = "";
1149                 } else {
1150                         via = " via ";
1151                         router = libcfs_nid2str(desc->bd_sender);
1152                 }
1153
1154                 LCONSOLE_ERROR_MSG(0x168, "%s: BAD WRITE CHECKSUM: %s from "
1155                                    "%s%s%s inum "LPU64"/"LPU64" object "
1156                                    LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1157                                    exp->exp_obd->obd_name, msg,
1158                                    libcfs_id2str(req->rq_peer),
1159                                    via, router,
1160                                    body->oa.o_valid & OBD_MD_FLFID ?
1161                                                 body->oa.o_fid : (__u64)0,
1162                                    body->oa.o_valid & OBD_MD_FLFID ?
1163                                                 body->oa.o_generation :(__u64)0,
1164                                    body->oa.o_id,
1165                                    body->oa.o_valid & OBD_MD_FLGROUP ?
1166                                                 body->oa.o_gr : (__u64)0,
1167                                    local_nb[0].offset,
1168                                    local_nb[npages-1].offset +
1169                                    local_nb[npages-1].len - 1 );
1170                 CERROR("client csum %x, original server csum %x, "
1171                        "server csum now %x\n",
1172                        client_cksum, server_cksum, new_cksum);
1173         }
1174
1175         if (rc == 0) {
1176                 int nob = 0;
1177
1178                 /* set per-requested niobuf return codes */
1179                 for (i = j = 0; i < niocount; i++) {
1180                         int len = remote_nb[i].len;
1181
1182                         nob += len;
1183                         rcs[i] = 0;
1184                         do {
1185                                 LASSERT(j < npages);
1186                                 if (local_nb[j].rc < 0)
1187                                         rcs[i] = local_nb[j].rc;
1188                                 len -= local_nb[j].len;
1189                                 j++;
1190                         } while (len > 0);
1191                         LASSERT(len == 0);
1192                 }
1193                 LASSERT(j == npages);
1194                 ptlrpc_lprocfs_brw(req, nob);
1195         }
1196
1197 out_lock:
1198         ost_brw_lock_put(LCK_PW, ioo, remote_nb, &lockh);
1199 out_bulk:
1200         if (desc)
1201                 ptlrpc_free_bulk(desc);
1202 out:
1203         if (rc == 0) {
1204                 oti_to_request(oti, req);
1205                 target_committed_to_req(req);
1206                 rc = ptlrpc_reply(req);
1207         } else if (!no_reply) {
1208                 /* Only reply if there was no comms problem with bulk */
1209                 target_committed_to_req(req);
1210                 req->rq_status = rc;
1211                 ptlrpc_error(req);
1212         } else {
1213                 /* reply out callback would free */
1214                 ptlrpc_req_drop_rs(req);
1215                 CWARN("%s: ignoring bulk IO comm error with %s@%s id %s - "
1216                       "client will retry\n",
1217                       exp->exp_obd->obd_name,
1218                       exp->exp_client_uuid.uuid,
1219                       exp->exp_connection->c_remote_uuid.uuid,
1220                       libcfs_id2str(req->rq_peer));
1221         }
1222         libcfs_memory_pressure_clr();
1223         RETURN(rc);
1224 }
1225
1226 /**
1227  * Implementation of OST_SET_INFO.
1228  *
1229  * OST_SET_INFO is like ioctl(): heavily overloaded.  Specifically, it takes a
1230  * "key" and a value RPC buffers as arguments, with the value's contents
1231  * interpreted according to the key.
1232  *
1233  * Value types that need swabbing have swabbing done explicitly, either here or
1234  * in functions called from here.  This should be corrected: all swabbing should
1235  * be done in the capsule abstraction, as that will then allow us to move
1236  * swabbing exclusively to the client without having to modify server code
1237  * outside the capsule abstraction's implementation itself.  To correct this
1238  * will require minor changes to the capsule abstraction; see the comments for
1239  * req_capsule_extend() in layout.c.
1240  */
1241 static int ost_set_info(struct obd_export *exp, struct ptlrpc_request *req)
1242 {
1243         struct ost_body *body = NULL, *repbody;
1244         char *key, *val = NULL;
1245         int keylen, vallen, rc = 0;
1246         int is_grant_shrink = 0;
1247         ENTRY;
1248
1249         key = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
1250         if (key == NULL) {
1251                 DEBUG_REQ(D_HA, req, "no set_info key");
1252                 RETURN(-EFAULT);
1253         }
1254         keylen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_KEY,
1255                                       RCL_CLIENT);
1256
1257         vallen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_VAL,
1258                                       RCL_CLIENT);
1259
1260         if ((is_grant_shrink = KEY_IS(KEY_GRANT_SHRINK)))
1261                 /* In this case the value is actually an RMF_OST_BODY, so we
1262                  * transmutate the type of this PTLRPC */
1263                 req_capsule_extend(&req->rq_pill, &RQF_OST_SET_GRANT_INFO);
1264
1265         rc = req_capsule_server_pack(&req->rq_pill);
1266         if (rc)
1267                 RETURN(rc);
1268
1269         if (vallen) {
1270                 if (is_grant_shrink) {
1271                         body = req_capsule_client_get(&req->rq_pill,
1272                                                       &RMF_OST_BODY);
1273                         if (!body)
1274                                 RETURN(-EFAULT);
1275
1276                         repbody = req_capsule_server_get(&req->rq_pill,
1277                                                          &RMF_OST_BODY);
1278                         memcpy(repbody, body, sizeof(*body));
1279                         val = (char*)repbody;
1280                 } else {
1281                         val = req_capsule_client_get(&req->rq_pill,
1282                                                      &RMF_SETINFO_VAL);
1283                 }
1284         }
1285
1286         if (KEY_IS(KEY_EVICT_BY_NID)) {
1287                 if (val && vallen)
1288                         obd_export_evict_by_nid(exp->exp_obd, val);
1289                 GOTO(out, rc = 0);
1290         } else if (KEY_IS(KEY_MDS_CONN) && ptlrpc_req_need_swab(req)) {
1291                 if (vallen < sizeof(__u32))
1292                         RETURN(-EFAULT);
1293                 __swab32s((__u32 *)val);
1294         }
1295
1296         /* OBD will also check if KEY_IS(KEY_GRANT_SHRINK), and will cast val to
1297          * a struct ost_body * value */
1298         rc = obd_set_info_async(exp, keylen, key, vallen, val, NULL);
1299 out:
1300         lustre_msg_set_status(req->rq_repmsg, 0);
1301         RETURN(rc);
1302 }
1303
1304 static int ost_get_info(struct obd_export *exp, struct ptlrpc_request *req)
1305 {
1306         void *key, *reply;
1307         int keylen, replylen, rc = 0;
1308         struct req_capsule *pill = &req->rq_pill;
1309         ENTRY;
1310
1311         /* this common part for get_info rpc */
1312         key = req_capsule_client_get(pill, &RMF_SETINFO_KEY);
1313         if (key == NULL) {
1314                 DEBUG_REQ(D_HA, req, "no get_info key");
1315                 RETURN(-EFAULT);
1316         }
1317         keylen = req_capsule_get_size(pill, &RMF_SETINFO_KEY, RCL_CLIENT);
1318
1319         rc = obd_get_info(exp, keylen, key, &replylen, NULL, NULL);
1320         if (rc)
1321                 RETURN(rc);
1322
1323         req_capsule_set_size(pill, &RMF_GENERIC_DATA,
1324                              RCL_SERVER, replylen);
1325
1326         rc = req_capsule_server_pack(pill);
1327         if (rc)
1328                 RETURN(rc);
1329
1330         reply = req_capsule_server_get(pill, &RMF_GENERIC_DATA);
1331         if (reply == NULL)
1332                 RETURN(-ENOMEM);
1333
1334         /* call again to fill in the reply buffer */
1335         rc = obd_get_info(exp, keylen, key, &replylen, reply, NULL);
1336
1337         lustre_msg_set_status(req->rq_repmsg, 0);
1338         RETURN(rc);
1339 }
1340
1341 #ifdef HAVE_QUOTA_SUPPORT
1342 static int ost_handle_quotactl(struct ptlrpc_request *req)
1343 {
1344         struct obd_quotactl *oqctl, *repoqc;
1345         int rc;
1346         ENTRY;
1347
1348         oqctl = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
1349         if (oqctl == NULL)
1350                 GOTO(out, rc = -EPROTO);
1351
1352         rc = req_capsule_server_pack(&req->rq_pill);
1353         if (rc)
1354                 GOTO(out, rc);
1355
1356         repoqc = req_capsule_server_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
1357         req->rq_status = obd_quotactl(req->rq_export, oqctl);
1358         *repoqc = *oqctl;
1359
1360 out:
1361         RETURN(rc);
1362 }
1363
1364 static int ost_handle_quotacheck(struct ptlrpc_request *req)
1365 {
1366         struct obd_quotactl *oqctl;
1367         int rc;
1368         ENTRY;
1369
1370         oqctl = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
1371         if (oqctl == NULL)
1372                 RETURN(-EPROTO);
1373
1374         rc = req_capsule_server_pack(&req->rq_pill);
1375         if (rc)
1376                 RETURN(-ENOMEM);
1377
1378         req->rq_status = obd_quotacheck(req->rq_export, oqctl);
1379         RETURN(0);
1380 }
1381
1382 static int ost_handle_quota_adjust_qunit(struct ptlrpc_request *req)
1383 {
1384         struct quota_adjust_qunit *oqaq, *repoqa;
1385         struct lustre_quota_ctxt *qctxt;
1386         int rc;
1387         ENTRY;
1388
1389         qctxt = &req->rq_export->exp_obd->u.obt.obt_qctxt;
1390         oqaq = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_ADJUST_QUNIT);
1391         if (oqaq == NULL)
1392                 GOTO(out, rc = -EPROTO);
1393
1394         rc = req_capsule_server_pack(&req->rq_pill);
1395         if (rc)
1396                 GOTO(out, rc);
1397
1398         repoqa = req_capsule_server_get(&req->rq_pill, &RMF_QUOTA_ADJUST_QUNIT);
1399         req->rq_status = obd_quota_adjust_qunit(req->rq_export, oqaq, qctxt);
1400         *repoqa = *oqaq;
1401
1402  out:
1403         RETURN(rc);
1404 }
1405 #endif
1406
1407 static int ost_llog_handle_connect(struct obd_export *exp,
1408                                    struct ptlrpc_request *req)
1409 {
1410         struct llogd_conn_body *body;
1411         int rc;
1412         ENTRY;
1413
1414         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_CONN_BODY);
1415         rc = obd_llog_connect(exp, body);
1416         RETURN(rc);
1417 }
1418
/*
 * Clear the remote-client and OSS-capability connect flags in \a reply and
 * store the resulting flags in \a exp under exp->exp_lock.
 *
 * Both arguments are pointer expressions; they are parenthesized so that
 * expressions such as "p + 1" expand correctly.  Each argument is evaluated
 * more than once, so do not pass expressions with side effects.
 */
#define ost_init_sec_none(reply, exp)                                   \
do {                                                                    \
        (reply)->ocd_connect_flags &= ~(OBD_CONNECT_RMT_CLIENT |        \
                                        OBD_CONNECT_RMT_CLIENT_FORCE |  \
                                        OBD_CONNECT_OSS_CAPA);          \
        spin_lock(&(exp)->exp_lock);                                    \
        (exp)->exp_connect_flags = (reply)->ocd_connect_flags;          \
        spin_unlock(&(exp)->exp_lock);                                  \
} while (0)
1428
1429 static int ost_init_sec_level(struct ptlrpc_request *req)
1430 {
1431         struct obd_export *exp = req->rq_export;
1432         struct req_capsule *pill = &req->rq_pill;
1433         struct obd_device *obd = exp->exp_obd;
1434         struct filter_obd *filter = &obd->u.filter;
1435         char *client = libcfs_nid2str(req->rq_peer.nid);
1436         struct obd_connect_data *data, *reply;
1437         int rc = 0, remote;
1438         ENTRY;
1439
1440         data = req_capsule_client_get(pill, &RMF_CONNECT_DATA);
1441         reply = req_capsule_server_get(pill, &RMF_CONNECT_DATA);
1442         if (data == NULL || reply == NULL)
1443                 RETURN(-EFAULT);
1444
1445         /* connection from MDT is always trusted */
1446         if (req->rq_auth_usr_mdt) {
1447                 ost_init_sec_none(reply, exp);
1448                 RETURN(0);
1449         }
1450
1451         /* no GSS support case */
1452         if (!req->rq_auth_gss) {
1453                 if (filter->fo_sec_level > LUSTRE_SEC_NONE) {
1454                         CWARN("client %s -> target %s does not user GSS, "
1455                               "can not run under security level %d.\n",
1456                               client, obd->obd_name, filter->fo_sec_level);
1457                         RETURN(-EACCES);
1458                 } else {
1459                         ost_init_sec_none(reply, exp);
1460                         RETURN(0);
1461                 }
1462         }
1463
1464         /* old version case */
1465         if (unlikely(!(data->ocd_connect_flags & OBD_CONNECT_RMT_CLIENT) ||
1466                      !(data->ocd_connect_flags & OBD_CONNECT_OSS_CAPA))) {
1467                 if (filter->fo_sec_level > LUSTRE_SEC_NONE) {
1468                         CWARN("client %s -> target %s uses old version, "
1469                               "can not run under security level %d.\n",
1470                               client, obd->obd_name, filter->fo_sec_level);
1471                         RETURN(-EACCES);
1472                 } else {
1473                         CWARN("client %s -> target %s uses old version, "
1474                               "run under security level %d.\n",
1475                               client, obd->obd_name, filter->fo_sec_level);
1476                         ost_init_sec_none(reply, exp);
1477                         RETURN(0);
1478                 }
1479         }
1480
1481         remote = data->ocd_connect_flags & OBD_CONNECT_RMT_CLIENT_FORCE;
1482         if (remote) {
1483                 if (!req->rq_auth_remote)
1484                         CDEBUG(D_SEC, "client (local realm) %s -> target %s "
1485                                "asked to be remote.\n", client, obd->obd_name);
1486         } else if (req->rq_auth_remote) {
1487                 remote = 1;
1488                 CDEBUG(D_SEC, "client (remote realm) %s -> target %s is set "
1489                        "as remote by default.\n", client, obd->obd_name);
1490         }
1491
1492         if (remote) {
1493                 if (!filter->fo_fl_oss_capa) {
1494                         CDEBUG(D_SEC, "client %s -> target %s is set as remote,"
1495                                " but OSS capabilities are not enabled: %d.\n",
1496                                client, obd->obd_name, filter->fo_fl_oss_capa);
1497                         RETURN(-EACCES);
1498                 }
1499         }
1500
1501         switch (filter->fo_sec_level) {
1502         case LUSTRE_SEC_NONE:
1503                 if (!remote) {
1504                         ost_init_sec_none(reply, exp);
1505                         break;
1506                 } else {
1507                         CDEBUG(D_SEC, "client %s -> target %s is set as remote, "
1508                                "can not run under security level %d.\n",
1509                                client, obd->obd_name, filter->fo_sec_level);
1510                         RETURN(-EACCES);
1511                 }
1512         case LUSTRE_SEC_REMOTE:
1513                 if (!remote)
1514                         ost_init_sec_none(reply, exp);
1515                 break;
1516         case LUSTRE_SEC_ALL:
1517                 if (!remote) {
1518                         reply->ocd_connect_flags &= ~(OBD_CONNECT_RMT_CLIENT |
1519                                                       OBD_CONNECT_RMT_CLIENT_FORCE);
1520                         if (!filter->fo_fl_oss_capa)
1521                                 reply->ocd_connect_flags &= ~OBD_CONNECT_OSS_CAPA;
1522
1523                         spin_lock(&exp->exp_lock);
1524                         exp->exp_connect_flags = reply->ocd_connect_flags;
1525                         spin_unlock(&exp->exp_lock);
1526                 }
1527                 break;
1528         default:
1529                 RETURN(-EINVAL);
1530         }
1531
1532         RETURN(rc);
1533 }
1534
1535 /*
1536  * FIXME
1537  * this should be done in filter_connect()/filter_reconnect(), but
1538  * we can't obtain information like NID, which stored in incoming
1539  * request, thus can't decide what flavor to use. so we do it here.
1540  *
1541  * This hack should be removed after the OST stack be rewritten, just
1542  * like what we are doing in mdt_obd_connect()/mdt_obd_reconnect().
1543  */
1544 static int ost_connect_check_sptlrpc(struct ptlrpc_request *req)
1545 {
1546         struct obd_export     *exp = req->rq_export;
1547         struct filter_obd     *filter = &exp->exp_obd->u.filter;
1548         struct sptlrpc_flavor  flvr;
1549         int                    rc = 0;
1550
1551         if (unlikely(strcmp(exp->exp_obd->obd_type->typ_name,
1552                             LUSTRE_ECHO_NAME) == 0)) {
1553                 exp->exp_flvr.sf_rpc = SPTLRPC_FLVR_ANY;
1554                 return 0;
1555         }
1556
1557         if (exp->exp_flvr.sf_rpc == SPTLRPC_FLVR_INVALID) {
1558                 read_lock(&filter->fo_sptlrpc_lock);
1559                 sptlrpc_target_choose_flavor(&filter->fo_sptlrpc_rset,
1560                                              req->rq_sp_from,
1561                                              req->rq_peer.nid,
1562                                              &flvr);
1563                 read_unlock(&filter->fo_sptlrpc_lock);
1564
1565                 spin_lock(&exp->exp_lock);
1566
1567                 exp->exp_sp_peer = req->rq_sp_from;
1568                 exp->exp_flvr = flvr;
1569
1570                 if (exp->exp_flvr.sf_rpc != SPTLRPC_FLVR_ANY &&
1571                     exp->exp_flvr.sf_rpc != req->rq_flvr.sf_rpc) {
1572                         CERROR("unauthorized rpc flavor %x from %s, "
1573                                "expect %x\n", req->rq_flvr.sf_rpc,
1574                                libcfs_nid2str(req->rq_peer.nid),
1575                                exp->exp_flvr.sf_rpc);
1576                         rc = -EACCES;
1577                 }
1578
1579                 spin_unlock(&exp->exp_lock);
1580         } else {
1581                 if (exp->exp_sp_peer != req->rq_sp_from) {
1582                         CERROR("RPC source %s doesn't match %s\n",
1583                                sptlrpc_part2name(req->rq_sp_from),
1584                                sptlrpc_part2name(exp->exp_sp_peer));
1585                         rc = -EACCES;
1586                 } else {
1587                         rc = sptlrpc_target_export_check(exp, req);
1588                 }
1589         }
1590
1591         return rc;
1592 }
1593
1594 static int ost_filter_recovery_request(struct ptlrpc_request *req,
1595                                        struct obd_device *obd, int *process)
1596 {
1597         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1598         case OST_CONNECT: /* This will never get here, but for completeness. */
1599         case OST_DISCONNECT:
1600                *process = 1;
1601                RETURN(0);
1602
1603         case OBD_PING:
1604         case OST_CREATE:
1605         case OST_DESTROY:
1606         case OST_PUNCH:
1607         case OST_SETATTR:
1608         case OST_SYNC:
1609         case OST_WRITE:
1610         case OBD_LOG_CANCEL:
1611         case LDLM_ENQUEUE:
1612                 *process = target_queue_recovery_request(req, obd);
1613                 RETURN(0);
1614
1615         default:
1616                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
1617                 *process = -EAGAIN;
1618                 RETURN(0);
1619         }
1620 }
1621
1622 int ost_msg_check_version(struct lustre_msg *msg)
1623 {
1624         int rc;
1625
1626         switch(lustre_msg_get_opc(msg)) {
1627         case OST_CONNECT:
1628         case OST_DISCONNECT:
1629         case OBD_PING:
1630         case SEC_CTX_INIT:
1631         case SEC_CTX_INIT_CONT:
1632         case SEC_CTX_FINI:
1633                 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
1634                 if (rc)
1635                         CERROR("bad opc %u version %08x, expecting %08x\n",
1636                                lustre_msg_get_opc(msg),
1637                                lustre_msg_get_version(msg),
1638                                LUSTRE_OBD_VERSION);
1639                 break;
1640         case OST_CREATE:
1641         case OST_DESTROY:
1642         case OST_GETATTR:
1643         case OST_SETATTR:
1644         case OST_WRITE:
1645         case OST_READ:
1646         case OST_PUNCH:
1647         case OST_STATFS:
1648         case OST_SYNC:
1649         case OST_SET_INFO:
1650         case OST_GET_INFO:
1651 #ifdef HAVE_QUOTA_SUPPORT
1652         case OST_QUOTACHECK:
1653         case OST_QUOTACTL:
1654         case OST_QUOTA_ADJUST_QUNIT:
1655 #endif
1656                 rc = lustre_msg_check_version(msg, LUSTRE_OST_VERSION);
1657                 if (rc)
1658                         CERROR("bad opc %u version %08x, expecting %08x\n",
1659                                lustre_msg_get_opc(msg),
1660                                lustre_msg_get_version(msg),
1661                                LUSTRE_OST_VERSION);
1662                 break;
1663         case LDLM_ENQUEUE:
1664         case LDLM_CONVERT:
1665         case LDLM_CANCEL:
1666         case LDLM_BL_CALLBACK:
1667         case LDLM_CP_CALLBACK:
1668                 rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
1669                 if (rc)
1670                         CERROR("bad opc %u version %08x, expecting %08x\n",
1671                                lustre_msg_get_opc(msg),
1672                                lustre_msg_get_version(msg),
1673                                LUSTRE_DLM_VERSION);
1674                 break;
1675         case LLOG_ORIGIN_CONNECT:
1676         case OBD_LOG_CANCEL:
1677                 rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
1678                 if (rc)
1679                         CERROR("bad opc %u version %08x, expecting %08x\n",
1680                                lustre_msg_get_opc(msg),
1681                                lustre_msg_get_version(msg),
1682                                LUSTRE_LOG_VERSION);
1683                 break;
1684         default:
1685                 CERROR("Unexpected opcode %d\n", lustre_msg_get_opc(msg));
1686                 rc = -ENOTSUPP;
1687         }
1688         return rc;
1689 }
1690
1691 /**
1692  * Returns 1 if the given PTLRPC matches the given LDLM locks, or 0 if it does
1693  * not.
1694  */
1695 static int ost_rw_hpreq_lock_match(struct ptlrpc_request *req,
1696                                    struct ldlm_lock *lock)
1697 {
1698         struct niobuf_remote *nb;
1699         struct obd_ioobj *ioo;
1700         struct ost_body *body;
1701         int objcount, niocount;
1702         int mode, opc, i;
1703         __u64 start, end;
1704         ENTRY;
1705
1706         opc = lustre_msg_get_opc(req->rq_reqmsg);
1707         LASSERT(opc == OST_READ || opc == OST_WRITE);
1708
1709         /* As the request may be covered by several locks, do not look at
1710          * o_handle, look at the RPC IO region. */
1711         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1712         if (body == NULL)
1713                 RETURN(0);
1714
1715         objcount = req_capsule_get_size(&req->rq_pill, &RMF_OBD_IOOBJ,
1716                                         RCL_CLIENT) / sizeof(*ioo);
1717         ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
1718         if (ioo == NULL)
1719                 RETURN(0);
1720
1721         for (niocount = i = 0; i < objcount; i++)
1722                 niocount += ioo[i].ioo_bufcnt;
1723
1724         nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
1725         if (nb == NULL ||
1726             niocount != (req_capsule_get_size(&req->rq_pill, &RMF_NIOBUF_REMOTE,
1727             RCL_CLIENT) / sizeof(*nb)))
1728                 RETURN(0);
1729
1730         mode = LCK_PW;
1731         if (opc == OST_READ)
1732                 mode |= LCK_PR;
1733
1734         start = nb[0].offset & CFS_PAGE_MASK;
1735         end = (nb[ioo->ioo_bufcnt - 1].offset +
1736                nb[ioo->ioo_bufcnt - 1].len - 1) | ~CFS_PAGE_MASK;
1737
1738         LASSERT(lock->l_resource != NULL);
1739         if (!osc_res_name_eq(ioo->ioo_id, ioo->ioo_gr,
1740                              &lock->l_resource->lr_name))
1741                 RETURN(0);
1742
1743         if (!(lock->l_granted_mode & mode))
1744                 RETURN(0);
1745
1746         if (lock->l_policy_data.l_extent.end < start ||
1747             lock->l_policy_data.l_extent.start > end)
1748                 RETURN(0);
1749
1750         RETURN(1);
1751 }
1752
1753 /**
1754  * High-priority queue request check for whether the given PTLRPC request (\a
1755  * req) is blocking an LDLM lock cancel.
1756  *
1757  * Returns 1 if the given given PTLRPC request (\a req) is blocking an LDLM lock
1758  * cancel, 0 if it is not, and -EFAULT if the request is malformed.
1759  *
1760  * Only OST_READs, OST_WRITEs and OST_PUNCHes go on the h-p RPC queue.  This
1761  * function looks only at OST_READs and OST_WRITEs.
1762  */
1763 static int ost_rw_hpreq_check(struct ptlrpc_request *req)
1764 {
1765         struct niobuf_remote *nb;
1766         struct obd_ioobj *ioo;
1767         struct ost_body *body;
1768         int objcount, niocount;
1769         int mode, opc, i;
1770         ENTRY;
1771
1772         opc = lustre_msg_get_opc(req->rq_reqmsg);
1773         LASSERT(opc == OST_READ || opc == OST_WRITE);
1774
1775         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1776         if (body == NULL)
1777                 RETURN(-EFAULT);
1778
1779         objcount = req_capsule_get_size(&req->rq_pill, &RMF_OBD_IOOBJ,
1780                                         RCL_CLIENT) / sizeof(*ioo);
1781         ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
1782         if (ioo == NULL)
1783                 RETURN(-EFAULT);
1784
1785         for (niocount = i = 0; i < objcount; i++)
1786                 niocount += ioo[i].ioo_bufcnt;
1787         nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
1788         if (nb == NULL ||
1789             niocount != (req_capsule_get_size(&req->rq_pill, &RMF_NIOBUF_REMOTE,
1790             RCL_CLIENT) / sizeof(*nb)))
1791                 RETURN(-EFAULT);
1792         if (niocount != 0 && (nb[0].flags & OBD_BRW_SRVLOCK))
1793                 RETURN(-EFAULT);
1794
1795         mode = LCK_PW;
1796         if (opc == OST_READ)
1797                 mode |= LCK_PR;
1798         RETURN(ost_rw_prolong_locks(req, ioo, nb, &body->oa, mode));
1799 }
1800
1801 static int ost_punch_prolong_locks(struct ptlrpc_request *req, struct obdo *oa)
1802 {
1803         struct ldlm_res_id res_id = { .name = { oa->o_id } };
1804         struct ost_prolong_data opd = { 0 };
1805         __u64 start, end;
1806         ENTRY;
1807
1808         start = oa->o_size;
1809         end = start + oa->o_blocks;
1810
1811         opd.opd_mode = LCK_PW;
1812         opd.opd_exp = req->rq_export;
1813         opd.opd_policy.l_extent.start = start & CFS_PAGE_MASK;
1814         if (oa->o_blocks == OBD_OBJECT_EOF || end < start)
1815                 opd.opd_policy.l_extent.end = OBD_OBJECT_EOF;
1816         else
1817                 opd.opd_policy.l_extent.end = end | ~CFS_PAGE_MASK;
1818
1819         /* prolong locks for the current service time of the corresponding
1820          * portal (= OST_IO_PORTAL) */
1821         opd.opd_timeout = AT_OFF ? obd_timeout / 2:
1822                           max(at_est2timeout(at_get(&req->rq_rqbd->
1823                               rqbd_service->srv_at_estimate)), ldlm_timeout);
1824
1825         CDEBUG(D_DLMTRACE,"refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
1826                res_id.name[0], res_id.name[1], opd.opd_policy.l_extent.start,
1827                opd.opd_policy.l_extent.end);
1828
1829         opd.opd_oa = oa;
1830         ldlm_resource_iterate(req->rq_export->exp_obd->obd_namespace, &res_id,
1831                               ost_prolong_locks_iter, &opd);
1832         RETURN(opd.opd_lock_match);
1833 }
1834
1835 /**
1836  * Like ost_rw_hpreq_lock_match(), but for OST_PUNCH RPCs.
1837  */
1838 static int ost_punch_hpreq_lock_match(struct ptlrpc_request *req,
1839                                       struct ldlm_lock *lock)
1840 {
1841         struct ost_body *body;
1842         ENTRY;
1843
1844         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1845         if (body == NULL)
1846                 RETURN(0);  /* can't return -EFAULT here */
1847
1848         if (body->oa.o_valid & OBD_MD_FLHANDLE &&
1849             body->oa.o_handle.cookie == lock->l_handle.h_cookie)
1850                 RETURN(1);
1851         RETURN(0);
1852 }
1853
1854 /**
1855  * Like ost_rw_hpreq_check(), but for OST_PUNCH RPCs.
1856  */
1857 static int ost_punch_hpreq_check(struct ptlrpc_request *req)
1858 {
1859         struct ost_body *body;
1860
1861         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1862         if (body == NULL)
1863                 RETURN(-EFAULT);
1864
1865         LASSERT(!(body->oa.o_valid & OBD_MD_FLFLAGS) ||
1866                 !(body->oa.o_flags & OBD_FL_TRUNCLOCK));
1867
1868         RETURN(ost_punch_prolong_locks(req, &body->oa));
1869 }
1870
1871 struct ptlrpc_hpreq_ops ost_hpreq_rw = {
1872         .hpreq_lock_match  = ost_rw_hpreq_lock_match,
1873         .hpreq_check       = ost_rw_hpreq_check,
1874 };
1875
1876 struct ptlrpc_hpreq_ops ost_hpreq_punch = {
1877         .hpreq_lock_match  = ost_punch_hpreq_lock_match,
1878         .hpreq_check       = ost_punch_hpreq_check,
1879 };
1880
1881 /** Assign high priority operations to the request if needed. */
1882 static int ost_hpreq_handler(struct ptlrpc_request *req)
1883 {
1884         ENTRY;
1885         if (req->rq_export) {
1886                 int opc = lustre_msg_get_opc(req->rq_reqmsg);
1887                 struct ost_body *body;
1888
1889                 if (opc == OST_READ || opc == OST_WRITE) {
1890                         struct niobuf_remote *nb;
1891                         struct obd_ioobj *ioo;
1892                         int objcount, niocount;
1893                         int i;
1894
1895                         /* RPCs on the H-P queue can be inspected before
1896                          * ost_handler() initializes their pills, so we
1897                          * initialize that here.  Capsule initialization is
1898                          * idempotent, as is setting the pill's format (provided
1899                          * it doesn't change).
1900                          */
1901                         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
1902                         req_capsule_set(&req->rq_pill, &RQF_OST_BRW);
1903
1904                         body = req_capsule_client_get(&req->rq_pill,
1905                                                       &RMF_OST_BODY);
1906                         if (body == NULL) {
1907                                 CERROR("Missing/short ost_body\n");
1908                                 RETURN(-EFAULT);
1909                         }
1910                         objcount = req_capsule_get_size(&req->rq_pill,
1911                                                         &RMF_OBD_IOOBJ,
1912                                                         RCL_CLIENT) /
1913                                                         sizeof(*ioo);
1914                         if (objcount == 0) {
1915                                 CERROR("Missing/short ioobj\n");
1916                                 RETURN(-EFAULT);
1917                         }
1918                         if (objcount > 1) {
1919                                 CERROR("too many ioobjs (%d)\n", objcount);
1920                                 RETURN(-EFAULT);
1921                         }
1922
1923                         ioo = req_capsule_client_get(&req->rq_pill,
1924                                                      &RMF_OBD_IOOBJ);
1925                         if (ioo == NULL) {
1926                                 CERROR("Missing/short ioobj\n");
1927                                 RETURN(-EFAULT);
1928                         }
1929
1930                         for (niocount = i = 0; i < objcount; i++) {
1931                                 if (ioo[i].ioo_bufcnt == 0) {
1932                                         CERROR("ioo[%d] has zero bufcnt\n", i);
1933                                         RETURN(-EFAULT);
1934                                 }
1935                                 niocount += ioo[i].ioo_bufcnt;
1936                         }
1937                         if (niocount > PTLRPC_MAX_BRW_PAGES) {
1938                                 DEBUG_REQ(D_RPCTRACE, req,
1939                                           "bulk has too many pages (%d)",
1940                                           niocount);
1941                                 RETURN(-EFAULT);
1942                         }
1943
1944                         nb = req_capsule_client_get(&req->rq_pill,
1945                                                     &RMF_NIOBUF_REMOTE);
1946                         if (nb == NULL) {
1947                                 CERROR("Missing/short niobuf\n");
1948                                 RETURN(-EFAULT);
1949                         }
1950
1951                         if (niocount == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
1952                                 req->rq_ops = &ost_hpreq_rw;
1953                 } else if (opc == OST_PUNCH) {
1954                         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
1955                         req_capsule_set(&req->rq_pill, &RQF_OST_PUNCH);
1956
1957                         body = req_capsule_client_get(&req->rq_pill,
1958                                                       &RMF_OST_BODY);
1959                         if (body == NULL) {
1960                                 CERROR("Missing/short ost_body\n");
1961                                 RETURN(-EFAULT);
1962                         }
1963
1964                         if (!(body->oa.o_valid & OBD_MD_FLFLAGS) ||
1965                             !(body->oa.o_flags & OBD_FL_TRUNCLOCK))
1966                                 req->rq_ops = &ost_hpreq_punch;
1967                 }
1968         }
1969         RETURN(0);
1970 }
1971
1972 /* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
1973 int ost_handle(struct ptlrpc_request *req)
1974 {
1975         struct obd_trans_info trans_info = { 0, };
1976         struct obd_trans_info *oti = &trans_info;
1977         int should_process, fail = OBD_FAIL_OST_ALL_REPLY_NET, rc = 0;
1978         struct obd_device *obd = NULL;
1979         ENTRY;
1980
1981         LASSERT(current->journal_info == NULL);
1982
1983         /* primordial rpcs don't affect server recovery */
1984         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1985         case SEC_CTX_INIT:
1986         case SEC_CTX_INIT_CONT:
1987         case SEC_CTX_FINI:
1988                 GOTO(out, rc = 0);
1989         }
1990
1991         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
1992
1993         /* XXX identical to MDS */
1994         if (lustre_msg_get_opc(req->rq_reqmsg) != OST_CONNECT) {
1995                 int recovering;
1996
1997                 if (req->rq_export == NULL) {
1998                         CDEBUG(D_HA,"operation %d on unconnected OST from %s\n",
1999                                lustre_msg_get_opc(req->rq_reqmsg),
2000                                libcfs_id2str(req->rq_peer));
2001                         req->rq_status = -ENOTCONN;
2002                         GOTO(out, rc = -ENOTCONN);
2003                 }
2004
2005                 obd = req->rq_export->exp_obd;
2006
2007                 /* Check for aborted recovery. */
2008                 spin_lock_bh(&obd->obd_processing_task_lock);
2009                 recovering = obd->obd_recovering;
2010                 spin_unlock_bh(&obd->obd_processing_task_lock);
2011                 if (recovering) {
2012                         rc = ost_filter_recovery_request(req, obd,
2013                                                          &should_process);
2014                         if (rc || !should_process)
2015                                 RETURN(rc);
2016                         else if (should_process < 0) {
2017                                 req->rq_status = should_process;
2018                                 rc = ptlrpc_error(req);
2019                                 RETURN(rc);
2020                         }
2021                 }
2022         }
2023
2024         oti_init(oti, req);
2025
2026         rc = ost_msg_check_version(req->rq_reqmsg);
2027         if (rc)
2028                 RETURN(rc);
2029
2030         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
2031         case OST_CONNECT: {
2032                 CDEBUG(D_INODE, "connect\n");
2033                 req_capsule_set(&req->rq_pill, &RQF_OST_CONNECT);
2034                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_CONNECT_NET))
2035                         RETURN(0);
2036                 rc = target_handle_connect(req);
2037                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_CONNECT_NET2))
2038                         RETURN(0);
2039                 if (!rc) {
2040                         rc = ost_init_sec_level(req);
2041                         if (!rc)
2042                                 rc = ost_connect_check_sptlrpc(req);
2043                 }
2044                 break;
2045         }
2046         case OST_DISCONNECT:
2047                 CDEBUG(D_INODE, "disconnect\n");
2048                 req_capsule_set(&req->rq_pill, &RQF_OST_DISCONNECT);
2049                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_DISCONNECT_NET))
2050                         RETURN(0);
2051                 rc = target_handle_disconnect(req);
2052                 break;
2053         case OST_CREATE:
2054                 CDEBUG(D_INODE, "create\n");
2055                 req_capsule_set(&req->rq_pill, &RQF_OST_CREATE);
2056                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_CREATE_NET))
2057                         RETURN(0);
2058                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
2059                         GOTO(out, rc = -EROFS);
2060                 rc = ost_create(req->rq_export, req, oti);
2061                 break;
2062         case OST_DESTROY:
2063                 CDEBUG(D_INODE, "destroy\n");
2064                 req_capsule_set(&req->rq_pill, &RQF_OST_DESTROY);
2065                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_DESTROY_NET))
2066                         RETURN(0);
2067                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
2068                         GOTO(out, rc = -EROFS);
2069                 rc = ost_destroy(req->rq_export, req, oti);
2070                 break;
2071         case OST_GETATTR:
2072                 CDEBUG(D_INODE, "getattr\n");
2073                 req_capsule_set(&req->rq_pill, &RQF_OST_GETATTR);
2074                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_GETATTR_NET))
2075                         RETURN(0);
2076                 rc = ost_getattr(req->rq_export, req);
2077                 break;
2078         case OST_SETATTR:
2079                 CDEBUG(D_INODE, "setattr\n");
2080                 req_capsule_set(&req->rq_pill, &RQF_OST_SETATTR);
2081                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_SETATTR_NET))
2082                         RETURN(0);
2083                 rc = ost_setattr(req->rq_export, req, oti);
2084                 break;
2085         case OST_WRITE:
2086                 req_capsule_set(&req->rq_pill, &RQF_OST_BRW);
2087                 CDEBUG(D_INODE, "write\n");
2088                 /* req->rq_request_portal would be nice, if it was set */
2089                 if (req->rq_rqbd->rqbd_service->srv_req_portal !=OST_IO_PORTAL){
2090                         CERROR("%s: deny write request from %s to portal %u\n",
2091                                req->rq_export->exp_obd->obd_name,
2092                                obd_export_nid2str(req->rq_export),
2093                                req->rq_rqbd->rqbd_service->srv_req_portal);
2094                         GOTO(out, rc = -EPROTO);
2095                 }
2096                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_NET))
2097                         RETURN(0);
2098                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOSPC))
2099                         GOTO(out, rc = -ENOSPC);
2100                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
2101                         GOTO(out, rc = -EROFS);
2102                 rc = ost_brw_write(req, oti);
2103                 LASSERT(current->journal_info == NULL);
2104                 /* ost_brw_write sends its own replies */
2105                 RETURN(rc);
2106         case OST_READ:
2107                 req_capsule_set(&req->rq_pill, &RQF_OST_BRW);
2108                 CDEBUG(D_INODE, "read\n");
2109                 /* req->rq_request_portal would be nice, if it was set */
2110                 if (req->rq_rqbd->rqbd_service->srv_req_portal !=OST_IO_PORTAL){
2111                         CERROR("%s: deny read request from %s to portal %u\n",
2112                                req->rq_export->exp_obd->obd_name,
2113                                obd_export_nid2str(req->rq_export),
2114                                req->rq_rqbd->rqbd_service->srv_req_portal);
2115                         GOTO(out, rc = -EPROTO);
2116                 }
2117                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_NET))
2118                         RETURN(0);
2119                 rc = ost_brw_read(req, oti);
2120                 LASSERT(current->journal_info == NULL);
2121                 /* ost_brw_read sends its own replies */
2122                 RETURN(rc);
2123         case OST_PUNCH:
2124                 CDEBUG(D_INODE, "punch\n");
2125                 req_capsule_set(&req->rq_pill, &RQF_OST_PUNCH);
2126                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_PUNCH_NET))
2127                         RETURN(0);
2128                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
2129                         GOTO(out, rc = -EROFS);
2130                 rc = ost_punch(req->rq_export, req, oti);
2131                 break;
2132         case OST_STATFS:
2133                 CDEBUG(D_INODE, "statfs\n");
2134                 req_capsule_set(&req->rq_pill, &RQF_OST_STATFS);
2135                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_STATFS_NET))
2136                         RETURN(0);
2137                 rc = ost_statfs(req);
2138                 break;
2139         case OST_SYNC:
2140                 CDEBUG(D_INODE, "sync\n");
2141                 req_capsule_set(&req->rq_pill, &RQF_OST_SYNC);
2142                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_SYNC_NET))
2143                         RETURN(0);
2144                 rc = ost_sync(req->rq_export, req);
2145                 break;
2146         case OST_SET_INFO:
2147                 DEBUG_REQ(D_INODE, req, "set_info");
2148                 req_capsule_set(&req->rq_pill, &RQF_OBD_SET_INFO);
2149                 rc = ost_set_info(req->rq_export, req);
2150                 break;
2151         case OST_GET_INFO:
2152                 DEBUG_REQ(D_INODE, req, "get_info");
2153                 req_capsule_set(&req->rq_pill, &RQF_OST_GET_INFO_GENERIC);
2154                 rc = ost_get_info(req->rq_export, req);
2155                 break;
2156 #ifdef HAVE_QUOTA_SUPPORT
2157         case OST_QUOTACHECK:
2158                 CDEBUG(D_INODE, "quotacheck\n");
2159                 req_capsule_set(&req->rq_pill, &RQF_OST_QUOTACHECK);
2160                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_QUOTACHECK_NET))
2161                         RETURN(0);
2162                 rc = ost_handle_quotacheck(req);
2163                 break;
2164         case OST_QUOTACTL:
2165                 CDEBUG(D_INODE, "quotactl\n");
2166                 req_capsule_set(&req->rq_pill, &RQF_OST_QUOTACTL);
2167                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_QUOTACTL_NET))
2168                         RETURN(0);
2169                 rc = ost_handle_quotactl(req);
2170                 break;
2171         case OST_QUOTA_ADJUST_QUNIT:
2172                 CDEBUG(D_INODE, "quota_adjust_qunit\n");
2173                 req_capsule_set(&req->rq_pill, &RQF_OST_QUOTA_ADJUST_QUNIT);
2174                 rc = ost_handle_quota_adjust_qunit(req);
2175                 break;
2176 #endif
2177         case OBD_PING:
2178                 DEBUG_REQ(D_INODE, req, "ping");
2179                 req_capsule_set(&req->rq_pill, &RQF_OBD_PING);
2180                 rc = target_handle_ping(req);
2181                 break;
2182         /* FIXME - just reply status */
2183         case LLOG_ORIGIN_CONNECT:
2184                 DEBUG_REQ(D_INODE, req, "log connect");
2185                 req_capsule_set(&req->rq_pill, &RQF_LLOG_ORIGIN_CONNECT);
2186                 rc = ost_llog_handle_connect(req->rq_export, req);
2187                 req->rq_status = rc;
2188                 rc = req_capsule_server_pack(&req->rq_pill);
2189                 if (rc)
2190                         RETURN(rc);
2191                 RETURN(ptlrpc_reply(req));
2192         case OBD_LOG_CANCEL:
2193                 CDEBUG(D_INODE, "log cancel\n");
2194                 req_capsule_set(&req->rq_pill, &RQF_LOG_CANCEL);
2195                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_NET))
2196                         RETURN(0);
2197                 rc = llog_origin_handle_cancel(req);
2198                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_REP))
2199                         RETURN(0);
2200                 req->rq_status = rc;
2201                 rc = req_capsule_server_pack(&req->rq_pill);
2202                 if (rc)
2203                         RETURN(rc);
2204                 RETURN(ptlrpc_reply(req));
2205         case LDLM_ENQUEUE:
2206                 CDEBUG(D_INODE, "enqueue\n");
2207                 req_capsule_set(&req->rq_pill, &RQF_LDLM_ENQUEUE);
2208                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE))
2209                         RETURN(0);
2210                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
2211                                          ldlm_server_blocking_ast,
2212                                          ldlm_server_glimpse_ast);
2213                 fail = OBD_FAIL_OST_LDLM_REPLY_NET;
2214                 break;
2215         case LDLM_CONVERT:
2216                 CDEBUG(D_INODE, "convert\n");
2217                 req_capsule_set(&req->rq_pill, &RQF_LDLM_CONVERT);
2218                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CONVERT))
2219                         RETURN(0);
2220                 rc = ldlm_handle_convert(req);
2221                 break;
2222         case LDLM_CANCEL:
2223                 CDEBUG(D_INODE, "cancel\n");
2224                 req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL);
2225                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL))
2226                         RETURN(0);
2227                 rc = ldlm_handle_cancel(req);
2228                 break;
2229         case LDLM_BL_CALLBACK:
2230         case LDLM_CP_CALLBACK:
2231                 CDEBUG(D_INODE, "callback\n");
2232                 CERROR("callbacks should not happen on OST\n");
2233                 /* fall through */
2234         default:
2235                 CERROR("Unexpected opcode %d\n",
2236                        lustre_msg_get_opc(req->rq_reqmsg));
2237                 req->rq_status = -ENOTSUPP;
2238                 rc = ptlrpc_error(req);
2239                 RETURN(rc);
2240         }
2241
2242         LASSERT(current->journal_info == NULL);
2243
2244         EXIT;
2245         /* If we're DISCONNECTing, the export_data is already freed */
2246         if (!rc && lustre_msg_get_opc(req->rq_reqmsg) != OST_DISCONNECT)
2247                 target_committed_to_req(req);
2248
2249 out:
2250         if (!rc)
2251                 oti_to_request(oti, req);
2252
2253         target_send_reply(req, rc, fail);
2254         return 0;
2255 }
2256 EXPORT_SYMBOL(ost_handle);
2257 /*
2258  * free per-thread pool created by ost_thread_init().
2259  */
2260 static void ost_thread_done(struct ptlrpc_thread *thread)
2261 {
2262         struct ost_thread_local_cache *tls; /* TLS stands for Thread-Local
2263                                              * Storage */
2264
2265         ENTRY;
2266
2267         LASSERT(thread != NULL);
2268
2269         /*
2270          * be prepared to handle partially-initialized pools (because this is
2271          * called from ost_thread_init() for cleanup.
2272          */
2273         tls = thread->t_data;
2274         if (tls != NULL) {
2275                 OBD_FREE_PTR(tls);
2276                 thread->t_data = NULL;
2277         }
2278         EXIT;
2279 }
2280
2281 /*
2282  * initialize per-thread page pool (bug 5137).
2283  */
2284 static int ost_thread_init(struct ptlrpc_thread *thread)
2285 {
2286         struct ost_thread_local_cache *tls;
2287
2288         ENTRY;
2289
2290         LASSERT(thread != NULL);
2291         LASSERT(thread->t_data == NULL);
2292         LASSERTF(thread->t_id <= OSS_THREADS_MAX, "%u\n", thread->t_id);
2293
2294         OBD_ALLOC_PTR(tls);
2295         if (tls == NULL)
2296                 RETURN(-ENOMEM);
2297         thread->t_data = tls;
2298         RETURN(0);
2299 }
2300
/*
 * Watchdog timeout, computed as obd_timeout * 1000.
 * NOTE(review): presumably this converts seconds to milliseconds -- confirm.
 * No use of this macro is visible in this part of the file.
 */
2301 #define OST_WATCHDOG_TIMEOUT (obd_timeout * 1000)
2302
2303 /* Sigh - really, this is an OSS, the _server_, not the _target_ */
2304 static int ost_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
2305 {
2306         struct ost_obd *ost = &obd->u.ost;
2307         struct lprocfs_static_vars lvars;
2308         int oss_min_threads;
2309         int oss_max_threads;
2310         int oss_min_create_threads;
2311         int oss_max_create_threads;
2312         int rc;
2313         ENTRY;
2314
2315         rc = cleanup_group_info();
2316         if (rc)
2317                 RETURN(rc);
2318
2319         lprocfs_ost_init_vars(&lvars);
2320         lprocfs_obd_setup(obd, lvars.obd_vars);
2321
2322         sema_init(&ost->ost_health_sem, 1);
2323
2324         if (oss_num_threads) {
2325                 /* If oss_num_threads is set, it is the min and the max. */
2326                 if (oss_num_threads > OSS_THREADS_MAX)
2327                         oss_num_threads = OSS_THREADS_MAX;
2328                 if (oss_num_threads < OSS_THREADS_MIN)
2329                         oss_num_threads = OSS_THREADS_MIN;
2330                 oss_max_threads = oss_min_threads = oss_num_threads;
2331         } else {
2332                 /* Base min threads on memory and cpus */
2333                 oss_min_threads = num_possible_cpus() * CFS_NUM_CACHEPAGES >>
2334                         (27 - CFS_PAGE_SHIFT);
2335                 if (oss_min_threads < OSS_THREADS_MIN)
2336                         oss_min_threads = OSS_THREADS_MIN;
2337                 /* Insure a 4x range for dynamic threads */
2338                 if (oss_min_threads > OSS_THREADS_MAX / 4)
2339                         oss_min_threads = OSS_THREADS_MAX / 4;
2340                 oss_max_threads = min(OSS_THREADS_MAX, oss_min_threads * 4 + 1);
2341         }
2342
2343         ost->ost_service =
2344                 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
2345                                 OST_MAXREPSIZE, OST_REQUEST_PORTAL,
2346                                 OSC_REPLY_PORTAL, OSS_SERVICE_WATCHDOG_FACTOR,
2347                                 ost_handle, LUSTRE_OSS_NAME,
2348                                 obd->obd_proc_entry, target_print_req,
2349                                 oss_min_threads, oss_max_threads,
2350                                 "ll_ost", LCT_DT_THREAD, NULL);
2351         if (ost->ost_service == NULL) {
2352                 CERROR("failed to start service\n");
2353                 GOTO(out_lprocfs, rc = -ENOMEM);
2354         }
2355
2356         rc = ptlrpc_start_threads(obd, ost->ost_service);
2357         if (rc)
2358                 GOTO(out_service, rc = -EINVAL);
2359
2360         if (oss_num_create_threads) {
2361                 if (oss_num_create_threads > OSS_MAX_CREATE_THREADS)
2362                         oss_num_create_threads = OSS_MAX_CREATE_THREADS;
2363                 if (oss_num_create_threads < OSS_MIN_CREATE_THREADS)
2364                         oss_num_create_threads = OSS_MIN_CREATE_THREADS;
2365                 oss_min_create_threads = oss_max_create_threads =
2366                         oss_num_create_threads;
2367         } else {
2368                 oss_min_create_threads = OSS_MIN_CREATE_THREADS;
2369                 oss_max_create_threads = OSS_MAX_CREATE_THREADS;
2370         }
2371
2372         ost->ost_create_service =
2373                 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
2374                                 OST_MAXREPSIZE, OST_CREATE_PORTAL,
2375                                 OSC_REPLY_PORTAL, OSS_SERVICE_WATCHDOG_FACTOR,
2376                                 ost_handle, "ost_create",
2377                                 obd->obd_proc_entry, target_print_req,
2378                                 oss_min_create_threads, oss_max_create_threads,
2379                                 "ll_ost_creat", LCT_DT_THREAD, NULL);
2380         if (ost->ost_create_service == NULL) {
2381                 CERROR("failed to start OST create service\n");
2382                 GOTO(out_service, rc = -ENOMEM);
2383         }
2384
2385         rc = ptlrpc_start_threads(obd, ost->ost_create_service);
2386         if (rc)
2387                 GOTO(out_create, rc = -EINVAL);
2388
2389         ost->ost_io_service =
2390                 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
2391                                 OST_MAXREPSIZE, OST_IO_PORTAL,
2392                                 OSC_REPLY_PORTAL, OSS_SERVICE_WATCHDOG_FACTOR,
2393                                 ost_handle, "ost_io",
2394                                 obd->obd_proc_entry, target_print_req,
2395                                 oss_min_threads, oss_max_threads,
2396                                 "ll_ost_io", LCT_DT_THREAD, ost_hpreq_handler);
2397         if (ost->ost_io_service == NULL) {
2398                 CERROR("failed to start OST I/O service\n");
2399                 GOTO(out_create, rc = -ENOMEM);
2400         }
2401
2402         ost->ost_io_service->srv_init = ost_thread_init;
2403         ost->ost_io_service->srv_done = ost_thread_done;
2404         ost->ost_io_service->srv_cpu_affinity = 1;
2405         rc = ptlrpc_start_threads(obd, ost->ost_io_service);
2406         if (rc)
2407                 GOTO(out_io, rc = -EINVAL);
2408
2409         ping_evictor_start();
2410
2411         RETURN(0);
2412
2413 out_io:
2414         ptlrpc_unregister_service(ost->ost_io_service);
2415         ost->ost_io_service = NULL;
2416 out_create:
2417         ptlrpc_unregister_service(ost->ost_create_service);
2418         ost->ost_create_service = NULL;
2419 out_service:
2420         ptlrpc_unregister_service(ost->ost_service);
2421         ost->ost_service = NULL;
2422 out_lprocfs:
2423         lprocfs_obd_cleanup(obd);
2424         RETURN(rc);
2425 }
2426
2427 static int ost_cleanup(struct obd_device *obd)
2428 {
2429         struct ost_obd *ost = &obd->u.ost;
2430         int err = 0;
2431         ENTRY;
2432
2433         ping_evictor_stop();
2434
2435         spin_lock_bh(&obd->obd_processing_task_lock);
2436         if (obd->obd_recovering) {
2437                 target_cancel_recovery_timer(obd);
2438                 obd->obd_recovering = 0;
2439         }
2440         spin_unlock_bh(&obd->obd_processing_task_lock);
2441
2442         down(&ost->ost_health_sem);
2443         ptlrpc_unregister_service(ost->ost_service);
2444         ptlrpc_unregister_service(ost->ost_create_service);
2445         ptlrpc_unregister_service(ost->ost_io_service);
2446         ost->ost_service = NULL;
2447         ost->ost_create_service = NULL;
2448         up(&ost->ost_health_sem);
2449
2450         lprocfs_obd_cleanup(obd);
2451
2452         RETURN(err);
2453 }
2454
2455 static int ost_health_check(struct obd_device *obd)
2456 {
2457         struct ost_obd *ost = &obd->u.ost;
2458         int rc = 0;
2459
2460         down(&ost->ost_health_sem);
2461         rc |= ptlrpc_service_health_check(ost->ost_service);
2462         rc |= ptlrpc_service_health_check(ost->ost_create_service);
2463         rc |= ptlrpc_service_health_check(ost->ost_io_service);
2464         up(&ost->ost_health_sem);
2465
2466         /*
2467          * health_check to return 0 on healthy
2468          * and 1 on unhealthy.
2469          */
2470         if( rc != 0)
2471                 rc = 1;
2472
2473         return rc;
2474 }
2475
2476 struct ost_thread_local_cache *ost_tls(struct ptlrpc_request *r)
2477 {
2478         return (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
2479 }
2480
2481 /* use obd ops to offer management infrastructure */
2482 static struct obd_ops ost_obd_ops = {
2483         .o_owner        = THIS_MODULE,
2484         .o_setup        = ost_setup,
2485         .o_cleanup      = ost_cleanup,
2486         .o_health_check = ost_health_check,
2487 };
2488
2489
2490 static int __init ost_init(void)
2491 {
2492         struct lprocfs_static_vars lvars;
2493         int rc;
2494         ENTRY;
2495
2496         lprocfs_ost_init_vars(&lvars);
2497         rc = class_register_type(&ost_obd_ops, NULL, lvars.module_vars,
2498                                  LUSTRE_OSS_NAME, NULL);
2499
2500         if (ost_num_threads != 0 && oss_num_threads == 0) {
2501                 LCONSOLE_INFO("ost_num_threads module parameter is deprecated, "
2502                               "use oss_num_threads instead or unset both for "
2503                               "dynamic thread startup\n");
2504                 oss_num_threads = ost_num_threads;
2505         }
2506
2507         RETURN(rc);
2508 }
2509
2510 static void /*__exit*/ ost_exit(void)
2511 {
2512         class_unregister_type(LUSTRE_OSS_NAME);
2513 }
2514
/*
 * Module metadata, followed by the registration of ost_init() and ost_exit()
 * as the module's init and exit entry points.
 */
2515 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2516 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
2517 MODULE_LICENSE("GPL");
2518
2519 module_init(ost_init);
2520 module_exit(ost_exit);