Whamcloud - gitweb
LU-2684 fid: unify ostid and FID
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2001, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ost/ost_handler.c
37  *
38  * Author: Peter J. Braam <braam@clusterfs.com>
39  * Author: Phil Schwan <phil@clusterfs.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_OST
43
44 #include <linux/module.h>
45 #include <obd_cksum.h>
46 #include <obd_ost.h>
47 #include <lustre_net.h>
48 #include <lustre_dlm.h>
49 #include <lustre_export.h>
50 #include <lustre_debug.h>
51 #include <lustre_fid.h>
52 #include <lustre_fld.h>
53 #include <linux/init.h>
54 #include <lprocfs_status.h>
55 #include <libcfs/list.h>
56 #include <lustre_quota.h>
57 #include <lustre_fid.h>
58 #include "ost_internal.h"
59 #include <lustre_fid.h>
60
/*
 * Module parameters.  Each variable below is registered through
 * CFS_MODULE_PARM together with the description string shown next to it.
 */

/* Number of OSS service threads to start. */
static int oss_num_threads;
CFS_MODULE_PARM(oss_num_threads, "i", int, 0444,
                "number of OSS service threads to start");

/* Older OST-named thread count; its description marks it as deprecated. */
static int ost_num_threads;
CFS_MODULE_PARM(ost_num_threads, "i", int, 0444,
                "number of OST service threads to start (deprecated)");

/* Number of OSS create threads to start. */
static int oss_num_create_threads;
CFS_MODULE_PARM(oss_num_create_threads, "i", int, 0444,
                "number of OSS create threads to start");

/* String describing the CPU partitions OSS threads should run on. */
static char *oss_cpts;
CFS_MODULE_PARM(oss_cpts, "s", charp, 0444,
                "CPU partitions OSS threads should run on");

/* String describing the CPU partitions OSS IO threads should run on. */
static char *oss_io_cpts;
CFS_MODULE_PARM(oss_io_cpts, "s", charp, 0444,
                "CPU partitions OSS IO threads should run on");
80
/*
 * This page is allocated statically while the module is initializing.
 * It is used to simulate data corruption; see ost_checksum_bulk() for
 * details.  The original pages provided by the layers below can remain
 * in the internal cache, so we do not want to modify them; the corrupted
 * copy is written to this page instead.
 */
static struct page *ost_page_to_corrupt = NULL;
89
90 /**
91  * Do not return server-side uid/gid to remote client
92  */
93 static void ost_drop_id(struct obd_export *exp, struct obdo *oa)
94 {
95         if (exp_connect_rmtclient(exp)) {
96                 oa->o_uid = -1;
97                 oa->o_gid = -1;
98                 oa->o_valid &= ~(OBD_MD_FLUID | OBD_MD_FLGID);
99         }
100 }
101
102 /**
103  * Validate oa from client.
104  * If the request comes from 2.0 clients, currently only RSVD seq and IDIF
105  * req are valid.
106  *    a. for single MDS  seq = FID_SEQ_OST_MDT0,
107  *    b. for CMD, seq = FID_SEQ_OST_MDT0, FID_SEQ_OST_MDT1 - FID_SEQ_OST_MAX
108  */
109 static int ost_validate_obdo(struct obd_export *exp, struct obdo *oa,
110                              struct obd_ioobj *ioobj)
111 {
112         if (unlikely(oa != NULL && !(oa->o_valid & OBD_MD_FLGROUP))) {
113                 ostid_set_seq_mdt0(&oa->o_oi);
114                 if (ioobj)
115                         ostid_set_seq_mdt0(&ioobj->ioo_oid);
116         } else if (unlikely(oa == NULL ||
117                             !(fid_seq_is_idif(ostid_seq(&oa->o_oi)) ||
118                               fid_seq_is_mdt(ostid_seq(&oa->o_oi)) ||
119                               fid_seq_is_echo(ostid_seq(&oa->o_oi))))) {
120                 CERROR("%s: client %s sent bad object "DOSTID": rc = -EPROTO\n",
121                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
122                        oa ? ostid_seq(&oa->o_oi) : -1,
123                        oa ? ostid_id(&oa->o_oi) : -1);
124                 return -EPROTO;
125         }
126
127         if (ioobj != NULL) {
128                 unsigned max_brw = ioobj_max_brw_get(ioobj);
129
130                 if (unlikely((max_brw & (max_brw - 1)) != 0)) {
131                         CERROR("%s: client %s sent bad ioobj max %u for "DOSTID
132                                ": rc = -EPROTO\n", exp->exp_obd->obd_name,
133                                obd_export_nid2str(exp), max_brw,
134                                POSTID(&oa->o_oi));
135                         return -EPROTO;
136                 }
137                 ioobj->ioo_oid = oa->o_oi;
138         }
139         return 0;
140 }
141
142 void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
143 {
144         struct oti_req_ack_lock *ack_lock;
145         int i;
146
147         if (oti == NULL)
148                 return;
149
150         if (req->rq_repmsg) {
151                 __u64 versions[PTLRPC_NUM_VERSIONS] = { 0 };
152                 lustre_msg_set_transno(req->rq_repmsg, oti->oti_transno);
153                 versions[0] = oti->oti_pre_version;
154                 lustre_msg_set_versions(req->rq_repmsg, versions);
155         }
156         req->rq_transno = oti->oti_transno;
157
158         /* XXX 4 == entries in oti_ack_locks??? */
159         for (ack_lock = oti->oti_ack_locks, i = 0; i < 4; i++, ack_lock++) {
160                 if (!ack_lock->mode)
161                         break;
162                 /* XXX not even calling target_send_reply in some cases... */
163                 ptlrpc_save_lock (req, &ack_lock->lock, ack_lock->mode, 0);
164         }
165 }
166
167 static int ost_destroy(struct obd_export *exp, struct ptlrpc_request *req,
168                        struct obd_trans_info *oti)
169 {
170         struct ost_body *body, *repbody;
171         struct lustre_capa *capa = NULL;
172         int rc;
173         ENTRY;
174
175         /* Get the request body */
176         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
177         if (body == NULL)
178                 RETURN(-EFAULT);
179
180         if (ostid_id(&body->oa.o_oi) == 0)
181                 RETURN(-EPROTO);
182
183         rc = ost_validate_obdo(exp, &body->oa, NULL);
184         if (rc)
185                 RETURN(rc);
186
187         /* If there's a DLM request, cancel the locks mentioned in it*/
188         if (req_capsule_field_present(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT)) {
189                 struct ldlm_request *dlm;
190
191                 dlm = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
192                 if (dlm == NULL)
193                         RETURN (-EFAULT);
194                 ldlm_request_cancel(req, dlm, 0);
195         }
196
197         /* If there's a capability, get it */
198         if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
199                 capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
200                 if (capa == NULL) {
201                         CERROR("Missing capability for OST DESTROY");
202                         RETURN (-EFAULT);
203                 }
204         }
205
206         /* Prepare the reply */
207         rc = req_capsule_server_pack(&req->rq_pill);
208         if (rc)
209                 RETURN(rc);
210
211         /* Get the log cancellation cookie */
212         if (body->oa.o_valid & OBD_MD_FLCOOKIE)
213                 oti->oti_logcookies = &body->oa.o_lcookie;
214
215         /* Finish the reply */
216         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
217         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
218
219         /* Do the destroy and set the reply status accordingly  */
220         req->rq_status = obd_destroy(req->rq_svc_thread->t_env, exp,
221                                      &repbody->oa, NULL, oti, NULL, capa);
222         RETURN(0);
223 }
224
225 /**
226  * Helper function for getting server side [start, start+count] DLM lock
227  * if asked by client.
228  */
229 static int ost_lock_get(struct obd_export *exp, struct obdo *oa,
230                         __u64 start, __u64 count, struct lustre_handle *lh,
231                         int mode, __u64 flags)
232 {
233         struct ldlm_res_id res_id;
234         ldlm_policy_data_t policy;
235         __u64 end = start + count;
236
237         ENTRY;
238
239         LASSERT(!lustre_handle_is_used(lh));
240         /* o_id and o_gr are used for localizing resource, if client miss to set
241          * them, do not trigger ASSERTION. */
242         if (unlikely((oa->o_valid & (OBD_MD_FLID | OBD_MD_FLGROUP)) !=
243                      (OBD_MD_FLID | OBD_MD_FLGROUP)))
244                 RETURN(-EPROTO);
245
246         if (!(oa->o_valid & OBD_MD_FLFLAGS) ||
247             !(oa->o_flags & OBD_FL_SRVLOCK))
248                 RETURN(0);
249
250         ostid_build_res_name(&oa->o_oi, &res_id);
251         CDEBUG(D_INODE, "OST-side extent lock.\n");
252
253         policy.l_extent.start = start & CFS_PAGE_MASK;
254
255         /* If ->o_blocks is EOF it means "lock till the end of the
256          * file". Otherwise, it's size of a hole being punched (in bytes) */
257         if (count == OBD_OBJECT_EOF || end < start)
258                 policy.l_extent.end = OBD_OBJECT_EOF;
259         else
260                 policy.l_extent.end = end | ~CFS_PAGE_MASK;
261
262         RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
263                                       LDLM_EXTENT, &policy, mode, &flags,
264                                       ldlm_blocking_ast, ldlm_completion_ast,
265                                       ldlm_glimpse_ast, NULL, 0, LVB_T_NONE,
266                                       NULL, lh));
267 }
268
269 /* Helper function: release lock, if any. */
270 static void ost_lock_put(struct obd_export *exp,
271                          struct lustre_handle *lh, int mode)
272 {
273         ENTRY;
274         if (lustre_handle_is_used(lh))
275                 ldlm_lock_decref(lh, mode);
276         EXIT;
277 }
278
279 static int ost_getattr(struct obd_export *exp, struct ptlrpc_request *req)
280 {
281         struct ost_body *body, *repbody;
282         struct obd_info *oinfo;
283         struct lustre_handle lh = { 0 };
284         struct lustre_capa *capa = NULL;
285         int rc;
286         ENTRY;
287
288         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
289         if (body == NULL)
290                 RETURN(-EFAULT);
291
292         rc = ost_validate_obdo(exp, &body->oa, NULL);
293         if (rc)
294                 RETURN(rc);
295
296         if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
297                 capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
298                 if (capa == NULL) {
299                         CERROR("Missing capability for OST GETATTR");
300                         RETURN(-EFAULT);
301                 }
302         }
303
304         rc = req_capsule_server_pack(&req->rq_pill);
305         if (rc)
306                 RETURN(rc);
307
308         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
309         repbody->oa = body->oa;
310
311         rc = ost_lock_get(exp, &repbody->oa, 0, OBD_OBJECT_EOF, &lh, LCK_PR, 0);
312         if (rc)
313                 RETURN(rc);
314
315         OBD_ALLOC_PTR(oinfo);
316         if (!oinfo)
317                 GOTO(unlock, rc = -ENOMEM);
318         oinfo->oi_oa = &repbody->oa;
319         oinfo->oi_capa = capa;
320
321         req->rq_status = obd_getattr(req->rq_svc_thread->t_env, exp, oinfo);
322
323         OBD_FREE_PTR(oinfo);
324
325         ost_drop_id(exp, &repbody->oa);
326
327 unlock:
328         ost_lock_put(exp, &lh, LCK_PR);
329         RETURN(rc);
330 }
331
332 static int ost_statfs(struct ptlrpc_request *req)
333 {
334         struct obd_statfs *osfs;
335         int rc;
336         ENTRY;
337
338         rc = req_capsule_server_pack(&req->rq_pill);
339         if (rc)
340                 RETURN(rc);
341
342         osfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
343
344         req->rq_status = obd_statfs(req->rq_svc_thread->t_env, req->rq_export,
345                                     osfs,
346                                     cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS),
347                                     0);
348         if (req->rq_status != 0)
349                 CERROR("ost: statfs failed: rc %d\n", req->rq_status);
350
351         if (OBD_FAIL_CHECK(OBD_FAIL_OST_STATFS_EINPROGRESS))
352                 req->rq_status = -EINPROGRESS;
353
354         RETURN(0);
355 }
356
357 static int ost_create(struct obd_export *exp, struct ptlrpc_request *req,
358                       struct obd_trans_info *oti)
359 {
360         struct ost_body *body, *repbody;
361         int rc;
362         ENTRY;
363
364         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
365         if (body == NULL)
366                 RETURN(-EFAULT);
367
368         rc = ost_validate_obdo(req->rq_export, &body->oa, NULL);
369         if (rc)
370                 RETURN(rc);
371
372         rc = req_capsule_server_pack(&req->rq_pill);
373         if (rc)
374                 RETURN(rc);
375
376         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
377         repbody->oa = body->oa;
378         oti->oti_logcookies = &body->oa.o_lcookie;
379
380         req->rq_status = obd_create(req->rq_svc_thread->t_env, exp,
381                                     &repbody->oa, NULL, oti);
382         //obd_log_cancel(conn, NULL, 1, oti->oti_logcookies, 0);
383         RETURN(0);
384 }
385
386 static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req,
387                      struct obd_trans_info *oti)
388 {
389         struct ost_body *body, *repbody;
390         __u64 flags = 0;
391         struct lustre_handle lh = {0,};
392         int rc;
393         ENTRY;
394
395         /* check that we do support OBD_CONNECT_TRUNCLOCK. */
396         CLASSERT(OST_CONNECT_SUPPORTED & OBD_CONNECT_TRUNCLOCK);
397
398         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
399         if (body == NULL)
400                 RETURN(-EFAULT);
401
402         rc = ost_validate_obdo(exp, &body->oa, NULL);
403         if (rc)
404                 RETURN(rc);
405
406         if ((body->oa.o_valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) !=
407             (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
408                 RETURN(-EPROTO);
409
410         rc = req_capsule_server_pack(&req->rq_pill);
411         if (rc)
412                 RETURN(rc);
413
414         /* standard truncate optimization: if file body is completely
415          * destroyed, don't send data back to the server. */
416         if (body->oa.o_size == 0)
417                 flags |= LDLM_AST_DISCARD_DATA;
418
419         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
420         repbody->oa = body->oa;
421
422         rc = ost_lock_get(exp, &repbody->oa, repbody->oa.o_size,
423                           repbody->oa.o_blocks, &lh, LCK_PW, flags);
424         if (rc == 0) {
425                 struct obd_info *oinfo;
426                 struct lustre_capa *capa = NULL;
427
428                 if (repbody->oa.o_valid & OBD_MD_FLFLAGS &&
429                     repbody->oa.o_flags == OBD_FL_SRVLOCK)
430                         /*
431                          * If OBD_FL_SRVLOCK is the only bit set in
432                          * ->o_flags, clear OBD_MD_FLFLAGS to avoid falling
433                          * through filter_setattr() to filter_iocontrol().
434                          */
435                         repbody->oa.o_valid &= ~OBD_MD_FLFLAGS;
436
437                 if (repbody->oa.o_valid & OBD_MD_FLOSSCAPA) {
438                         capa = req_capsule_client_get(&req->rq_pill,
439                                                       &RMF_CAPA1);
440                         if (capa == NULL) {
441                                 CERROR("Missing capability for OST PUNCH");
442                                 GOTO(unlock, rc = -EFAULT);
443                         }
444                 }
445
446                 OBD_ALLOC_PTR(oinfo);
447                 if (!oinfo)
448                         GOTO(unlock, rc = -ENOMEM);
449                 oinfo->oi_oa = &repbody->oa;
450                 oinfo->oi_policy.l_extent.start = oinfo->oi_oa->o_size;
451                 oinfo->oi_policy.l_extent.end = oinfo->oi_oa->o_blocks;
452                 oinfo->oi_capa = capa;
453                 oinfo->oi_flags = OBD_FL_PUNCH;
454
455                 req->rq_status = obd_punch(req->rq_svc_thread->t_env, exp,
456                                            oinfo, oti, NULL);
457                 OBD_FREE_PTR(oinfo);
458 unlock:
459                 ost_lock_put(exp, &lh, LCK_PW);
460         }
461
462         ost_drop_id(exp, &repbody->oa);
463         RETURN(rc);
464 }
465
466 static int ost_sync(struct obd_export *exp, struct ptlrpc_request *req,
467                     struct obd_trans_info *oti)
468 {
469         struct ost_body *body, *repbody;
470         struct obd_info *oinfo;
471         struct lustre_capa *capa = NULL;
472         int rc;
473         ENTRY;
474
475         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
476         if (body == NULL)
477                 RETURN(-EFAULT);
478
479         rc = ost_validate_obdo(exp, &body->oa, NULL);
480         if (rc)
481                 RETURN(rc);
482
483         if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
484                 capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
485                 if (capa == NULL) {
486                         CERROR("Missing capability for OST SYNC");
487                         RETURN (-EFAULT);
488                 }
489         }
490
491         rc = req_capsule_server_pack(&req->rq_pill);
492         if (rc)
493                 RETURN(rc);
494
495         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
496         repbody->oa = body->oa;
497
498         OBD_ALLOC_PTR(oinfo);
499         if (!oinfo)
500                 RETURN(-ENOMEM);
501
502         oinfo->oi_oa = &repbody->oa;
503         oinfo->oi_capa = capa;
504         oinfo->oi_jobid = oti->oti_jobid;
505         req->rq_status = obd_sync(req->rq_svc_thread->t_env, exp, oinfo,
506                                   repbody->oa.o_size, repbody->oa.o_blocks,
507                                   NULL);
508         OBD_FREE_PTR(oinfo);
509
510         ost_drop_id(exp, &repbody->oa);
511         RETURN(0);
512 }
513
514 static int ost_setattr(struct obd_export *exp, struct ptlrpc_request *req,
515                        struct obd_trans_info *oti)
516 {
517         struct ost_body *body, *repbody;
518         struct obd_info *oinfo;
519         struct lustre_capa *capa = NULL;
520         int rc;
521         ENTRY;
522
523         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
524         if (body == NULL)
525                 RETURN(-EFAULT);
526
527         rc = ost_validate_obdo(req->rq_export, &body->oa, NULL);
528         if (rc)
529                 RETURN(rc);
530
531         rc = req_capsule_server_pack(&req->rq_pill);
532         if (rc)
533                 RETURN(rc);
534
535         if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
536                 capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
537                 if (capa == NULL) {
538                         CERROR("Missing capability for OST SETATTR");
539                         RETURN (-EFAULT);
540                 }
541         }
542
543         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
544         repbody->oa = body->oa;
545
546         OBD_ALLOC_PTR(oinfo);
547         if (!oinfo)
548                 RETURN(-ENOMEM);
549         oinfo->oi_oa = &repbody->oa;
550         oinfo->oi_capa = capa;
551
552         req->rq_status = obd_setattr(req->rq_svc_thread->t_env, exp, oinfo,
553                                      oti);
554
555         OBD_FREE_PTR(oinfo);
556
557         ost_drop_id(exp, &repbody->oa);
558         RETURN(0);
559 }
560
561 static __u32 ost_checksum_bulk(struct ptlrpc_bulk_desc *desc, int opc,
562                                cksum_type_t cksum_type)
563 {
564         struct cfs_crypto_hash_desc     *hdesc;
565         unsigned int                    bufsize;
566         int                             i, err;
567         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
568         __u32                           cksum;
569
570         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
571         if (IS_ERR(hdesc)) {
572                 CERROR("Unable to initialize checksum hash %s\n",
573                        cfs_crypto_hash_name(cfs_alg));
574                 return PTR_ERR(hdesc);
575         }
576         CDEBUG(D_INFO, "Checksum for algo %s\n", cfs_crypto_hash_name(cfs_alg));
577         for (i = 0; i < desc->bd_iov_count; i++) {
578
579                 /* corrupt the data before we compute the checksum, to
580                  * simulate a client->OST data error */
581                 if (i == 0 && opc == OST_WRITE &&
582                     OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_RECEIVE)) {
583                         int off = desc->bd_iov[i].kiov_offset & ~CFS_PAGE_MASK;
584                         int len = desc->bd_iov[i].kiov_len;
585                         struct page *np = ost_page_to_corrupt;
586                         char *ptr = kmap(desc->bd_iov[i].kiov_page) + off;
587
588                         if (np) {
589                                 char *ptr2 = kmap(np) + off;
590
591                                 memcpy(ptr2, ptr, len);
592                                 memcpy(ptr2, "bad3", min(4, len));
593                                 kunmap(np);
594                                 desc->bd_iov[i].kiov_page = np;
595                         } else {
596                                 CERROR("can't alloc page for corruption\n");
597                         }
598                 }
599                 cfs_crypto_hash_update_page(hdesc, desc->bd_iov[i].kiov_page,
600                                   desc->bd_iov[i].kiov_offset & ~CFS_PAGE_MASK,
601                                   desc->bd_iov[i].kiov_len);
602
603                  /* corrupt the data after we compute the checksum, to
604                  * simulate an OST->client data error */
605                 if (i == 0 && opc == OST_READ &&
606                     OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_SEND)) {
607                         int off = desc->bd_iov[i].kiov_offset & ~CFS_PAGE_MASK;
608                         int len = desc->bd_iov[i].kiov_len;
609                         struct page *np = ost_page_to_corrupt;
610                         char *ptr = kmap(desc->bd_iov[i].kiov_page) + off;
611
612                         if (np) {
613                                 char *ptr2 = kmap(np) + off;
614
615                                 memcpy(ptr2, ptr, len);
616                                 memcpy(ptr2, "bad4", min(4, len));
617                                 kunmap(np);
618                                 desc->bd_iov[i].kiov_page = np;
619                         } else {
620                                 CERROR("can't alloc page for corruption\n");
621                         }
622                 }
623         }
624
625         bufsize = 4;
626         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
627         if (err)
628                 cfs_crypto_hash_final(hdesc, NULL, NULL);
629
630         return cksum;
631 }
632
633 static int ost_brw_lock_get(int mode, struct obd_export *exp,
634                             struct obd_ioobj *obj, struct niobuf_remote *nb,
635                             struct lustre_handle *lh)
636 {
637         __u64 flags               = 0;
638         int nrbufs                = obj->ioo_bufcnt;
639         struct ldlm_res_id res_id;
640         ldlm_policy_data_t policy;
641         int i;
642         ENTRY;
643
644         ostid_build_res_name(&obj->ioo_oid, &res_id);
645         LASSERT(mode == LCK_PR || mode == LCK_PW);
646         LASSERT(!lustre_handle_is_used(lh));
647
648         if (nrbufs == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
649                 RETURN(0);
650
651         for (i = 1; i < nrbufs; i ++)
652                 if ((nb[0].flags & OBD_BRW_SRVLOCK) !=
653                     (nb[i].flags & OBD_BRW_SRVLOCK))
654                         RETURN(-EFAULT);
655
656         policy.l_extent.start = nb[0].offset & CFS_PAGE_MASK;
657         policy.l_extent.end   = (nb[nrbufs - 1].offset +
658                                  nb[nrbufs - 1].len - 1) | ~CFS_PAGE_MASK;
659
660         RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
661                                       LDLM_EXTENT, &policy, mode, &flags,
662                                       ldlm_blocking_ast, ldlm_completion_ast,
663                                       ldlm_glimpse_ast, NULL, 0, LVB_T_NONE,
664                                       NULL, lh));
665 }
666
667 static void ost_brw_lock_put(int mode,
668                              struct obd_ioobj *obj, struct niobuf_remote *niob,
669                              struct lustre_handle *lh)
670 {
671         ENTRY;
672         LASSERT(mode == LCK_PR || mode == LCK_PW);
673         LASSERT((obj->ioo_bufcnt > 0 && (niob[0].flags & OBD_BRW_SRVLOCK)) ==
674                 lustre_handle_is_used(lh));
675         if (lustre_handle_is_used(lh))
676                 ldlm_lock_decref(lh, mode);
677         EXIT;
678 }
679
680 /* Allocate thread local buffers if needed */
681 static struct ost_thread_local_cache *ost_tls_get(struct ptlrpc_request *r)
682 {
683         struct ost_thread_local_cache *tls =
684                 (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
685
686         /* In normal mode of operation an I/O request is serviced only
687          * by ll_ost_io threads each of them has own tls buffers allocated by
688          * ost_io_thread_init().
689          * During recovery, an I/O request may be queued until any of the ost
690          * service threads process it. Not necessary it should be one of
691          * ll_ost_io threads. In that case we dynamically allocating tls
692          * buffers for the request service time. */
693         if (unlikely(tls == NULL)) {
694                 LASSERT(r->rq_export->exp_in_recovery);
695                 OBD_ALLOC_PTR(tls);
696                 if (tls != NULL) {
697                         tls->temporary = 1;
698                         r->rq_svc_thread->t_data = tls;
699                 }
700         }
701         return  tls;
702 }
703
704 /* Free thread local buffers if they were allocated only for servicing
705  * this one request */
706 static void ost_tls_put(struct ptlrpc_request *r)
707 {
708         struct ost_thread_local_cache *tls =
709                 (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
710
711         if (unlikely(tls->temporary)) {
712                 OBD_FREE_PTR(tls);
713                 r->rq_svc_thread->t_data = NULL;
714         }
715 }
716
717 static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
718 {
719         struct ptlrpc_bulk_desc *desc = NULL;
720         struct obd_export *exp = req->rq_export;
721         struct niobuf_remote *remote_nb;
722         struct niobuf_local *local_nb;
723         struct obd_ioobj *ioo;
724         struct ost_body *body, *repbody;
725         struct lustre_capa *capa = NULL;
726         struct l_wait_info lwi;
727         struct lustre_handle lockh = { 0 };
728         int niocount, npages, nob = 0, rc, i;
729         int no_reply = 0;
730         struct ost_thread_local_cache *tls;
731         ENTRY;
732
733         req->rq_bulk_read = 1;
734
735         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
736                 GOTO(out, rc = -EIO);
737
738         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, (obd_timeout + 1) / 4);
739
740         /* Check if there is eviction in progress, and if so, wait for it to
741          * finish */
742         if (unlikely(cfs_atomic_read(&exp->exp_obd->obd_evict_inprogress))) {
743                 lwi = LWI_INTR(NULL, NULL); // We do not care how long it takes
744                 rc = l_wait_event(exp->exp_obd->obd_evict_inprogress_waitq,
745                         !cfs_atomic_read(&exp->exp_obd->obd_evict_inprogress),
746                         &lwi);
747         }
748         if (exp->exp_failed)
749                 GOTO(out, rc = -ENOTCONN);
750
751         /* ost_body, ioobj & noibuf_remote are verified and swabbed in
752          * ost_rw_hpreq_check(). */
753         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
754         if (body == NULL)
755                 GOTO(out, rc = -EFAULT);
756
757         /*
758          * A req_capsule_X_get_array(pill, field, ptr_to_element_count) function
759          * would be useful here and wherever we get &RMF_OBD_IOOBJ and
760          * &RMF_NIOBUF_REMOTE.
761          */
762         ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
763         if (ioo == NULL)
764                 GOTO(out, rc = -EFAULT);
765
766         rc = ost_validate_obdo(exp, &body->oa, ioo);
767         if (rc)
768                 RETURN(rc);
769
770         niocount = ioo->ioo_bufcnt;
771         remote_nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
772         if (remote_nb == NULL)
773                 GOTO(out, rc = -EFAULT);
774
775         if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
776                 capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
777                 if (capa == NULL) {
778                         CERROR("Missing capability for OST BRW READ");
779                         GOTO(out, rc = -EFAULT);
780                 }
781         }
782
783         rc = req_capsule_server_pack(&req->rq_pill);
784         if (rc)
785                 GOTO(out, rc);
786
787         tls = ost_tls_get(req);
788         if (tls == NULL)
789                 GOTO(out_bulk, rc = -ENOMEM);
790         local_nb = tls->local;
791
792         rc = ost_brw_lock_get(LCK_PR, exp, ioo, remote_nb, &lockh);
793         if (rc != 0)
794                 GOTO(out_tls, rc);
795
796         /*
797          * If getting the lock took more time than
798          * client was willing to wait, drop it. b=11330
799          */
800         if (cfs_time_current_sec() > req->rq_deadline ||
801             OBD_FAIL_CHECK(OBD_FAIL_OST_DROP_REQ)) {
802                 no_reply = 1;
803                 CERROR("Dropping timed-out read from %s because locking"
804                        "object "DOSTID" took %ld seconds (limit was %ld).\n",
805                        libcfs_id2str(req->rq_peer), POSTID(&ioo->ioo_oid),
806                        cfs_time_current_sec() - req->rq_arrival_time.tv_sec,
807                        req->rq_deadline - req->rq_arrival_time.tv_sec);
808                 GOTO(out_lock, rc = -ETIMEDOUT);
809         }
810
811         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
812         memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
813
814         npages = OST_THREAD_POOL_SIZE;
815         rc = obd_preprw(req->rq_svc_thread->t_env, OBD_BRW_READ, exp,
816                         &repbody->oa, 1, ioo, remote_nb, &npages, local_nb,
817                         oti, capa);
818         if (rc != 0)
819                 GOTO(out_lock, rc);
820
821         desc = ptlrpc_prep_bulk_exp(req, npages, ioobj_max_brw_get(ioo),
822                                     BULK_PUT_SOURCE, OST_BULK_PORTAL);
823         if (desc == NULL)
824                 GOTO(out_commitrw, rc = -ENOMEM);
825
826         nob = 0;
827         for (i = 0; i < npages; i++) {
828                 int page_rc = local_nb[i].rc;
829
830                 if (page_rc < 0) {              /* error */
831                         rc = page_rc;
832                         break;
833                 }
834
835                 nob += page_rc;
836                 if (page_rc != 0) {             /* some data! */
837                         LASSERT (local_nb[i].page != NULL);
838                         ptlrpc_prep_bulk_page_nopin(desc, local_nb[i].page,
839                                                     local_nb[i].lnb_page_offset,
840                                                     page_rc);
841                 }
842
843                 if (page_rc != local_nb[i].len) { /* short read */
844                         /* All subsequent pages should be 0 */
845                         while(++i < npages)
846                                 LASSERT(local_nb[i].rc == 0);
847                         break;
848                 }
849         }
850
851         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
852                 cksum_type_t cksum_type =
853                         cksum_type_unpack(repbody->oa.o_valid & OBD_MD_FLFLAGS ?
854                                           repbody->oa.o_flags : 0);
855                 repbody->oa.o_flags = cksum_type_pack(cksum_type);
856                 repbody->oa.o_valid = OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
857                 repbody->oa.o_cksum = ost_checksum_bulk(desc, OST_READ,cksum_type);
858                 CDEBUG(D_PAGE, "checksum at read origin: %x\n",
859                        repbody->oa.o_cksum);
860         } else {
861                 repbody->oa.o_valid = 0;
862         }
863         /* We're finishing using body->oa as an input variable */
864
865         /* Check if client was evicted while we were doing i/o before touching
866            network */
867         if (rc == 0) {
868                 if (likely(!CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2)))
869                         rc = target_bulk_io(exp, desc, &lwi);
870                 no_reply = rc != 0;
871         }
872
873 out_commitrw:
874         /* Must commit after prep above in all cases */
875         rc = obd_commitrw(req->rq_svc_thread->t_env, OBD_BRW_READ, exp,
876                           &repbody->oa, 1, ioo, remote_nb, npages, local_nb,
877                           oti, rc);
878
879         if (rc == 0)
880                 ost_drop_id(exp, &repbody->oa);
881
882 out_lock:
883         ost_brw_lock_put(LCK_PR, ioo, remote_nb, &lockh);
884 out_tls:
885         ost_tls_put(req);
886 out_bulk:
887         if (desc && !CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2))
888                 ptlrpc_free_bulk_nopin(desc);
889 out:
890         LASSERT(rc <= 0);
891         if (rc == 0) {
892                 req->rq_status = nob;
893                 ptlrpc_lprocfs_brw(req, nob);
894                 target_committed_to_req(req);
895                 ptlrpc_reply(req);
896         } else if (!no_reply) {
897                 /* Only reply if there was no comms problem with bulk */
898                 target_committed_to_req(req);
899                 req->rq_status = rc;
900                 ptlrpc_error(req);
901         } else {
902                 /* reply out callback would free */
903                 ptlrpc_req_drop_rs(req);
904                 LCONSOLE_WARN("%s: Bulk IO read error with %s (at %s), "
905                               "client will retry: rc %d\n",
906                               exp->exp_obd->obd_name,
907                               obd_uuid2str(&exp->exp_client_uuid),
908                               obd_export_nid2str(exp), rc);
909         }
910         /* send a bulk after reply to simulate a network delay or reordering
911          * by a router */
912         if (unlikely(CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2))) {
913                 cfs_waitq_t              waitq;
914                 struct l_wait_info       lwi1;
915
916                 CDEBUG(D_INFO, "reorder BULK\n");
917                 cfs_waitq_init(&waitq);
918
919                 lwi1 = LWI_TIMEOUT_INTR(cfs_time_seconds(3), NULL, NULL, NULL);
920                 l_wait_event(waitq, 0, &lwi1);
921                 rc = target_bulk_io(exp, desc, &lwi);
922                 ptlrpc_free_bulk_nopin(desc);
923         }
924
925         RETURN(rc);
926 }
927
928 static void ost_warn_on_cksum(struct ptlrpc_request *req,
929                               struct ptlrpc_bulk_desc *desc,
930                               struct niobuf_local *local_nb, int npages,
931                               obd_count client_cksum, obd_count server_cksum,
932                               int mmap)
933 {
934         struct obd_export *exp = req->rq_export;
935         struct ost_body *body;
936         char *router;
937         char *via;
938
939         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
940         LASSERT (body != NULL);
941
942         if (req->rq_peer.nid == desc->bd_sender) {
943                 via = router = "";
944         } else {
945                 via = " via ";
946                 router = libcfs_nid2str(desc->bd_sender);
947         }
948
949         if (mmap) {
950                 CDEBUG_LIMIT(D_INFO, "client csum %x, server csum %x\n",
951                              client_cksum, server_cksum);
952                 return;
953         }
954
955         LCONSOLE_ERROR_MSG(0x168, "BAD WRITE CHECKSUM: %s from %s%s%s inode "
956                            DFID" object "DOSTID" extent ["LPU64"-"LPU64
957                            "]: client csum %x, server csum %x\n",
958                            exp->exp_obd->obd_name, libcfs_id2str(req->rq_peer),
959                            via, router,
960                            body->oa.o_valid & OBD_MD_FLFID ?
961                            body->oa.o_parent_seq : (__u64)0,
962                            body->oa.o_valid & OBD_MD_FLFID ?
963                            body->oa.o_parent_oid : 0,
964                            body->oa.o_valid & OBD_MD_FLFID ?
965                            body->oa.o_parent_ver : 0,
966                            POSTID(&body->oa.o_oi),
967                            local_nb[0].lnb_file_offset,
968                            local_nb[npages-1].lnb_file_offset +
969                            local_nb[npages-1].len - 1,
970                            client_cksum, server_cksum);
971 }
972
973 static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
974 {
975         struct ptlrpc_bulk_desc *desc = NULL;
976         struct obd_export       *exp = req->rq_export;
977         struct niobuf_remote    *remote_nb;
978         struct niobuf_local     *local_nb;
979         struct obd_ioobj        *ioo;
980         struct ost_body         *body, *repbody;
981         struct l_wait_info       lwi;
982         struct lustre_handle     lockh = {0};
983         struct lustre_capa      *capa = NULL;
984         __u32                   *rcs;
985         int objcount, niocount, npages;
986         int rc, i, j;
987         obd_count                client_cksum = 0, server_cksum = 0;
988         cksum_type_t             cksum_type = OBD_CKSUM_CRC32;
989         int                      no_reply = 0, mmap = 0;
990         __u32                    o_uid = 0, o_gid = 0;
991         struct ost_thread_local_cache *tls;
992         ENTRY;
993
994         req->rq_bulk_write = 1;
995
996         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
997                 GOTO(out, rc = -EIO);
998         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK2))
999                 GOTO(out, rc = -EFAULT);
1000
1001         /* pause before transaction has been started */
1002         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, (obd_timeout + 1) / 4);
1003
1004         /* ost_body, ioobj & noibuf_remote are verified and swabbed in
1005          * ost_rw_hpreq_check(). */
1006         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1007         if (body == NULL)
1008                 GOTO(out, rc = -EFAULT);
1009
1010         objcount = req_capsule_get_size(&req->rq_pill, &RMF_OBD_IOOBJ,
1011                                         RCL_CLIENT) / sizeof(*ioo);
1012         ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
1013         if (ioo == NULL)
1014                 GOTO(out, rc = -EFAULT);
1015
1016         rc = ost_validate_obdo(exp, &body->oa, ioo);
1017         if (rc)
1018                 RETURN(rc);
1019
1020         for (niocount = i = 0; i < objcount; i++)
1021                 niocount += ioo[i].ioo_bufcnt;
1022
1023         /*
1024          * It'd be nice to have a capsule function to indicate how many elements
1025          * there were in a buffer for an RMF that's declared to be an array.
1026          * It's easy enough to compute the number of elements here though.
1027          */
1028         remote_nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
1029         if (remote_nb == NULL || niocount != (req_capsule_get_size(&req->rq_pill,
1030             &RMF_NIOBUF_REMOTE, RCL_CLIENT) / sizeof(*remote_nb)))
1031                 GOTO(out, rc = -EFAULT);
1032
1033         if ((remote_nb[0].flags & OBD_BRW_MEMALLOC) &&
1034             (exp->exp_connection->c_peer.nid == exp->exp_connection->c_self))
1035                 cfs_memory_pressure_set();
1036
1037         if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
1038                 capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
1039                 if (capa == NULL) {
1040                         CERROR("Missing capability for OST BRW WRITE");
1041                         GOTO(out, rc = -EFAULT);
1042                 }
1043         }
1044
1045         req_capsule_set_size(&req->rq_pill, &RMF_RCS, RCL_SERVER,
1046                              niocount * sizeof(*rcs));
1047         rc = req_capsule_server_pack(&req->rq_pill);
1048         if (rc != 0)
1049                 GOTO(out, rc);
1050         CFS_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_PACK, cfs_fail_val);
1051         rcs = req_capsule_server_get(&req->rq_pill, &RMF_RCS);
1052
1053         tls = ost_tls_get(req);
1054         if (tls == NULL)
1055                 GOTO(out_bulk, rc = -ENOMEM);
1056         local_nb = tls->local;
1057
1058         rc = ost_brw_lock_get(LCK_PW, exp, ioo, remote_nb, &lockh);
1059         if (rc != 0)
1060                 GOTO(out_tls, rc);
1061
1062         /*
1063          * If getting the lock took more time than
1064          * client was willing to wait, drop it. b=11330
1065          */
1066         if (cfs_time_current_sec() > req->rq_deadline ||
1067             OBD_FAIL_CHECK(OBD_FAIL_OST_DROP_REQ)) {
1068                 no_reply = 1;
1069                 CERROR("Dropping timed-out write from %s because locking "
1070                        "object "DOSTID" took %ld seconds (limit was %ld).\n",
1071                        libcfs_id2str(req->rq_peer), POSTID(&ioo->ioo_oid),
1072                        cfs_time_current_sec() - req->rq_arrival_time.tv_sec,
1073                        req->rq_deadline - req->rq_arrival_time.tv_sec);
1074                 GOTO(out_lock, rc = -ETIMEDOUT);
1075         }
1076
1077         /* obd_preprw clobbers oa->valid, so save what we need */
1078         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1079                 client_cksum = body->oa.o_cksum;
1080                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1081                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1082         }
1083         if (body->oa.o_valid & OBD_MD_FLFLAGS && body->oa.o_flags & OBD_FL_MMAP)
1084                 mmap = 1;
1085
1086         /* Because we already sync grant info with client when reconnect,
1087          * grant info will be cleared for resent req, then fed_grant and
1088          * total_grant will not be modified in following preprw_write */
1089         if (lustre_msg_get_flags(req->rq_reqmsg) & (MSG_RESENT | MSG_REPLAY)) {
1090                 DEBUG_REQ(D_CACHE, req, "clear resent/replay req grant info");
1091                 body->oa.o_valid &= ~OBD_MD_FLGRANT;
1092         }
1093
1094         if (exp_connect_rmtclient(exp)) {
1095                 o_uid = body->oa.o_uid;
1096                 o_gid = body->oa.o_gid;
1097         }
1098
1099         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1100         memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
1101
1102         npages = OST_THREAD_POOL_SIZE;
1103         rc = obd_preprw(req->rq_svc_thread->t_env, OBD_BRW_WRITE, exp,
1104                         &repbody->oa, objcount, ioo, remote_nb, &npages,
1105                         local_nb, oti, capa);
1106         if (rc != 0)
1107                 GOTO(out_lock, rc);
1108
1109         desc = ptlrpc_prep_bulk_exp(req, npages, ioobj_max_brw_get(ioo),
1110                                     BULK_GET_SINK, OST_BULK_PORTAL);
1111         if (desc == NULL)
1112                 GOTO(skip_transfer, rc = -ENOMEM);
1113
1114         /* NB Having prepped, we must commit... */
1115         for (i = 0; i < npages; i++)
1116                 ptlrpc_prep_bulk_page_nopin(desc, local_nb[i].page,
1117                                             local_nb[i].lnb_page_offset,
1118                                             local_nb[i].len);
1119
1120         rc = sptlrpc_svc_prep_bulk(req, desc);
1121         if (rc != 0)
1122                 GOTO(out_lock, rc);
1123
1124         rc = target_bulk_io(exp, desc, &lwi);
1125         no_reply = rc != 0;
1126
1127 skip_transfer:
1128         if (client_cksum != 0 && rc == 0) {
1129                 static int cksum_counter;
1130                 repbody->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1131                 repbody->oa.o_flags &= ~OBD_FL_CKSUM_ALL;
1132                 repbody->oa.o_flags |= cksum_type_pack(cksum_type);
1133                 server_cksum = ost_checksum_bulk(desc, OST_WRITE, cksum_type);
1134                 repbody->oa.o_cksum = server_cksum;
1135                 cksum_counter++;
1136                 if (unlikely(client_cksum != server_cksum)) {
1137                         ost_warn_on_cksum(req, desc, local_nb, npages,
1138                                           client_cksum, server_cksum, mmap);
1139                         cksum_counter = 0;
1140
1141                 } else if ((cksum_counter & (-cksum_counter)) == cksum_counter){
1142                         CDEBUG(D_INFO, "Checksum %u from %s OK: %x\n",
1143                                cksum_counter, libcfs_id2str(req->rq_peer),
1144                                server_cksum);
1145                 }
1146         }
1147
1148         /* Must commit after prep above in all cases */
1149         rc = obd_commitrw(req->rq_svc_thread->t_env, OBD_BRW_WRITE, exp,
1150                           &repbody->oa, objcount, ioo, remote_nb, npages,
1151                           local_nb, oti, rc);
1152         if (rc == -ENOTCONN)
1153                 /* quota acquire process has been given up because
1154                  * either the client has been evicted or the client
1155                  * has timed out the request already */
1156                 no_reply = 1;
1157
1158         if (exp_connect_rmtclient(exp)) {
1159                 repbody->oa.o_uid = o_uid;
1160                 repbody->oa.o_gid = o_gid;
1161         }
1162
1163         /*
1164          * Disable sending mtime back to the client. If the client locked the
1165          * whole object, then it has already updated the mtime on its side,
1166          * otherwise it will have to glimpse anyway (see bug 21489, comment 32)
1167          */
1168         repbody->oa.o_valid &= ~(OBD_MD_FLMTIME | OBD_MD_FLATIME);
1169
1170         if (rc == 0) {
1171                 int nob = 0;
1172
1173                 /* set per-requested niobuf return codes */
1174                 for (i = j = 0; i < niocount; i++) {
1175                         int len = remote_nb[i].len;
1176
1177                         nob += len;
1178                         rcs[i] = 0;
1179                         do {
1180                                 LASSERT(j < npages);
1181                                 if (local_nb[j].rc < 0)
1182                                         rcs[i] = local_nb[j].rc;
1183                                 len -= local_nb[j].len;
1184                                 j++;
1185                         } while (len > 0);
1186                         LASSERT(len == 0);
1187                 }
1188                 LASSERT(j == npages);
1189                 ptlrpc_lprocfs_brw(req, nob);
1190         }
1191
1192 out_lock:
1193         ost_brw_lock_put(LCK_PW, ioo, remote_nb, &lockh);
1194 out_tls:
1195         ost_tls_put(req);
1196 out_bulk:
1197         if (desc)
1198                 ptlrpc_free_bulk_nopin(desc);
1199 out:
1200         if (rc == 0) {
1201                 oti_to_request(oti, req);
1202                 target_committed_to_req(req);
1203                 rc = ptlrpc_reply(req);
1204         } else if (!no_reply) {
1205                 /* Only reply if there was no comms problem with bulk */
1206                 target_committed_to_req(req);
1207                 req->rq_status = rc;
1208                 ptlrpc_error(req);
1209         } else {
1210                 /* reply out callback would free */
1211                 ptlrpc_req_drop_rs(req);
1212                 LCONSOLE_WARN("%s: Bulk IO write error with %s (at %s), "
1213                               "client will retry: rc %d\n",
1214                               exp->exp_obd->obd_name,
1215                               obd_uuid2str(&exp->exp_client_uuid),
1216                               obd_export_nid2str(exp), rc);
1217         }
1218         cfs_memory_pressure_clr();
1219         RETURN(rc);
1220 }
1221
1222 /**
1223  * Implementation of OST_SET_INFO.
1224  *
1225  * OST_SET_INFO is like ioctl(): heavily overloaded.  Specifically, it takes a
1226  * "key" and a value RPC buffers as arguments, with the value's contents
1227  * interpreted according to the key.
1228  *
1229  * Value types that need swabbing have swabbing done explicitly, either here or
1230  * in functions called from here.  This should be corrected: all swabbing should
1231  * be done in the capsule abstraction, as that will then allow us to move
1232  * swabbing exclusively to the client without having to modify server code
1233  * outside the capsule abstraction's implementation itself.  To correct this
1234  * will require minor changes to the capsule abstraction; see the comments for
1235  * req_capsule_extend() in layout.c.
1236  */
1237 static int ost_set_info(struct obd_export *exp, struct ptlrpc_request *req)
1238 {
1239         struct ost_body *body = NULL, *repbody;
1240         char *key, *val = NULL;
1241         int keylen, vallen, rc = 0;
1242         int is_grant_shrink = 0;
1243         ENTRY;
1244
1245         key = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
1246         if (key == NULL) {
1247                 DEBUG_REQ(D_HA, req, "no set_info key");
1248                 RETURN(-EFAULT);
1249         }
1250         keylen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_KEY,
1251                                       RCL_CLIENT);
1252
1253         vallen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_VAL,
1254                                       RCL_CLIENT);
1255
1256         if ((is_grant_shrink = KEY_IS(KEY_GRANT_SHRINK)))
1257                 /* In this case the value is actually an RMF_OST_BODY, so we
1258                  * transmutate the type of this PTLRPC */
1259                 req_capsule_extend(&req->rq_pill, &RQF_OST_SET_GRANT_INFO);
1260
1261         rc = req_capsule_server_pack(&req->rq_pill);
1262         if (rc)
1263                 RETURN(rc);
1264
1265         if (vallen) {
1266                 if (is_grant_shrink) {
1267                         body = req_capsule_client_get(&req->rq_pill,
1268                                                       &RMF_OST_BODY);
1269                         if (!body)
1270                                 RETURN(-EFAULT);
1271
1272                         repbody = req_capsule_server_get(&req->rq_pill,
1273                                                          &RMF_OST_BODY);
1274                         memcpy(repbody, body, sizeof(*body));
1275                         val = (char*)repbody;
1276                 } else {
1277                         val = req_capsule_client_get(&req->rq_pill,
1278                                                      &RMF_SETINFO_VAL);
1279                 }
1280         }
1281
1282         if (KEY_IS(KEY_EVICT_BY_NID)) {
1283                 if (val && vallen)
1284                         obd_export_evict_by_nid(exp->exp_obd, val);
1285                 GOTO(out, rc = 0);
1286         } else if (KEY_IS(KEY_MDS_CONN) && ptlrpc_req_need_swab(req)) {
1287                 if (vallen < sizeof(__u32))
1288                         RETURN(-EFAULT);
1289                 __swab32s((__u32 *)val);
1290         }
1291
1292         /* OBD will also check if KEY_IS(KEY_GRANT_SHRINK), and will cast val to
1293          * a struct ost_body * value */
1294         rc = obd_set_info_async(req->rq_svc_thread->t_env, exp, keylen,
1295                                 key, vallen, val, NULL);
1296 out:
1297         lustre_msg_set_status(req->rq_repmsg, 0);
1298         RETURN(rc);
1299 }
1300
1301 static int ost_get_info(struct obd_export *exp, struct ptlrpc_request *req)
1302 {
1303         void *key, *reply;
1304         int keylen, replylen, rc = 0;
1305         struct req_capsule *pill = &req->rq_pill;
1306         ENTRY;
1307
1308         /* this common part for get_info rpc */
1309         key = req_capsule_client_get(pill, &RMF_SETINFO_KEY);
1310         if (key == NULL) {
1311                 DEBUG_REQ(D_HA, req, "no get_info key");
1312                 RETURN(-EFAULT);
1313         }
1314         keylen = req_capsule_get_size(pill, &RMF_SETINFO_KEY, RCL_CLIENT);
1315
1316         if (KEY_IS(KEY_FIEMAP)) {
1317                 struct ll_fiemap_info_key *fm_key = key;
1318                 int rc;
1319
1320                 rc = ost_validate_obdo(exp, &fm_key->oa, NULL);
1321                 if (rc)
1322                         RETURN(rc);
1323         }
1324
1325         rc = obd_get_info(req->rq_svc_thread->t_env, exp, keylen, key,
1326                           &replylen, NULL, NULL);
1327         if (rc)
1328                 RETURN(rc);
1329
1330         req_capsule_set_size(pill, &RMF_GENERIC_DATA,
1331                              RCL_SERVER, replylen);
1332
1333         rc = req_capsule_server_pack(pill);
1334         if (rc)
1335                 RETURN(rc);
1336
1337         reply = req_capsule_server_get(pill, &RMF_GENERIC_DATA);
1338         if (reply == NULL)
1339                 RETURN(-ENOMEM);
1340
1341         if (KEY_IS(KEY_LAST_FID)) {
1342                 void *val;
1343                 int vallen;
1344
1345                 req_capsule_extend(pill, &RQF_OST_GET_INFO_LAST_FID);
1346                 val = req_capsule_client_get(pill, &RMF_SETINFO_VAL);
1347                 vallen = req_capsule_get_size(pill, &RMF_SETINFO_VAL,
1348                                               RCL_CLIENT);
1349                 if (val != NULL && vallen > 0 && replylen >= vallen) {
1350                         memcpy(reply, val, vallen);
1351                 } else {
1352                         CERROR("%s: invalid req val %p vallen %d replylen %d\n",
1353                                exp->exp_obd->obd_name, val, vallen, replylen);
1354                         GOTO(out, rc = -EINVAL);
1355                 }
1356         }
1357
1358         /* call again to fill in the reply buffer */
1359         rc = obd_get_info(req->rq_svc_thread->t_env, exp, keylen, key,
1360                           &replylen, reply, NULL);
1361 out:
1362         lustre_msg_set_status(req->rq_repmsg, 0);
1363         RETURN(rc);
1364 }
1365
1366 static int ost_handle_quotactl(struct ptlrpc_request *req)
1367 {
1368         struct obd_quotactl *oqctl, *repoqc;
1369         int rc;
1370         ENTRY;
1371
1372         oqctl = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
1373         if (oqctl == NULL)
1374                 GOTO(out, rc = -EPROTO);
1375
1376         rc = req_capsule_server_pack(&req->rq_pill);
1377         if (rc)
1378                 GOTO(out, rc);
1379
1380         repoqc = req_capsule_server_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
1381         req->rq_status = obd_quotactl(req->rq_export, oqctl);
1382         *repoqc = *oqctl;
1383
1384 out:
1385         RETURN(rc);
1386 }
1387
1388 static int ost_handle_quotacheck(struct ptlrpc_request *req)
1389 {
1390         struct obd_quotactl *oqctl;
1391         int rc;
1392         ENTRY;
1393
1394         oqctl = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
1395         if (oqctl == NULL)
1396                 RETURN(-EPROTO);
1397
1398         rc = req_capsule_server_pack(&req->rq_pill);
1399         if (rc)
1400                 RETURN(-ENOMEM);
1401
1402         /* deprecated, not used any more */
1403         req->rq_status = -EOPNOTSUPP;
1404         RETURN(-EOPNOTSUPP);
1405 }
1406
1407 static int ost_llog_handle_connect(struct obd_export *exp,
1408                                    struct ptlrpc_request *req)
1409 {
1410         struct llogd_conn_body *body;
1411         int rc;
1412         ENTRY;
1413
1414         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_CONN_BODY);
1415         rc = obd_llog_connect(exp, body);
1416         RETURN(rc);
1417 }
1418
/*
 * Clear the remote-client and OSS-capability connect flags in \a reply and
 * store the resulting flag word in the export's connect flags while holding
 * exp_lock.
 *
 * Both arguments are pointers and are evaluated more than once, so they
 * must be free of side effects.  They are parenthesized so that any pointer
 * expression (e.g. "&data") expands correctly.
 */
#define ost_init_sec_none(reply, exp)                                   \
do {                                                                    \
        (reply)->ocd_connect_flags &= ~(OBD_CONNECT_RMT_CLIENT |        \
                                        OBD_CONNECT_RMT_CLIENT_FORCE |  \
                                        OBD_CONNECT_OSS_CAPA);          \
        spin_lock(&(exp)->exp_lock);                                    \
        *exp_connect_flags_ptr(exp) = (reply)->ocd_connect_flags;       \
        spin_unlock(&(exp)->exp_lock);                                  \
} while (0)
1428
1429 static int ost_init_sec_level(struct ptlrpc_request *req)
1430 {
1431         struct obd_export *exp = req->rq_export;
1432         struct req_capsule *pill = &req->rq_pill;
1433         struct obd_device *obd = exp->exp_obd;
1434         struct filter_obd *filter = &obd->u.filter;
1435         char *client = libcfs_nid2str(req->rq_peer.nid);
1436         struct obd_connect_data *data, *reply;
1437         int rc = 0, remote;
1438         ENTRY;
1439
1440         data = req_capsule_client_get(pill, &RMF_CONNECT_DATA);
1441         reply = req_capsule_server_get(pill, &RMF_CONNECT_DATA);
1442         if (data == NULL || reply == NULL)
1443                 RETURN(-EFAULT);
1444
1445         /* connection from MDT is always trusted */
1446         if (req->rq_auth_usr_mdt) {
1447                 ost_init_sec_none(reply, exp);
1448                 RETURN(0);
1449         }
1450
1451         /* no GSS support case */
1452         if (!req->rq_auth_gss) {
1453                 if (filter->fo_sec_level > LUSTRE_SEC_NONE) {
1454                         CWARN("client %s -> target %s does not user GSS, "
1455                               "can not run under security level %d.\n",
1456                               client, obd->obd_name, filter->fo_sec_level);
1457                         RETURN(-EACCES);
1458                 } else {
1459                         ost_init_sec_none(reply, exp);
1460                         RETURN(0);
1461                 }
1462         }
1463
1464         /* old version case */
1465         if (unlikely(!(data->ocd_connect_flags & OBD_CONNECT_RMT_CLIENT) ||
1466                      !(data->ocd_connect_flags & OBD_CONNECT_OSS_CAPA))) {
1467                 if (filter->fo_sec_level > LUSTRE_SEC_NONE) {
1468                         CWARN("client %s -> target %s uses old version, "
1469                               "can not run under security level %d.\n",
1470                               client, obd->obd_name, filter->fo_sec_level);
1471                         RETURN(-EACCES);
1472                 } else {
1473                         CWARN("client %s -> target %s uses old version, "
1474                               "run under security level %d.\n",
1475                               client, obd->obd_name, filter->fo_sec_level);
1476                         ost_init_sec_none(reply, exp);
1477                         RETURN(0);
1478                 }
1479         }
1480
1481         remote = data->ocd_connect_flags & OBD_CONNECT_RMT_CLIENT_FORCE;
1482         if (remote) {
1483                 if (!req->rq_auth_remote)
1484                         CDEBUG(D_SEC, "client (local realm) %s -> target %s "
1485                                "asked to be remote.\n", client, obd->obd_name);
1486         } else if (req->rq_auth_remote) {
1487                 remote = 1;
1488                 CDEBUG(D_SEC, "client (remote realm) %s -> target %s is set "
1489                        "as remote by default.\n", client, obd->obd_name);
1490         }
1491
1492         if (remote) {
1493                 if (!filter->fo_fl_oss_capa) {
1494                         CDEBUG(D_SEC, "client %s -> target %s is set as remote,"
1495                                " but OSS capabilities are not enabled: %d.\n",
1496                                client, obd->obd_name, filter->fo_fl_oss_capa);
1497                         RETURN(-EACCES);
1498                 }
1499         }
1500
1501         switch (filter->fo_sec_level) {
1502         case LUSTRE_SEC_NONE:
1503                 if (!remote) {
1504                         ost_init_sec_none(reply, exp);
1505                         break;
1506                 } else {
1507                         CDEBUG(D_SEC, "client %s -> target %s is set as remote, "
1508                                "can not run under security level %d.\n",
1509                                client, obd->obd_name, filter->fo_sec_level);
1510                         RETURN(-EACCES);
1511                 }
1512         case LUSTRE_SEC_REMOTE:
1513                 if (!remote)
1514                         ost_init_sec_none(reply, exp);
1515                 break;
1516         case LUSTRE_SEC_ALL:
1517                 if (!remote) {
1518                         reply->ocd_connect_flags &= ~(OBD_CONNECT_RMT_CLIENT |
1519                                                       OBD_CONNECT_RMT_CLIENT_FORCE);
1520                         if (!filter->fo_fl_oss_capa)
1521                                 reply->ocd_connect_flags &= ~OBD_CONNECT_OSS_CAPA;
1522
1523                         spin_lock(&exp->exp_lock);
1524                         *exp_connect_flags_ptr(exp) = reply->ocd_connect_flags;
1525                         spin_unlock(&exp->exp_lock);
1526                 }
1527                 break;
1528         default:
1529                 RETURN(-EINVAL);
1530         }
1531
1532         RETURN(rc);
1533 }
1534
1535 /*
1536  * FIXME
1537  * This should be done in filter_connect()/filter_reconnect(), but there
1538  * we can't obtain information like the NID, which is stored in the incoming
1539  * request, and thus can't decide what flavor to use, so we do it here.
1540  *
1541  * This hack should be removed after the OST stack is rewritten, just
1542  * like what we are doing in mdt_obd_connect()/mdt_obd_reconnect().
1543  */
1544 static int ost_connect_check_sptlrpc(struct ptlrpc_request *req)
1545 {
1546         struct obd_export     *exp = req->rq_export;
1547         struct filter_obd     *filter = &exp->exp_obd->u.filter;
1548         struct sptlrpc_flavor  flvr;
1549         int                    rc = 0;
1550
1551         if (unlikely(strcmp(exp->exp_obd->obd_type->typ_name,
1552                             LUSTRE_ECHO_NAME) == 0)) {
1553                 exp->exp_flvr.sf_rpc = SPTLRPC_FLVR_ANY;
1554                 return 0;
1555         }
1556
1557         if (exp->exp_flvr.sf_rpc == SPTLRPC_FLVR_INVALID) {
1558                 read_lock(&filter->fo_sptlrpc_lock);
1559                 sptlrpc_target_choose_flavor(&filter->fo_sptlrpc_rset,
1560                                              req->rq_sp_from,
1561                                              req->rq_peer.nid,
1562                                              &flvr);
1563                 read_unlock(&filter->fo_sptlrpc_lock);
1564
1565                 spin_lock(&exp->exp_lock);
1566
1567                 exp->exp_sp_peer = req->rq_sp_from;
1568                 exp->exp_flvr = flvr;
1569
1570                 if (exp->exp_flvr.sf_rpc != SPTLRPC_FLVR_ANY &&
1571                     exp->exp_flvr.sf_rpc != req->rq_flvr.sf_rpc) {
1572                         CERROR("unauthorized rpc flavor %x from %s, "
1573                                "expect %x\n", req->rq_flvr.sf_rpc,
1574                                libcfs_nid2str(req->rq_peer.nid),
1575                                exp->exp_flvr.sf_rpc);
1576                         rc = -EACCES;
1577                 }
1578
1579                 spin_unlock(&exp->exp_lock);
1580         } else {
1581                 if (exp->exp_sp_peer != req->rq_sp_from) {
1582                         CERROR("RPC source %s doesn't match %s\n",
1583                                sptlrpc_part2name(req->rq_sp_from),
1584                                sptlrpc_part2name(exp->exp_sp_peer));
1585                         rc = -EACCES;
1586                 } else {
1587                         rc = sptlrpc_target_export_check(exp, req);
1588                 }
1589         }
1590
1591         return rc;
1592 }
1593
1594 /* Ensure that data and metadata are synced to the disk when lock is cancelled
1595  * (if requested) */
1596 int ost_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
1597                      void *data, int flag)
1598 {
1599         struct lu_env   env;
1600         __u32           sync_lock_cancel = 0;
1601         __u32           len = sizeof(sync_lock_cancel);
1602         int             rc = 0;
1603
1604         ENTRY;
1605
1606         rc = lu_env_init(&env, LCT_DT_THREAD);
1607         if (unlikely(rc != 0))
1608                 RETURN(rc);
1609
1610         rc = obd_get_info(&env, lock->l_export, sizeof(KEY_SYNC_LOCK_CANCEL),
1611                           KEY_SYNC_LOCK_CANCEL, &len, &sync_lock_cancel, NULL);
1612         if (rc == 0 && flag == LDLM_CB_CANCELING &&
1613             (lock->l_granted_mode & (LCK_PW|LCK_GROUP)) &&
1614             (sync_lock_cancel == ALWAYS_SYNC_ON_CANCEL ||
1615              (sync_lock_cancel == BLOCKING_SYNC_ON_CANCEL &&
1616               lock->l_flags & LDLM_FL_CBPENDING))) {
1617                 struct obd_info *oinfo;
1618                 struct obdo     *oa;
1619                 int              rc;
1620
1621                 OBD_ALLOC_PTR(oinfo);
1622                 if (!oinfo)
1623                         GOTO(out_env, rc = -ENOMEM);
1624                 OBDO_ALLOC(oa);
1625                 if (!oa) {
1626                         OBD_FREE_PTR(oinfo);
1627                         GOTO(out_env, rc = -ENOMEM);
1628                 }
1629
1630                 ostid_res_name_to_id(&oa->o_oi, &lock->l_resource->lr_name);
1631                 oa->o_valid = OBD_MD_FLID|OBD_MD_FLGROUP;
1632                 oinfo->oi_oa = oa;
1633                 oinfo->oi_capa = BYPASS_CAPA;
1634
1635                 rc = obd_sync(&env, lock->l_export, oinfo,
1636                               lock->l_policy_data.l_extent.start,
1637                               lock->l_policy_data.l_extent.end, NULL);
1638                 if (rc)
1639                         CERROR("Error %d syncing data on lock cancel\n", rc);
1640
1641                 OBDO_FREE(oa);
1642                 OBD_FREE_PTR(oinfo);
1643         }
1644
1645         rc = ldlm_server_blocking_ast(lock, desc, data, flag);
1646 out_env:
1647         lu_env_fini(&env);
1648         RETURN(rc);
1649 }
1650
1651 static int ost_filter_recovery_request(struct ptlrpc_request *req,
1652                                        struct obd_device *obd, int *process)
1653 {
1654         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1655         case OST_CONNECT: /* This will never get here, but for completeness. */
1656         case OST_DISCONNECT:
1657                *process = 1;
1658                RETURN(0);
1659
1660         case OBD_PING:
1661         case OST_CREATE:
1662         case OST_DESTROY:
1663         case OST_PUNCH:
1664         case OST_SETATTR:
1665         case OST_SYNC:
1666         case OST_WRITE:
1667         case OBD_LOG_CANCEL:
1668         case LDLM_ENQUEUE:
1669                 *process = target_queue_recovery_request(req, obd);
1670                 RETURN(0);
1671
1672         default:
1673                 DEBUG_REQ(D_WARNING, req, "not permitted during recovery");
1674                 *process = -EAGAIN;
1675                 RETURN(0);
1676         }
1677 }
1678
1679 int ost_msg_check_version(struct lustre_msg *msg)
1680 {
1681         int rc;
1682
1683         switch(lustre_msg_get_opc(msg)) {
1684         case OST_CONNECT:
1685         case OST_DISCONNECT:
1686         case OBD_PING:
1687         case SEC_CTX_INIT:
1688         case SEC_CTX_INIT_CONT:
1689         case SEC_CTX_FINI:
1690                 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
1691                 if (rc)
1692                         CERROR("bad opc %u version %08x, expecting %08x\n",
1693                                lustre_msg_get_opc(msg),
1694                                lustre_msg_get_version(msg),
1695                                LUSTRE_OBD_VERSION);
1696                 break;
1697         case SEQ_QUERY:
1698                 /* Note: client always use MDS_VERSION for FID request */
1699                 rc = lustre_msg_check_version(msg, LUSTRE_MDS_VERSION);
1700                 if (rc)
1701                         CERROR("bad opc %u version %08x, expecting %08x\n",
1702                                lustre_msg_get_opc(msg),
1703                                lustre_msg_get_version(msg),
1704                                LUSTRE_MDS_VERSION);
1705                 break;
1706         case OST_CREATE:
1707         case OST_DESTROY:
1708         case OST_GETATTR:
1709         case OST_SETATTR:
1710         case OST_WRITE:
1711         case OST_READ:
1712         case OST_PUNCH:
1713         case OST_STATFS:
1714         case OST_SYNC:
1715         case OST_SET_INFO:
1716         case OST_GET_INFO:
1717         case OST_QUOTACHECK:
1718         case OST_QUOTACTL:
1719                 rc = lustre_msg_check_version(msg, LUSTRE_OST_VERSION);
1720                 if (rc)
1721                         CERROR("bad opc %u version %08x, expecting %08x\n",
1722                                lustre_msg_get_opc(msg),
1723                                lustre_msg_get_version(msg),
1724                                LUSTRE_OST_VERSION);
1725                 break;
1726         case LDLM_ENQUEUE:
1727         case LDLM_CONVERT:
1728         case LDLM_CANCEL:
1729         case LDLM_BL_CALLBACK:
1730         case LDLM_CP_CALLBACK:
1731                 rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
1732                 if (rc)
1733                         CERROR("bad opc %u version %08x, expecting %08x\n",
1734                                lustre_msg_get_opc(msg),
1735                                lustre_msg_get_version(msg),
1736                                LUSTRE_DLM_VERSION);
1737                 break;
1738         case LLOG_ORIGIN_CONNECT:
1739         case OBD_LOG_CANCEL:
1740                 rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
1741                 if (rc)
1742                         CERROR("bad opc %u version %08x, expecting %08x\n",
1743                                lustre_msg_get_opc(msg),
1744                                lustre_msg_get_version(msg),
1745                                LUSTRE_LOG_VERSION);
1746                 break;
1747         case OST_QUOTA_ADJUST_QUNIT:
1748                 rc = -ENOTSUPP;
1749                 CERROR("Quota adjust is deprecated as of 2.4.0\n");
1750                 break;
1751         default:
1752                 CERROR("Unexpected opcode %d\n", lustre_msg_get_opc(msg));
1753                 rc = -ENOTSUPP;
1754         }
1755         return rc;
1756 }
1757
1758 struct ost_prolong_data {
1759         struct ptlrpc_request *opd_req;
1760         struct obd_export     *opd_exp;
1761         struct obdo           *opd_oa;
1762         struct ldlm_res_id     opd_resid;
1763         struct ldlm_extent     opd_extent;
1764         ldlm_mode_t            opd_mode;
1765         unsigned int           opd_locks;
1766         int                    opd_timeout;
1767 };
1768
1769 /* prolong locks for the current service time of the corresponding
1770  * portal (= OST_IO_PORTAL)
1771  */
1772 static inline int prolong_timeout(struct ptlrpc_request *req)
1773 {
1774         struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt;
1775
1776         if (AT_OFF)
1777                 return obd_timeout / 2;
1778
1779         return max(at_est2timeout(at_get(&svcpt->scp_at_estimate)),
1780                    ldlm_timeout);
1781 }
1782
1783 static void ost_prolong_lock_one(struct ost_prolong_data *opd,
1784                                  struct ldlm_lock *lock)
1785 {
1786         LASSERT(lock->l_export == opd->opd_exp);
1787
1788         if (lock->l_destroyed) /* lock already cancelled */
1789                 return;
1790
1791         /* XXX: never try to grab resource lock here because we're inside
1792          * exp_bl_list_lock; in ldlm_lockd.c to handle waiting list we take
1793          * res lock and then exp_bl_list_lock. */
1794
1795         if (!(lock->l_flags & LDLM_FL_AST_SENT))
1796                 /* ignore locks not being cancelled */
1797                 return;
1798
1799         LDLM_DEBUG(lock,
1800                    "refreshed for req x"LPU64" ext("LPU64"->"LPU64") to %ds.\n",
1801                    opd->opd_req->rq_xid, opd->opd_extent.start,
1802                    opd->opd_extent.end, opd->opd_timeout);
1803
1804         /* OK. this is a possible lock the user holds doing I/O
1805          * let's refresh eviction timer for it */
1806         ldlm_refresh_waiting_lock(lock, opd->opd_timeout);
1807         ++opd->opd_locks;
1808 }
1809
1810 static void ost_prolong_locks(struct ost_prolong_data *data)
1811 {
1812         struct obd_export *exp = data->opd_exp;
1813         struct obdo       *oa  = data->opd_oa;
1814         struct ldlm_lock  *lock;
1815         ENTRY;
1816
1817         if (oa->o_valid & OBD_MD_FLHANDLE) {
1818                 /* mostly a request should be covered by only one lock, try
1819                  * fast path. */
1820                 lock = ldlm_handle2lock(&oa->o_handle);
1821                 if (lock != NULL) {
1822                         /* Fast path to check if the lock covers the whole IO
1823                          * region exclusively. */
1824                         if (lock->l_granted_mode == LCK_PW &&
1825                             ldlm_extent_contain(&lock->l_policy_data.l_extent,
1826                                                 &data->opd_extent)) {
1827                                 /* bingo */
1828                                 ost_prolong_lock_one(data, lock);
1829                                 LDLM_LOCK_PUT(lock);
1830                                 RETURN_EXIT;
1831                         }
1832                         LDLM_LOCK_PUT(lock);
1833                 }
1834         }
1835
1836
1837         spin_lock_bh(&exp->exp_bl_list_lock);
1838         cfs_list_for_each_entry(lock, &exp->exp_bl_list, l_exp_list) {
1839                 LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
1840                 LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
1841
1842                 if (!ldlm_res_eq(&data->opd_resid, &lock->l_resource->lr_name))
1843                         continue;
1844
1845                 if (!ldlm_extent_overlap(&lock->l_policy_data.l_extent,
1846                                          &data->opd_extent))
1847                         continue;
1848
1849                 ost_prolong_lock_one(data, lock);
1850         }
1851         spin_unlock_bh(&exp->exp_bl_list_lock);
1852
1853         EXIT;
1854 }
1855
1856 /**
1857  * Returns 1 if the given PTLRPC matches the given LDLM locks, or 0 if it does
1858  * not.
1859  */
1860 static int ost_rw_hpreq_lock_match(struct ptlrpc_request *req,
1861                                    struct ldlm_lock *lock)
1862 {
1863         struct niobuf_remote *nb;
1864         struct obd_ioobj *ioo;
1865         int mode, opc;
1866         struct ldlm_extent ext;
1867         ENTRY;
1868
1869         opc = lustre_msg_get_opc(req->rq_reqmsg);
1870         LASSERT(opc == OST_READ || opc == OST_WRITE);
1871
1872         ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
1873         LASSERT(ioo != NULL);
1874
1875         nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
1876         LASSERT(nb != NULL);
1877
1878         ext.start = nb->offset;
1879         nb += ioo->ioo_bufcnt - 1;
1880         ext.end = nb->offset + nb->len - 1;
1881
1882         LASSERT(lock->l_resource != NULL);
1883         if (!ostid_res_name_eq(&ioo->ioo_oid, &lock->l_resource->lr_name))
1884                 RETURN(0);
1885
1886         mode = LCK_PW;
1887         if (opc == OST_READ)
1888                 mode |= LCK_PR;
1889         if (!(lock->l_granted_mode & mode))
1890                 RETURN(0);
1891
1892         RETURN(ldlm_extent_overlap(&lock->l_policy_data.l_extent, &ext));
1893 }
1894
1895 /**
1896  * High-priority queue request check for whether the given PTLRPC request (\a
1897  * req) is blocking an LDLM lock cancel.
1898  *
1899  * Returns 1 if the given given PTLRPC request (\a req) is blocking an LDLM lock
1900  * cancel, 0 if it is not, and -EFAULT if the request is malformed.
1901  *
1902  * Only OST_READs, OST_WRITEs and OST_PUNCHes go on the h-p RPC queue.  This
1903  * function looks only at OST_READs and OST_WRITEs.
1904  */
1905 static int ost_rw_hpreq_check(struct ptlrpc_request *req)
1906 {
1907         struct obd_device *obd = req->rq_export->exp_obd;
1908         struct ost_body *body;
1909         struct obd_ioobj *ioo;
1910         struct niobuf_remote *nb;
1911         struct ost_prolong_data opd = { 0 };
1912         int mode, opc;
1913         ENTRY;
1914
1915         /*
1916          * Use LASSERT to do sanity check because malformed RPCs should have
1917          * been filtered out in ost_hpreq_handler().
1918          */
1919         opc = lustre_msg_get_opc(req->rq_reqmsg);
1920         LASSERT(opc == OST_READ || opc == OST_WRITE);
1921
1922         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1923         LASSERT(body != NULL);
1924
1925         ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
1926         LASSERT(ioo != NULL);
1927
1928         nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
1929         LASSERT(nb != NULL);
1930         LASSERT(!(nb->flags & OBD_BRW_SRVLOCK));
1931
1932         ostid_build_res_name(&ioo->ioo_oid, &opd.opd_resid);
1933
1934         opd.opd_req = req;
1935         mode = LCK_PW;
1936         if (opc == OST_READ)
1937                 mode |= LCK_PR;
1938         opd.opd_mode = mode;
1939         opd.opd_exp = req->rq_export;
1940         opd.opd_oa  = &body->oa;
1941         opd.opd_extent.start = nb->offset;
1942         nb += ioo->ioo_bufcnt - 1;
1943         opd.opd_extent.end = nb->offset + nb->len - 1;
1944         opd.opd_timeout = prolong_timeout(req);
1945
1946         DEBUG_REQ(D_RPCTRACE, req,
1947                "%s %s: refresh rw locks: " LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
1948                obd->obd_name, cfs_current()->comm,
1949                opd.opd_resid.name[0], opd.opd_resid.name[1],
1950                opd.opd_extent.start, opd.opd_extent.end);
1951
1952         ost_prolong_locks(&opd);
1953
1954         CDEBUG(D_DLMTRACE, "%s: refreshed %u locks timeout for req %p.\n",
1955                obd->obd_name, opd.opd_locks, req);
1956
1957         RETURN(opd.opd_locks > 0);
1958 }
1959
/*
 * hpreq_fini callback for OST_READ/OST_WRITE: run the lock-prolonging
 * check one more time and discard its result.
 */
static void ost_rw_hpreq_fini(struct ptlrpc_request *req)
{
        /* only the side effect (refreshing locks) is wanted here */
        (void)ost_rw_hpreq_check(req);
}
1964
1965 /**
1966  * Like ost_rw_hpreq_lock_match(), but for OST_PUNCH RPCs.
1967  */
1968 static int ost_punch_hpreq_lock_match(struct ptlrpc_request *req,
1969                                       struct ldlm_lock *lock)
1970 {
1971         struct ost_body *body;
1972         ENTRY;
1973
1974         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1975         LASSERT(body != NULL);
1976
1977         if (body->oa.o_valid & OBD_MD_FLHANDLE &&
1978             body->oa.o_handle.cookie == lock->l_handle.h_cookie)
1979                 RETURN(1);
1980
1981         RETURN(0);
1982 }
1983
1984 /**
1985  * Like ost_rw_hpreq_check(), but for OST_PUNCH RPCs.
1986  */
1987 static int ost_punch_hpreq_check(struct ptlrpc_request *req)
1988 {
1989         struct obd_device *obd = req->rq_export->exp_obd;
1990         struct ost_body *body;
1991         struct obdo *oa;
1992         struct ost_prolong_data opd = { 0 };
1993         __u64 start, end;
1994         ENTRY;
1995
1996         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1997         LASSERT(body != NULL);
1998
1999         oa = &body->oa;
2000         LASSERT(!(oa->o_valid & OBD_MD_FLFLAGS) ||
2001                 !(oa->o_flags & OBD_FL_SRVLOCK));
2002
2003         start = oa->o_size;
2004         end = start + oa->o_blocks;
2005
2006         opd.opd_req = req;
2007         opd.opd_mode = LCK_PW;
2008         opd.opd_exp = req->rq_export;
2009         opd.opd_oa  = oa;
2010         opd.opd_extent.start = start;
2011         opd.opd_extent.end   = end;
2012         if (oa->o_blocks == OBD_OBJECT_EOF)
2013                 opd.opd_extent.end = OBD_OBJECT_EOF;
2014         opd.opd_timeout = prolong_timeout(req);
2015
2016         ostid_build_res_name(&oa->o_oi, &opd.opd_resid);
2017
2018         CDEBUG(D_DLMTRACE,
2019                "%s: refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
2020                obd->obd_name,
2021                opd.opd_resid.name[0], opd.opd_resid.name[1],
2022                opd.opd_extent.start, opd.opd_extent.end);
2023
2024         ost_prolong_locks(&opd);
2025
2026         CDEBUG(D_DLMTRACE, "%s: refreshed %u locks timeout for req %p.\n",
2027                obd->obd_name, opd.opd_locks, req);
2028
2029         RETURN(opd.opd_locks > 0);
2030 }
2031
/*
 * hpreq_fini callback for OST_PUNCH: run the lock-prolonging check one
 * more time and discard its result.
 */
static void ost_punch_hpreq_fini(struct ptlrpc_request *req)
{
        /* only the side effect (refreshing locks) is wanted here */
        (void)ost_punch_hpreq_check(req);
}
2036
2037 struct ptlrpc_hpreq_ops ost_hpreq_rw = {
2038         .hpreq_lock_match = ost_rw_hpreq_lock_match,
2039         .hpreq_check      = ost_rw_hpreq_check,
2040         .hpreq_fini       = ost_rw_hpreq_fini
2041 };
2042
2043 struct ptlrpc_hpreq_ops ost_hpreq_punch = {
2044         .hpreq_lock_match = ost_punch_hpreq_lock_match,
2045         .hpreq_check      = ost_punch_hpreq_check,
2046         .hpreq_fini       = ost_punch_hpreq_fini
2047 };
2048
2049 /** Assign high priority operations to the request if needed. */
2050 static int ost_io_hpreq_handler(struct ptlrpc_request *req)
2051 {
2052         ENTRY;
2053         if (req->rq_export) {
2054                 int opc = lustre_msg_get_opc(req->rq_reqmsg);
2055                 struct ost_body *body;
2056
2057                 if (opc == OST_READ || opc == OST_WRITE) {
2058                         struct niobuf_remote *nb;
2059                         struct obd_ioobj *ioo;
2060                         int objcount, niocount;
2061                         int rc;
2062                         int i;
2063
2064                         /* RPCs on the H-P queue can be inspected before
2065                          * ost_handler() initializes their pills, so we
2066                          * initialize that here.  Capsule initialization is
2067                          * idempotent, as is setting the pill's format (provided
2068                          * it doesn't change).
2069                          */
2070                         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
2071                         if (opc == OST_READ)
2072                                 req_capsule_set(&req->rq_pill,
2073                                                 &RQF_OST_BRW_READ);
2074                         else
2075                                 req_capsule_set(&req->rq_pill,
2076                                                 &RQF_OST_BRW_WRITE);
2077
2078                         body = req_capsule_client_get(&req->rq_pill,
2079                                                       &RMF_OST_BODY);
2080                         if (body == NULL) {
2081                                 CERROR("Missing/short ost_body\n");
2082                                 RETURN(-EFAULT);
2083                         }
2084
2085                         objcount = req_capsule_get_size(&req->rq_pill,
2086                                                         &RMF_OBD_IOOBJ,
2087                                                         RCL_CLIENT) /
2088                                                         sizeof(*ioo);
2089                         if (objcount == 0) {
2090                                 CERROR("Missing/short ioobj\n");
2091                                 RETURN(-EFAULT);
2092                         }
2093                         if (objcount > 1) {
2094                                 CERROR("too many ioobjs (%d)\n", objcount);
2095                                 RETURN(-EFAULT);
2096                         }
2097
2098                         ioo = req_capsule_client_get(&req->rq_pill,
2099                                                      &RMF_OBD_IOOBJ);
2100                         if (ioo == NULL) {
2101                                 CERROR("Missing/short ioobj\n");
2102                                 RETURN(-EFAULT);
2103                         }
2104
2105                         rc = ost_validate_obdo(req->rq_export, &body->oa, ioo);
2106                         if (rc) {
2107                                 CERROR("invalid object ids\n");
2108                                 RETURN(rc);
2109                         }
2110
2111                         for (niocount = i = 0; i < objcount; i++) {
2112                                 if (ioo[i].ioo_bufcnt == 0) {
2113                                         CERROR("ioo[%d] has zero bufcnt\n", i);
2114                                         RETURN(-EFAULT);
2115                                 }
2116                                 niocount += ioo[i].ioo_bufcnt;
2117                         }
2118                         if (niocount > PTLRPC_MAX_BRW_PAGES) {
2119                                 DEBUG_REQ(D_RPCTRACE, req,
2120                                           "bulk has too many pages (%d)",
2121                                           niocount);
2122                                 RETURN(-EFAULT);
2123                         }
2124
2125                         nb = req_capsule_client_get(&req->rq_pill,
2126                                                     &RMF_NIOBUF_REMOTE);
2127                         if (nb == NULL) {
2128                                 CERROR("Missing/short niobuf\n");
2129                                 RETURN(-EFAULT);
2130                         }
2131
2132                         if (niocount == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
2133                                 req->rq_ops = &ost_hpreq_rw;
2134                 } else if (opc == OST_PUNCH) {
2135                         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
2136                         req_capsule_set(&req->rq_pill, &RQF_OST_PUNCH);
2137
2138                         body = req_capsule_client_get(&req->rq_pill,
2139                                                       &RMF_OST_BODY);
2140                         if (body == NULL) {
2141                                 CERROR("Missing/short ost_body\n");
2142                                 RETURN(-EFAULT);
2143                         }
2144
2145                         if (!(body->oa.o_valid & OBD_MD_FLFLAGS) ||
2146                             !(body->oa.o_flags & OBD_FL_SRVLOCK))
2147                                 req->rq_ops = &ost_hpreq_punch;
2148                 }
2149         }
2150         RETURN(0);
2151 }
2152
2153 /* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
2154 int ost_handle(struct ptlrpc_request *req)
2155 {
2156         struct obd_trans_info trans_info = { 0, };
2157         struct obd_trans_info *oti = &trans_info;
2158         int should_process, fail = OBD_FAIL_OST_ALL_REPLY_NET, rc = 0;
2159         struct obd_device *obd = NULL;
2160         ENTRY;
2161
2162         /* OST module is kept between remounts, but the last reference
2163          * to specific module (say, osd or ofd) kills all related keys
2164          * from the environment. so we have to refill it until the root
2165          * cause is fixed properly */
2166         lu_env_refill(req->rq_svc_thread->t_env);
2167
2168         LASSERT(current->journal_info == NULL);
2169
2170         /* primordial rpcs don't affect server recovery */
2171         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
2172         case SEC_CTX_INIT:
2173         case SEC_CTX_INIT_CONT:
2174         case SEC_CTX_FINI:
2175                 GOTO(out, rc = 0);
2176         }
2177
2178         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
2179
2180         if (lustre_msg_get_opc(req->rq_reqmsg) != OST_CONNECT) {
2181                 if (!class_connected_export(req->rq_export)) {
2182                         CDEBUG(D_HA,"operation %d on unconnected OST from %s\n",
2183                                lustre_msg_get_opc(req->rq_reqmsg),
2184                                libcfs_id2str(req->rq_peer));
2185                         req->rq_status = -ENOTCONN;
2186                         GOTO(out, rc = -ENOTCONN);
2187                 }
2188
2189                 obd = req->rq_export->exp_obd;
2190
2191                 /* Check for aborted recovery. */
2192                 if (obd->obd_recovering) {
2193                         rc = ost_filter_recovery_request(req, obd,
2194                                                          &should_process);
2195                         if (rc || !should_process)
2196                                 RETURN(rc);
2197                         else if (should_process < 0) {
2198                                 req->rq_status = should_process;
2199                                 rc = ptlrpc_error(req);
2200                                 RETURN(rc);
2201                         }
2202                 }
2203         }
2204
2205         oti_init(oti, req);
2206
2207         rc = ost_msg_check_version(req->rq_reqmsg);
2208         if (rc)
2209                 RETURN(rc);
2210
2211         if (req && req->rq_reqmsg && req->rq_export &&
2212             (exp_connect_flags(req->rq_export) & OBD_CONNECT_JOBSTATS))
2213                 oti->oti_jobid = lustre_msg_get_jobid(req->rq_reqmsg);
2214
2215         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
2216         case OST_CONNECT: {
2217                 CDEBUG(D_INODE, "connect\n");
2218                 req_capsule_set(&req->rq_pill, &RQF_OST_CONNECT);
2219                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_CONNECT_NET))
2220                         RETURN(0);
2221                 rc = target_handle_connect(req);
2222                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_CONNECT_NET2))
2223                         RETURN(0);
2224                 if (!rc) {
2225                         rc = ost_init_sec_level(req);
2226                         if (!rc)
2227                                 rc = ost_connect_check_sptlrpc(req);
2228                 }
2229                 break;
2230         }
2231         case OST_DISCONNECT:
2232                 CDEBUG(D_INODE, "disconnect\n");
2233                 req_capsule_set(&req->rq_pill, &RQF_OST_DISCONNECT);
2234                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_DISCONNECT_NET))
2235                         RETURN(0);
2236                 rc = target_handle_disconnect(req);
2237                 break;
2238         case OST_CREATE:
2239                 CDEBUG(D_INODE, "create\n");
2240                 req_capsule_set(&req->rq_pill, &RQF_OST_CREATE);
2241                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_CREATE_NET))
2242                         RETURN(0);
2243                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
2244                         GOTO(out, rc = -EROFS);
2245                 rc = ost_create(req->rq_export, req, oti);
2246                 break;
2247         case OST_DESTROY:
2248                 CDEBUG(D_INODE, "destroy\n");
2249                 req_capsule_set(&req->rq_pill, &RQF_OST_DESTROY);
2250                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_DESTROY_NET))
2251                         RETURN(0);
2252                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
2253                         GOTO(out, rc = -EROFS);
2254                 rc = ost_destroy(req->rq_export, req, oti);
2255                 break;
2256         case OST_GETATTR:
2257                 CDEBUG(D_INODE, "getattr\n");
2258                 req_capsule_set(&req->rq_pill, &RQF_OST_GETATTR);
2259                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_GETATTR_NET))
2260                         RETURN(0);
2261                 rc = ost_getattr(req->rq_export, req);
2262                 break;
2263         case OST_SETATTR:
2264                 CDEBUG(D_INODE, "setattr\n");
2265                 req_capsule_set(&req->rq_pill, &RQF_OST_SETATTR);
2266                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_SETATTR_NET))
2267                         RETURN(0);
2268                 rc = ost_setattr(req->rq_export, req, oti);
2269                 break;
2270         case OST_WRITE:
2271                 req_capsule_set(&req->rq_pill, &RQF_OST_BRW_WRITE);
2272                 CDEBUG(D_INODE, "write\n");
2273                 /* req->rq_request_portal would be nice, if it was set */
2274                 if (ptlrpc_req2svc(req)->srv_req_portal != OST_IO_PORTAL) {
2275                         CERROR("%s: deny write request from %s to portal %u\n",
2276                                req->rq_export->exp_obd->obd_name,
2277                                obd_export_nid2str(req->rq_export),
2278                                ptlrpc_req2svc(req)->srv_req_portal);
2279                         GOTO(out, rc = -EPROTO);
2280                 }
2281                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_NET))
2282                         RETURN(0);
2283                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOSPC))
2284                         GOTO(out, rc = -ENOSPC);
2285                 if (OBD_FAIL_TIMEOUT(OBD_FAIL_OST_EROFS, 1))
2286                         GOTO(out, rc = -EROFS);
2287                 rc = ost_brw_write(req, oti);
2288                 LASSERT(current->journal_info == NULL);
2289                 /* ost_brw_write sends its own replies */
2290                 RETURN(rc);
2291         case OST_READ:
2292                 req_capsule_set(&req->rq_pill, &RQF_OST_BRW_READ);
2293                 CDEBUG(D_INODE, "read\n");
2294                 /* req->rq_request_portal would be nice, if it was set */
2295                 if (ptlrpc_req2svc(req)->srv_req_portal != OST_IO_PORTAL) {
2296                         CERROR("%s: deny read request from %s to portal %u\n",
2297                                req->rq_export->exp_obd->obd_name,
2298                                obd_export_nid2str(req->rq_export),
2299                                ptlrpc_req2svc(req)->srv_req_portal);
2300                         GOTO(out, rc = -EPROTO);
2301                 }
2302                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_NET))
2303                         RETURN(0);
2304                 rc = ost_brw_read(req, oti);
2305                 LASSERT(current->journal_info == NULL);
2306                 /* ost_brw_read sends its own replies */
2307                 RETURN(rc);
2308         case OST_PUNCH:
2309                 CDEBUG(D_INODE, "punch\n");
2310                 req_capsule_set(&req->rq_pill, &RQF_OST_PUNCH);
2311                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_PUNCH_NET))
2312                         RETURN(0);
2313                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
2314                         GOTO(out, rc = -EROFS);
2315                 rc = ost_punch(req->rq_export, req, oti);
2316                 break;
2317         case OST_STATFS:
2318                 CDEBUG(D_INODE, "statfs\n");
2319                 req_capsule_set(&req->rq_pill, &RQF_OST_STATFS);
2320                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_STATFS_NET))
2321                         RETURN(0);
2322                 rc = ost_statfs(req);
2323                 break;
2324         case OST_SYNC:
2325                 CDEBUG(D_INODE, "sync\n");
2326                 req_capsule_set(&req->rq_pill, &RQF_OST_SYNC);
2327                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_SYNC_NET))
2328                         RETURN(0);
2329                 rc = ost_sync(req->rq_export, req, oti);
2330                 break;
2331         case OST_SET_INFO:
2332                 DEBUG_REQ(D_INODE, req, "set_info");
2333                 req_capsule_set(&req->rq_pill, &RQF_OBD_SET_INFO);
2334                 rc = ost_set_info(req->rq_export, req);
2335                 break;
2336         case OST_GET_INFO:
2337                 DEBUG_REQ(D_INODE, req, "get_info");
2338                 req_capsule_set(&req->rq_pill, &RQF_OST_GET_INFO_GENERIC);
2339                 rc = ost_get_info(req->rq_export, req);
2340                 break;
2341         case SEQ_QUERY:
2342                 CDEBUG(D_INODE, "seq\n");
2343                 rc = seq_handle(req);
2344                 break;
2345         case OST_QUOTACHECK:
2346                 CDEBUG(D_INODE, "quotacheck\n");
2347                 req_capsule_set(&req->rq_pill, &RQF_OST_QUOTACHECK);
2348                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_QUOTACHECK_NET))
2349                         RETURN(0);
2350                 rc = ost_handle_quotacheck(req);
2351                 break;
2352         case OST_QUOTACTL:
2353                 CDEBUG(D_INODE, "quotactl\n");
2354                 req_capsule_set(&req->rq_pill, &RQF_OST_QUOTACTL);
2355                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_QUOTACTL_NET))
2356                         RETURN(0);
2357                 rc = ost_handle_quotactl(req);
2358                 break;
2359         case OBD_PING:
2360                 DEBUG_REQ(D_INODE, req, "ping");
2361                 req_capsule_set(&req->rq_pill, &RQF_OBD_PING);
2362                 rc = target_handle_ping(req);
2363                 break;
2364         /* FIXME - just reply status */
2365         case LLOG_ORIGIN_CONNECT:
2366                 DEBUG_REQ(D_INODE, req, "log connect");
2367                 req_capsule_set(&req->rq_pill, &RQF_LLOG_ORIGIN_CONNECT);
2368                 rc = ost_llog_handle_connect(req->rq_export, req);
2369                 req->rq_status = rc;
2370                 rc = req_capsule_server_pack(&req->rq_pill);
2371                 if (rc)
2372                         RETURN(rc);
2373                 RETURN(ptlrpc_reply(req));
2374         case OBD_LOG_CANCEL:
2375                 CDEBUG(D_INODE, "log cancel\n");
2376                 req_capsule_set(&req->rq_pill, &RQF_LOG_CANCEL);
2377                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_NET))
2378                         RETURN(0);
2379                 rc = llog_origin_handle_cancel(req);
2380                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_REP))
2381                         RETURN(0);
2382                 req->rq_status = rc;
2383                 rc = req_capsule_server_pack(&req->rq_pill);
2384                 if (rc)
2385                         RETURN(rc);
2386                 RETURN(ptlrpc_reply(req));
2387         case LDLM_ENQUEUE:
2388                 CDEBUG(D_INODE, "enqueue\n");
2389                 req_capsule_set(&req->rq_pill, &RQF_LDLM_ENQUEUE);
2390                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_NET))
2391                         RETURN(0);
2392                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
2393                                          ost_blocking_ast,
2394                                          ldlm_server_glimpse_ast);
2395                 fail = OBD_FAIL_OST_LDLM_REPLY_NET;
2396                 break;
2397         case LDLM_CONVERT:
2398                 CDEBUG(D_INODE, "convert\n");
2399                 req_capsule_set(&req->rq_pill, &RQF_LDLM_CONVERT);
2400                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CONVERT_NET))
2401                         RETURN(0);
2402                 rc = ldlm_handle_convert(req);
2403                 break;
2404         case LDLM_CANCEL:
2405                 CDEBUG(D_INODE, "cancel\n");
2406                 req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL);
2407                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_NET))
2408                         RETURN(0);
2409                 rc = ldlm_handle_cancel(req);
2410                 break;
2411         case LDLM_BL_CALLBACK:
2412         case LDLM_CP_CALLBACK:
2413                 CDEBUG(D_INODE, "callback\n");
2414                 CERROR("callbacks should not happen on OST\n");
2415                 /* fall through */
2416         default:
2417                 CERROR("Unexpected opcode %d\n",
2418                        lustre_msg_get_opc(req->rq_reqmsg));
2419                 req->rq_status = -ENOTSUPP;
2420                 rc = ptlrpc_error(req);
2421                 RETURN(rc);
2422         }
2423
2424         LASSERT(current->journal_info == NULL);
2425
2426         EXIT;
2427         /* If we're DISCONNECTing, the export_data is already freed */
2428         if (!rc && lustre_msg_get_opc(req->rq_reqmsg) != OST_DISCONNECT)
2429                 target_committed_to_req(req);
2430
2431 out:
2432         if (!rc)
2433                 oti_to_request(oti, req);
2434
2435         target_send_reply(req, rc, fail);
2436         return 0;
2437 }
2438 EXPORT_SYMBOL(ost_handle);
2439
2440 /*
2441  * free per-thread pool created by ost_io_thread_init().
2442  */
2443 static void ost_io_thread_done(struct ptlrpc_thread *thread)
2444 {
2445         struct ost_thread_local_cache *tls; /* TLS stands for Thread-Local
2446                                              * Storage */
2447
2448         ENTRY;
2449
2450         LASSERT(thread != NULL);
2451
2452         /*
2453          * be prepared to handle partially-initialized pools (because this is
2454          * called from ost_io_thread_init() for cleanup.
2455          */
2456         tls = thread->t_data;
2457         if (tls != NULL) {
2458                 OBD_FREE_PTR(tls);
2459                 thread->t_data = NULL;
2460         }
2461         EXIT;
2462 }
2463
2464 /*
2465  * initialize per-thread page pool (bug 5137).
2466  */
2467 static int ost_io_thread_init(struct ptlrpc_thread *thread)
2468 {
2469         struct ost_thread_local_cache *tls;
2470
2471         ENTRY;
2472
2473         LASSERT(thread != NULL);
2474         LASSERT(thread->t_data == NULL);
2475
2476         OBD_ALLOC_PTR(tls);
2477         if (tls == NULL)
2478                 RETURN(-ENOMEM);
2479         thread->t_data = tls;
2480         RETURN(0);
2481 }
2482
2483 #define OST_WATCHDOG_TIMEOUT (obd_timeout * 1000)
2484
2485 static struct cfs_cpt_table     *ost_io_cptable;
2486
2487 /* Sigh - really, this is an OSS, the _server_, not the _target_ */
2488 static int ost_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
2489 {
2490         static struct ptlrpc_service_conf       svc_conf;
2491         struct ost_obd *ost = &obd->u.ost;
2492         struct lprocfs_static_vars lvars;
2493         nodemask_t              *mask;
2494         int rc;
2495         ENTRY;
2496
2497         rc = cfs_cleanup_group_info();
2498         if (rc)
2499                 RETURN(rc);
2500
2501         lprocfs_ost_init_vars(&lvars);
2502         lprocfs_obd_setup(obd, lvars.obd_vars);
2503
2504         mutex_init(&ost->ost_health_mutex);
2505
2506         svc_conf = (typeof(svc_conf)) {
2507                 .psc_name               = LUSTRE_OSS_NAME,
2508                 .psc_watchdog_factor    = OSS_SERVICE_WATCHDOG_FACTOR,
2509                 .psc_buf                = {
2510                         .bc_nbufs               = OST_NBUFS,
2511                         .bc_buf_size            = OST_BUFSIZE,
2512                         .bc_req_max_size        = OST_MAXREQSIZE,
2513                         .bc_rep_max_size        = OST_MAXREPSIZE,
2514                         .bc_req_portal          = OST_REQUEST_PORTAL,
2515                         .bc_rep_portal          = OSC_REPLY_PORTAL,
2516                 },
2517                 .psc_thr                = {
2518                         .tc_thr_name            = "ll_ost",
2519                         .tc_thr_factor          = OSS_THR_FACTOR,
2520                         .tc_nthrs_init          = OSS_NTHRS_INIT,
2521                         .tc_nthrs_base          = OSS_NTHRS_BASE,
2522                         .tc_nthrs_max           = OSS_NTHRS_MAX,
2523                         .tc_nthrs_user          = oss_num_threads,
2524                         .tc_cpu_affinity        = 1,
2525                         .tc_ctx_tags            = LCT_DT_THREAD,
2526                 },
2527                 .psc_cpt                = {
2528                         .cc_pattern             = oss_cpts,
2529                 },
2530                 .psc_ops                = {
2531                         .so_req_handler         = ost_handle,
2532                         .so_req_printer         = target_print_req,
2533                         .so_hpreq_handler       = ptlrpc_hpreq_handler,
2534                 },
2535         };
2536         ost->ost_service = ptlrpc_register_service(&svc_conf,
2537                                                    obd->obd_proc_entry);
2538         if (IS_ERR(ost->ost_service)) {
2539                 rc = PTR_ERR(ost->ost_service);
2540                 CERROR("failed to start service: %d\n", rc);
2541                 GOTO(out_lprocfs, rc);
2542         }
2543
2544         memset(&svc_conf, 0, sizeof(svc_conf));
2545         svc_conf = (typeof(svc_conf)) {
2546                 .psc_name               = "ost_create",
2547                 .psc_watchdog_factor    = OSS_SERVICE_WATCHDOG_FACTOR,
2548                 .psc_buf                = {
2549                         .bc_nbufs               = OST_NBUFS,
2550                         .bc_buf_size            = OST_BUFSIZE,
2551                         .bc_req_max_size        = OST_MAXREQSIZE,
2552                         .bc_rep_max_size        = OST_MAXREPSIZE,
2553                         .bc_req_portal          = OST_CREATE_PORTAL,
2554                         .bc_rep_portal          = OSC_REPLY_PORTAL,
2555                 },
2556                 .psc_thr                = {
2557                         .tc_thr_name            = "ll_ost_create",
2558                         .tc_thr_factor          = OSS_CR_THR_FACTOR,
2559                         .tc_nthrs_init          = OSS_CR_NTHRS_INIT,
2560                         .tc_nthrs_base          = OSS_CR_NTHRS_BASE,
2561                         .tc_nthrs_max           = OSS_CR_NTHRS_MAX,
2562                         .tc_nthrs_user          = oss_num_create_threads,
2563                         .tc_cpu_affinity        = 1,
2564                         .tc_ctx_tags            = LCT_DT_THREAD,
2565                 },
2566                 .psc_cpt                = {
2567                         .cc_pattern             = oss_cpts,
2568                 },
2569                 .psc_ops                = {
2570                         .so_req_handler         = ost_handle,
2571                         .so_req_printer         = target_print_req,
2572                 },
2573         };
2574         ost->ost_create_service = ptlrpc_register_service(&svc_conf,
2575                                                           obd->obd_proc_entry);
2576         if (IS_ERR(ost->ost_create_service)) {
2577                 rc = PTR_ERR(ost->ost_create_service);
2578                 CERROR("failed to start OST create service: %d\n", rc);
2579                 GOTO(out_service, rc);
2580         }
2581
2582         mask = cfs_cpt_table->ctb_nodemask;
2583         /* event CPT feature is disabled in libcfs level by set partition
2584          * number to 1, we still want to set node affinity for io service */
2585         if (cfs_cpt_number(cfs_cpt_table) == 1 && nodes_weight(*mask) > 1) {
2586                 int     cpt = 0;
2587                 int     i;
2588
2589                 ost_io_cptable = cfs_cpt_table_alloc(nodes_weight(*mask));
2590                 for_each_node_mask(i, *mask) {
2591                         if (ost_io_cptable == NULL) {
2592                                 CWARN("OSS failed to create CPT table\n");
2593                                 break;
2594                         }
2595
2596                         rc = cfs_cpt_set_node(ost_io_cptable, cpt++, i);
2597                         if (!rc) {
2598                                 CWARN("OSS Failed to set node %d for"
2599                                       "IO CPT table\n", i);
2600                                 cfs_cpt_table_free(ost_io_cptable);
2601                                 ost_io_cptable = NULL;
2602                                 break;
2603                         }
2604                 }
2605         }
2606
2607         memset(&svc_conf, 0, sizeof(svc_conf));
2608         svc_conf = (typeof(svc_conf)) {
2609                 .psc_name               = "ost_io",
2610                 .psc_watchdog_factor    = OSS_SERVICE_WATCHDOG_FACTOR,
2611                 .psc_buf                = {
2612                         .bc_nbufs               = OST_NBUFS,
2613                         .bc_buf_size            = OST_IO_BUFSIZE,
2614                         .bc_req_max_size        = OST_IO_MAXREQSIZE,
2615                         .bc_rep_max_size        = OST_IO_MAXREPSIZE,
2616                         .bc_req_portal          = OST_IO_PORTAL,
2617                         .bc_rep_portal          = OSC_REPLY_PORTAL,
2618                 },
2619                 .psc_thr                = {
2620                         .tc_thr_name            = "ll_ost_io",
2621                         .tc_thr_factor          = OSS_THR_FACTOR,
2622                         .tc_nthrs_init          = OSS_NTHRS_INIT,
2623                         .tc_nthrs_base          = OSS_NTHRS_BASE,
2624                         .tc_nthrs_max           = OSS_NTHRS_MAX,
2625                         .tc_nthrs_user          = oss_num_threads,
2626                         .tc_cpu_affinity        = 1,
2627                         .tc_ctx_tags            = LCT_DT_THREAD,
2628                 },
2629                 .psc_cpt                = {
2630                         .cc_cptable             = ost_io_cptable,
2631                         .cc_pattern             = ost_io_cptable == NULL ?
2632                                                   oss_io_cpts : NULL,
2633                 },
2634                 .psc_ops                = {
2635                         .so_thr_init            = ost_io_thread_init,
2636                         .so_thr_done            = ost_io_thread_done,
2637                         .so_req_handler         = ost_handle,
2638                         .so_hpreq_handler       = ost_io_hpreq_handler,
2639                         .so_req_printer         = target_print_req,
2640                 },
2641         };
2642         ost->ost_io_service = ptlrpc_register_service(&svc_conf,
2643                                                       obd->obd_proc_entry);
2644         if (IS_ERR(ost->ost_io_service)) {
2645                 rc = PTR_ERR(ost->ost_io_service);
2646                 CERROR("failed to start OST I/O service: %d\n", rc);
2647                 ost->ost_io_service = NULL;
2648                 GOTO(out_create, rc);
2649         }
2650
2651         memset(&svc_conf, 0, sizeof(svc_conf));
2652         svc_conf = (typeof(svc_conf)) {
2653                 .psc_name               = "ost_seq",
2654                 .psc_watchdog_factor    = OSS_SERVICE_WATCHDOG_FACTOR,
2655                 .psc_buf                = {
2656                         .bc_nbufs               = OST_NBUFS,
2657                         .bc_buf_size            = OST_BUFSIZE,
2658                         .bc_req_max_size        = OST_MAXREQSIZE,
2659                         .bc_rep_max_size        = OST_MAXREPSIZE,
2660                         .bc_req_portal          = SEQ_DATA_PORTAL,
2661                         .bc_rep_portal          = OSC_REPLY_PORTAL,
2662                 },
2663                 .psc_thr                = {
2664                         .tc_thr_name            = "ll_ost_seq",
2665                         .tc_thr_factor          = OSS_CR_THR_FACTOR,
2666                         .tc_nthrs_init          = OSS_CR_NTHRS_INIT,
2667                         .tc_nthrs_base          = OSS_CR_NTHRS_BASE,
2668                         .tc_nthrs_max           = OSS_CR_NTHRS_MAX,
2669                         .tc_nthrs_user          = oss_num_create_threads,
2670                         .tc_cpu_affinity        = 1,
2671                         .tc_ctx_tags            = LCT_DT_THREAD,
2672                 },
2673
2674                 .psc_cpt                = {
2675                         .cc_pattern          = oss_cpts,
2676                 },
2677                 .psc_ops                = {
2678                         .so_req_handler         = ost_handle,
2679                         .so_req_printer         = target_print_req,
2680                         .so_hpreq_handler       = NULL,
2681                 },
2682         };
2683         ost->ost_seq_service = ptlrpc_register_service(&svc_conf,
2684                                                       obd->obd_proc_entry);
2685         if (IS_ERR(ost->ost_seq_service)) {
2686                 rc = PTR_ERR(ost->ost_seq_service);
2687                 CERROR("failed to start OST seq service: %d\n", rc);
2688                 ost->ost_seq_service = NULL;
2689                 GOTO(out_io, rc);
2690         }
2691
2692         ping_evictor_start();
2693
2694         RETURN(0);
2695 out_io:
2696         ptlrpc_unregister_service(ost->ost_io_service);
2697         ost->ost_io_service = NULL;
2698 out_create:
2699         ptlrpc_unregister_service(ost->ost_create_service);
2700         ost->ost_create_service = NULL;
2701 out_service:
2702         ptlrpc_unregister_service(ost->ost_service);
2703         ost->ost_service = NULL;
2704 out_lprocfs:
2705         lprocfs_obd_cleanup(obd);
2706         RETURN(rc);
2707 }
2708
2709 static int ost_cleanup(struct obd_device *obd)
2710 {
2711         struct ost_obd *ost = &obd->u.ost;
2712         int err = 0;
2713         ENTRY;
2714
2715         ping_evictor_stop();
2716
2717         /* there is no recovery for OST OBD, all recovery is controlled by
2718          * obdfilter OBD */
2719         LASSERT(obd->obd_recovering == 0);
2720         mutex_lock(&ost->ost_health_mutex);
2721         ptlrpc_unregister_service(ost->ost_service);
2722         ptlrpc_unregister_service(ost->ost_create_service);
2723         ptlrpc_unregister_service(ost->ost_io_service);
2724         ptlrpc_unregister_service(ost->ost_seq_service);
2725         ost->ost_service = NULL;
2726         ost->ost_create_service = NULL;
2727         ost->ost_io_service = NULL;
2728         ost->ost_seq_service = NULL;
2729
2730         mutex_unlock(&ost->ost_health_mutex);
2731
2732         lprocfs_obd_cleanup(obd);
2733
2734         if (ost_io_cptable != NULL) {
2735                 cfs_cpt_table_free(ost_io_cptable);
2736                 ost_io_cptable = NULL;
2737         }
2738
2739         RETURN(err);
2740 }
2741
2742 static int ost_health_check(const struct lu_env *env, struct obd_device *obd)
2743 {
2744         struct ost_obd *ost = &obd->u.ost;
2745         int rc = 0;
2746
2747         mutex_lock(&ost->ost_health_mutex);
2748         rc |= ptlrpc_service_health_check(ost->ost_service);
2749         rc |= ptlrpc_service_health_check(ost->ost_create_service);
2750         rc |= ptlrpc_service_health_check(ost->ost_io_service);
2751         mutex_unlock(&ost->ost_health_mutex);
2752
2753         /*
2754          * health_check to return 0 on healthy
2755          * and 1 on unhealthy.
2756          */
2757         if( rc != 0)
2758                 rc = 1;
2759
2760         return rc;
2761 }
2762
2763 struct ost_thread_local_cache *ost_tls(struct ptlrpc_request *r)
2764 {
2765         return (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
2766 }
2767
2768 /* use obd ops to offer management infrastructure */
2769 static struct obd_ops ost_obd_ops = {
2770         .o_owner        = THIS_MODULE,
2771         .o_setup        = ost_setup,
2772         .o_cleanup      = ost_cleanup,
2773         .o_health_check = ost_health_check,
2774 };
2775
2776
2777 static int __init ost_init(void)
2778 {
2779         struct lprocfs_static_vars lvars;
2780         int rc;
2781         ENTRY;
2782
2783         ost_page_to_corrupt = cfs_alloc_page(CFS_ALLOC_STD);
2784
2785         lprocfs_ost_init_vars(&lvars);
2786         rc = class_register_type(&ost_obd_ops, NULL, lvars.module_vars,
2787                                  LUSTRE_OSS_NAME, NULL);
2788
2789         if (ost_num_threads != 0 && oss_num_threads == 0) {
2790                 LCONSOLE_INFO("ost_num_threads module parameter is deprecated, "
2791                               "use oss_num_threads instead or unset both for "
2792                               "dynamic thread startup\n");
2793                 oss_num_threads = ost_num_threads;
2794         }
2795
2796         RETURN(rc);
2797 }
2798
2799 static void /*__exit*/ ost_exit(void)
2800 {
2801         if (ost_page_to_corrupt)
2802                 page_cache_release(ost_page_to_corrupt);
2803
2804         class_unregister_type(LUSTRE_OSS_NAME);
2805 }
2806
2807 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2808 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
2809 MODULE_LICENSE("GPL");
2810
2811 module_init(ost_init);
2812 module_exit(ost_exit);