Whamcloud - gitweb
0452257ed134fab43c4c025528973e5259242a19
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2001, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  *
32  * Copyright (c) 2011 Whamcloud, Inc.
33  *
34  */
35 /*
36  * This file is part of Lustre, http://www.lustre.org/
37  * Lustre is a trademark of Sun Microsystems, Inc.
38  *
39  * lustre/ost/ost_handler.c
40  *
41  * Author: Peter J. Braam <braam@clusterfs.com>
42  * Author: Phil Schwan <phil@clusterfs.com>
43  */
44
45 #ifndef EXPORT_SYMTAB
46 # define EXPORT_SYMTAB
47 #endif
48 #define DEBUG_SUBSYSTEM S_OST
49
50 #include <linux/module.h>
51 #include <obd_cksum.h>
52 #include <obd_ost.h>
53 #include <lustre_net.h>
54 #include <lustre_dlm.h>
55 #include <lustre_export.h>
56 #include <lustre_debug.h>
57 #include <linux/init.h>
58 #include <lprocfs_status.h>
59 #include <libcfs/list.h>
60 #include <lustre_quota.h>
61 #include "ost_internal.h"
62
63 static int oss_num_threads;
64 CFS_MODULE_PARM(oss_num_threads, "i", int, 0444,
65                 "number of OSS service threads to start");
66
67 static int ost_num_threads;
68 CFS_MODULE_PARM(ost_num_threads, "i", int, 0444,
69                 "number of OST service threads to start (deprecated)");
70
71 static int oss_num_create_threads;
72 CFS_MODULE_PARM(oss_num_create_threads, "i", int, 0444,
73                 "number of OSS create threads to start");
74
75 /**
76  * Do not return server-side uid/gid to remote client
77  */
78 static void ost_drop_id(struct obd_export *exp, struct obdo *oa)
79 {
80         if (exp_connect_rmtclient(exp)) {
81                 oa->o_uid = -1;
82                 oa->o_gid = -1;
83                 oa->o_valid &= ~(OBD_MD_FLUID | OBD_MD_FLGID);
84         }
85 }
86
87 /**
88  * Validate oa from client.
89  * If the request comes from 2.0 clients, currently only RSVD seq and IDIF
90  * req are valid.
91  *    a. for single MDS  seq = FID_SEQ_OST_MDT0,
92  *    b. for CMD, seq = FID_SEQ_OST_MDT0, FID_SEQ_OST_MDT1 - FID_SEQ_OST_MAX
93  */
94 static int ost_validate_obdo(struct obd_export *exp, struct obdo *oa,
95                              struct obd_ioobj *ioobj)
96 {
97         if (oa != NULL && !(oa->o_valid & OBD_MD_FLGROUP)) {
98                 oa->o_seq = FID_SEQ_OST_MDT0;
99                 if (ioobj)
100                         ioobj->ioo_seq = FID_SEQ_OST_MDT0;
101         /* remove fid_seq_is_rsvd() after FID-on-OST allows SEQ > 9 */
102         } else if (oa == NULL ||
103                    !(fid_seq_is_rsvd(oa->o_seq) || fid_seq_is_idif(oa->o_seq))) {
104                 CERROR("%s: client %s sent invalid object "POSTID"\n",
105                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
106                        oa ? oa->o_id : -1, oa ? oa->o_seq : -1);
107                 return -EPROTO;
108         }
109         obdo_from_ostid(oa, &oa->o_oi);
110         if (ioobj)
111                 ioobj_from_obdo(ioobj, oa);
112         return 0;
113 }
114
115 void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
116 {
117         struct oti_req_ack_lock *ack_lock;
118         int i;
119
120         if (oti == NULL)
121                 return;
122
123         if (req->rq_repmsg) {
124                 __u64 versions[PTLRPC_NUM_VERSIONS] = { 0 };
125                 lustre_msg_set_transno(req->rq_repmsg, oti->oti_transno);
126                 versions[0] = oti->oti_pre_version;
127                 lustre_msg_set_versions(req->rq_repmsg, versions);
128         }
129         req->rq_transno = oti->oti_transno;
130
131         /* XXX 4 == entries in oti_ack_locks??? */
132         for (ack_lock = oti->oti_ack_locks, i = 0; i < 4; i++, ack_lock++) {
133                 if (!ack_lock->mode)
134                         break;
135                 /* XXX not even calling target_send_reply in some cases... */
136                 ptlrpc_save_lock (req, &ack_lock->lock, ack_lock->mode, 0);
137         }
138 }
139
140 static int ost_destroy(struct obd_export *exp, struct ptlrpc_request *req,
141                        struct obd_trans_info *oti)
142 {
143         struct ost_body *body, *repbody;
144         struct lustre_capa *capa = NULL;
145         int rc;
146         ENTRY;
147
148         /* Get the request body */
149         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
150         if (body == NULL)
151                 RETURN(-EFAULT);
152
153         if (body->oa.o_id == 0)
154                 RETURN(-EPROTO);
155
156         rc = ost_validate_obdo(exp, &body->oa, NULL);
157         if (rc)
158                 RETURN(rc);
159
160         /* If there's a DLM request, cancel the locks mentioned in it*/
161         if (req_capsule_field_present(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT)) {
162                 struct ldlm_request *dlm;
163
164                 dlm = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
165                 if (dlm == NULL)
166                         RETURN (-EFAULT);
167                 ldlm_request_cancel(req, dlm, 0);
168         }
169
170         /* If there's a capability, get it */
171         if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
172                 capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
173                 if (capa == NULL) {
174                         CERROR("Missing capability for OST DESTROY");
175                         RETURN (-EFAULT);
176                 }
177         }
178
179         /* Prepare the reply */
180         rc = req_capsule_server_pack(&req->rq_pill);
181         if (rc)
182                 RETURN(rc);
183
184         /* Get the log cancellation cookie */
185         if (body->oa.o_valid & OBD_MD_FLCOOKIE)
186                 oti->oti_logcookies = &body->oa.o_lcookie;
187
188         /* Finish the reply */
189         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
190         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
191
192         /* Do the destroy and set the reply status accordingly  */
193         req->rq_status = obd_destroy(exp, &body->oa, NULL, oti, NULL, capa);
194         RETURN(0);
195 }
196
197 /**
198  * Helper function for getting server side [start, start+count] DLM lock
199  * if asked by client.
200  */
201 static int ost_lock_get(struct obd_export *exp, struct obdo *oa,
202                         __u64 start, __u64 count, struct lustre_handle *lh,
203                         int mode, int flags)
204 {
205         struct ldlm_res_id res_id;
206         ldlm_policy_data_t policy;
207         __u64 end = start + count;
208
209         ENTRY;
210
211         LASSERT(!lustre_handle_is_used(lh));
212         /* o_id and o_gr are used for localizing resource, if client miss to set
213          * them, do not trigger ASSERTION. */
214         if (unlikely((oa->o_valid & (OBD_MD_FLID | OBD_MD_FLGROUP)) !=
215                      (OBD_MD_FLID | OBD_MD_FLGROUP)))
216                 RETURN(-EPROTO);
217
218         if (!(oa->o_valid & OBD_MD_FLFLAGS) ||
219             !(oa->o_flags & OBD_FL_SRVLOCK))
220                 RETURN(0);
221
222         osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
223         CDEBUG(D_INODE, "OST-side extent lock.\n");
224
225         policy.l_extent.start = start & CFS_PAGE_MASK;
226
227         /* If ->o_blocks is EOF it means "lock till the end of the
228          * file". Otherwise, it's size of a hole being punched (in bytes) */
229         if (count == OBD_OBJECT_EOF || end < start)
230                 policy.l_extent.end = OBD_OBJECT_EOF;
231         else
232                 policy.l_extent.end = end | ~CFS_PAGE_MASK;
233
234         RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
235                                       LDLM_EXTENT, &policy, mode, &flags,
236                                       ldlm_blocking_ast, ldlm_completion_ast,
237                                       ldlm_glimpse_ast, NULL, 0, NULL, lh));
238 }
239
240 /* Helper function: release lock, if any. */
241 static void ost_lock_put(struct obd_export *exp,
242                          struct lustre_handle *lh, int mode)
243 {
244         ENTRY;
245         if (lustre_handle_is_used(lh))
246                 ldlm_lock_decref(lh, mode);
247         EXIT;
248 }
249
250 static int ost_getattr(struct obd_export *exp, struct ptlrpc_request *req)
251 {
252         struct ost_body *body, *repbody;
253         struct obd_info *oinfo;
254         struct lustre_handle lh = { 0 };
255         struct lustre_capa *capa = NULL;
256         int rc;
257         ENTRY;
258
259         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
260         if (body == NULL)
261                 RETURN(-EFAULT);
262
263         rc = ost_validate_obdo(exp, &body->oa, NULL);
264         if (rc)
265                 RETURN(rc);
266
267         rc = req_capsule_server_pack(&req->rq_pill);
268         if (rc)
269                 RETURN(rc);
270
271         rc = ost_lock_get(exp, &body->oa, 0, OBD_OBJECT_EOF, &lh, LCK_PR, 0);
272         if (rc)
273                 RETURN(rc);
274
275         if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
276                 capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
277                 if (capa == NULL) {
278                         CERROR("Missing capability for OST GETATTR");
279                         GOTO(unlock, rc = -EFAULT);
280                 }
281         }
282
283         OBD_ALLOC_PTR(oinfo);
284         if (!oinfo)
285                 GOTO(unlock, rc = -ENOMEM);
286         oinfo->oi_oa = &body->oa;
287         oinfo->oi_capa = capa;
288
289         req->rq_status = obd_getattr(exp, oinfo);
290
291         OBD_FREE_PTR(oinfo);
292
293         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
294         repbody->oa = body->oa;
295         ost_drop_id(exp, &repbody->oa);
296
297 unlock:
298         ost_lock_put(exp, &lh, LCK_PR);
299
300         RETURN(0);
301 }
302
303 static int ost_statfs(struct ptlrpc_request *req)
304 {
305         struct obd_statfs *osfs;
306         int rc;
307         ENTRY;
308
309         rc = req_capsule_server_pack(&req->rq_pill);
310         if (rc)
311                 RETURN(rc);
312
313         osfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
314
315         req->rq_status = obd_statfs(req->rq_export->exp_obd, osfs,
316                                     cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS),
317                                     0);
318         if (req->rq_status != 0)
319                 CERROR("ost: statfs failed: rc %d\n", req->rq_status);
320
321         RETURN(0);
322 }
323
324 static int ost_create(struct obd_export *exp, struct ptlrpc_request *req,
325                       struct obd_trans_info *oti)
326 {
327         struct ost_body *body, *repbody;
328         int rc;
329         ENTRY;
330
331         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
332         if (body == NULL)
333                 RETURN(-EFAULT);
334
335         rc = ost_validate_obdo(req->rq_export, &body->oa, NULL);
336         if (rc)
337                 RETURN(rc);
338
339         rc = req_capsule_server_pack(&req->rq_pill);
340         if (rc)
341                 RETURN(rc);
342
343         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
344         repbody->oa = body->oa;
345         oti->oti_logcookies = &body->oa.o_lcookie;
346
347         req->rq_status = obd_create(exp, &repbody->oa, NULL, oti);
348         //obd_log_cancel(conn, NULL, 1, oti->oti_logcookies, 0);
349         RETURN(0);
350 }
351
352 static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req,
353                      struct obd_trans_info *oti)
354 {
355         struct ost_body *body, *repbody;
356         int rc, flags = 0;
357         struct lustre_handle lh = {0,};
358         ENTRY;
359
360         /* check that we do support OBD_CONNECT_TRUNCLOCK. */
361         CLASSERT(OST_CONNECT_SUPPORTED & OBD_CONNECT_TRUNCLOCK);
362
363         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
364         if (body == NULL)
365                 RETURN(-EFAULT);
366
367         rc = ost_validate_obdo(exp, &body->oa, NULL);
368         if (rc)
369                 RETURN(rc);
370
371         if ((body->oa.o_valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) !=
372             (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
373                 RETURN(-EPROTO);
374
375         rc = req_capsule_server_pack(&req->rq_pill);
376         if (rc)
377                 RETURN(rc);
378
379         /* standard truncate optimization: if file body is completely
380          * destroyed, don't send data back to the server. */
381         if (body->oa.o_size == 0)
382                 flags |= LDLM_AST_DISCARD_DATA;
383
384         rc = ost_lock_get(exp, &body->oa, body->oa.o_size, body->oa.o_blocks,
385                           &lh, LCK_PW, flags);
386         if (rc == 0) {
387                 struct obd_info *oinfo;
388                 struct lustre_capa *capa = NULL;
389
390                 if (body->oa.o_valid & OBD_MD_FLFLAGS &&
391                     body->oa.o_flags == OBD_FL_SRVLOCK)
392                         /*
393                          * If OBD_FL_SRVLOCK is the only bit set in
394                          * ->o_flags, clear OBD_MD_FLFLAGS to avoid falling
395                          * through filter_setattr() to filter_iocontrol().
396                          */
397                         body->oa.o_valid &= ~OBD_MD_FLFLAGS;
398
399                 if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
400                         capa = req_capsule_client_get(&req->rq_pill,
401                                                       &RMF_CAPA1);
402                         if (capa == NULL) {
403                                 CERROR("Missing capability for OST PUNCH");
404                                 GOTO(unlock, rc = -EFAULT);
405                         }
406                 }
407
408                 OBD_ALLOC_PTR(oinfo);
409                 if (!oinfo)
410                         GOTO(unlock, rc = -ENOMEM);
411                 oinfo->oi_oa = &body->oa;
412                 oinfo->oi_policy.l_extent.start = oinfo->oi_oa->o_size;
413                 oinfo->oi_policy.l_extent.end = oinfo->oi_oa->o_blocks;
414                 oinfo->oi_capa = capa;
415
416                 req->rq_status = obd_punch(exp, oinfo, oti, NULL);
417                 OBD_FREE_PTR(oinfo);
418 unlock:
419                 ost_lock_put(exp, &lh, LCK_PW);
420         }
421
422         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
423         repbody->oa = body->oa;
424         ost_drop_id(exp, &repbody->oa);
425         RETURN(rc);
426 }
427
428 static int ost_sync(struct obd_export *exp, struct ptlrpc_request *req)
429 {
430         struct ost_body *body, *repbody;
431         struct obd_info *oinfo;
432         struct lustre_capa *capa = NULL;
433         int rc;
434         ENTRY;
435
436         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
437         if (body == NULL)
438                 RETURN(-EFAULT);
439
440         rc = ost_validate_obdo(exp, &body->oa, NULL);
441         if (rc)
442                 RETURN(rc);
443
444         if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
445                 capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
446                 if (capa == NULL) {
447                         CERROR("Missing capability for OST SYNC");
448                         RETURN (-EFAULT);
449                 }
450         }
451
452         rc = req_capsule_server_pack(&req->rq_pill);
453         if (rc)
454                 RETURN(rc);
455
456         OBD_ALLOC_PTR(oinfo);
457         if (!oinfo)
458                 RETURN(-ENOMEM);
459
460         oinfo->oi_oa = &body->oa;
461         oinfo->oi_capa = capa;
462         req->rq_status = obd_sync(exp, oinfo, body->oa.o_size,
463                                   body->oa.o_blocks, NULL);
464         OBD_FREE_PTR(oinfo);
465
466         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
467         repbody->oa = body->oa;
468         ost_drop_id(exp, &repbody->oa);
469         RETURN(0);
470 }
471
472 static int ost_setattr(struct obd_export *exp, struct ptlrpc_request *req,
473                        struct obd_trans_info *oti)
474 {
475         struct ost_body *body, *repbody;
476         struct obd_info *oinfo;
477         struct lustre_capa *capa = NULL;
478         int rc;
479         ENTRY;
480
481         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
482         if (body == NULL)
483                 RETURN(-EFAULT);
484
485         rc = ost_validate_obdo(req->rq_export, &body->oa, NULL);
486         if (rc)
487                 RETURN(rc);
488
489         rc = req_capsule_server_pack(&req->rq_pill);
490         if (rc)
491                 RETURN(rc);
492
493         if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
494                 capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
495                 if (capa == NULL) {
496                         CERROR("Missing capability for OST SETATTR");
497                         RETURN (-EFAULT);
498                 }
499         }
500
501         OBD_ALLOC_PTR(oinfo);
502         if (!oinfo)
503                 RETURN(-ENOMEM);
504         oinfo->oi_oa = &body->oa;
505         oinfo->oi_capa = capa;
506
507         req->rq_status = obd_setattr(exp, oinfo, oti);
508
509         OBD_FREE_PTR(oinfo);
510
511         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
512         repbody->oa = body->oa;
513         ost_drop_id(exp, &repbody->oa);
514         RETURN(0);
515 }
516
517 static __u32 ost_checksum_bulk(struct ptlrpc_bulk_desc *desc, int opc,
518                                cksum_type_t cksum_type)
519 {
520         __u32 cksum;
521         int i;
522
523         cksum = init_checksum(cksum_type);
524         for (i = 0; i < desc->bd_iov_count; i++) {
525                 struct page *page = desc->bd_iov[i].kiov_page;
526                 int off = desc->bd_iov[i].kiov_offset & ~CFS_PAGE_MASK;
527                 char *ptr = kmap(page) + off;
528                 int len = desc->bd_iov[i].kiov_len;
529
530                 /* corrupt the data before we compute the checksum, to
531                  * simulate a client->OST data error */
532                 if (i == 0 && opc == OST_WRITE &&
533                     OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_RECEIVE))
534                         memcpy(ptr, "bad3", min(4, len));
535                 cksum = compute_checksum(cksum, ptr, len, cksum_type);
536                 /* corrupt the data after we compute the checksum, to
537                  * simulate an OST->client data error */
538                 if (i == 0 && opc == OST_READ &&
539                     OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_SEND)) {
540                         memcpy(ptr, "bad4", min(4, len));
541                         /* nobody should use corrupted page again */
542                         ClearPageUptodate(page);
543                 }
544                 kunmap(page);
545         }
546
547         return cksum;
548 }
549
550 static int ost_brw_lock_get(int mode, struct obd_export *exp,
551                             struct obd_ioobj *obj, struct niobuf_remote *nb,
552                             struct lustre_handle *lh)
553 {
554         int flags                 = 0;
555         int nrbufs                = obj->ioo_bufcnt;
556         struct ldlm_res_id res_id;
557         ldlm_policy_data_t policy;
558         int i;
559         ENTRY;
560
561         osc_build_res_name(obj->ioo_id, obj->ioo_seq, &res_id);
562         LASSERT(mode == LCK_PR || mode == LCK_PW);
563         LASSERT(!lustre_handle_is_used(lh));
564
565         if (nrbufs == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
566                 RETURN(0);
567
568         for (i = 1; i < nrbufs; i ++)
569                 if ((nb[0].flags & OBD_BRW_SRVLOCK) !=
570                     (nb[i].flags & OBD_BRW_SRVLOCK))
571                         RETURN(-EFAULT);
572
573         policy.l_extent.start = nb[0].offset & CFS_PAGE_MASK;
574         policy.l_extent.end   = (nb[nrbufs - 1].offset +
575                                  nb[nrbufs - 1].len - 1) | ~CFS_PAGE_MASK;
576
577         RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
578                                       LDLM_EXTENT, &policy, mode, &flags,
579                                       ldlm_blocking_ast, ldlm_completion_ast,
580                                       ldlm_glimpse_ast, NULL, 0, NULL, lh));
581 }
582
583 static void ost_brw_lock_put(int mode,
584                              struct obd_ioobj *obj, struct niobuf_remote *niob,
585                              struct lustre_handle *lh)
586 {
587         ENTRY;
588         LASSERT(mode == LCK_PR || mode == LCK_PW);
589         LASSERT((obj->ioo_bufcnt > 0 && (niob[0].flags & OBD_BRW_SRVLOCK)) ==
590                 lustre_handle_is_used(lh));
591         if (lustre_handle_is_used(lh))
592                 ldlm_lock_decref(lh, mode);
593         EXIT;
594 }
595
596 struct ost_prolong_data {
597         struct obd_export *opd_exp;
598         ldlm_policy_data_t opd_policy;
599         struct obdo *opd_oa;
600         ldlm_mode_t opd_mode;
601         int opd_lock_match;
602         int opd_timeout;
603 };
604
605 static int ost_prolong_locks_iter(struct ldlm_lock *lock, void *data)
606 {
607         struct ost_prolong_data *opd = data;
608
609         LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
610
611         if (lock->l_req_mode != lock->l_granted_mode) {
612                 /* scan granted locks only */
613                 return LDLM_ITER_STOP;
614         }
615
616         if (lock->l_export != opd->opd_exp) {
617                 /* prolong locks only for given client */
618                 return LDLM_ITER_CONTINUE;
619         }
620
621         if (!(lock->l_granted_mode & opd->opd_mode)) {
622                 /* we aren't interesting in all type of locks */
623                 return LDLM_ITER_CONTINUE;
624         }
625
626         if (lock->l_policy_data.l_extent.end < opd->opd_policy.l_extent.start ||
627             lock->l_policy_data.l_extent.start > opd->opd_policy.l_extent.end) {
628                 /* the request doesn't cross the lock, skip it */
629                 return LDLM_ITER_CONTINUE;
630         }
631
632         /* Fill the obdo with the matched lock handle.
633          * XXX: it is possible in some cases the IO RPC is covered by several
634          * locks, even for the write case, so it may need to be a lock list. */
635         if (opd->opd_oa && !(opd->opd_oa->o_valid & OBD_MD_FLHANDLE)) {
636                 opd->opd_oa->o_handle.cookie = lock->l_handle.h_cookie;
637                 opd->opd_oa->o_valid |= OBD_MD_FLHANDLE;
638         }
639
640         if (!(lock->l_flags & LDLM_FL_AST_SENT)) {
641                 /* ignore locks not being cancelled */
642                 return LDLM_ITER_CONTINUE;
643         }
644
645         CDEBUG(D_DLMTRACE,"refresh lock: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
646                lock->l_resource->lr_name.name[0],
647                lock->l_resource->lr_name.name[1],
648                opd->opd_policy.l_extent.start, opd->opd_policy.l_extent.end);
649         /* OK. this is a possible lock the user holds doing I/O
650          * let's refresh eviction timer for it */
651         ldlm_refresh_waiting_lock(lock, opd->opd_timeout);
652         opd->opd_lock_match = 1;
653
654         return LDLM_ITER_CONTINUE;
655 }
656
657 static int ost_rw_prolong_locks(struct ptlrpc_request *req, struct obd_ioobj *obj,
658                                 struct niobuf_remote *nb, struct obdo *oa,
659                                 ldlm_mode_t mode)
660 {
661         struct ldlm_res_id res_id;
662         int nrbufs = obj->ioo_bufcnt;
663         struct ost_prolong_data opd = { 0 };
664         ENTRY;
665
666         osc_build_res_name(obj->ioo_id, obj->ioo_seq, &res_id);
667
668         opd.opd_mode = mode;
669         opd.opd_exp = req->rq_export;
670         opd.opd_policy.l_extent.start = nb[0].offset & CFS_PAGE_MASK;
671         opd.opd_policy.l_extent.end = (nb[nrbufs - 1].offset +
672                                        nb[nrbufs - 1].len - 1) | ~CFS_PAGE_MASK;
673
674         /* prolong locks for the current service time of the corresponding
675          * portal (= OST_IO_PORTAL) */
676         opd.opd_timeout = AT_OFF ? obd_timeout / 2:
677                           max(at_est2timeout(at_get(&req->rq_rqbd->
678                               rqbd_service->srv_at_estimate)), ldlm_timeout);
679
680         CDEBUG(D_INFO,"refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
681                res_id.name[0], res_id.name[1], opd.opd_policy.l_extent.start,
682                opd.opd_policy.l_extent.end);
683
684         if (oa->o_valid & OBD_MD_FLHANDLE) {
685                 struct ldlm_lock *lock;
686
687                 lock = ldlm_handle2lock(&oa->o_handle);
688                 if (lock != NULL) {
689                         ost_prolong_locks_iter(lock, &opd);
690                         if (opd.opd_lock_match) {
691                                 LDLM_LOCK_PUT(lock);
692                                 RETURN(1);
693                         }
694
695                         /* Check if the lock covers the whole IO region,
696                          * otherwise iterate through the resource. */
697                         if (lock->l_policy_data.l_extent.end >=
698                             opd.opd_policy.l_extent.end &&
699                             lock->l_policy_data.l_extent.start <=
700                             opd.opd_policy.l_extent.start) {
701                                 LDLM_LOCK_PUT(lock);
702                                 RETURN(0);
703                         }
704                         LDLM_LOCK_PUT(lock);
705                 }
706         }
707
708         opd.opd_oa = oa;
709         ldlm_resource_iterate(req->rq_export->exp_obd->obd_namespace, &res_id,
710                               ost_prolong_locks_iter, &opd);
711         RETURN(opd.opd_lock_match);
712 }
713
714 /* Allocate thread local buffers if needed */
715 static struct ost_thread_local_cache *ost_tls_get(struct ptlrpc_request *r)
716 {
717         struct ost_thread_local_cache *tls =
718                 (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
719
720         /* In normal mode of operation an I/O request is serviced only
721          * by ll_ost_io threads each of them has own tls buffers allocated by
722          * ost_thread_init().
723          * During recovery, an I/O request may be queued until any of the ost
724          * service threads process it. Not necessary it should be one of
725          * ll_ost_io threads. In that case we dynamically allocating tls
726          * buffers for the request service time. */
727         if (unlikely(tls == NULL)) {
728                 LASSERT(r->rq_export->exp_in_recovery);
729                 OBD_ALLOC_PTR(tls);
730                 if (tls != NULL) {
731                         tls->temporary = 1;
732                         r->rq_svc_thread->t_data = tls;
733                 }
734         }
735         return  tls;
736 }
737
738 /* Free thread local buffers if they were allocated only for servicing
739  * this one request */
740 static void ost_tls_put(struct ptlrpc_request *r)
741 {
742         struct ost_thread_local_cache *tls =
743                 (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
744
745         if (unlikely(tls->temporary)) {
746                 OBD_FREE_PTR(tls);
747                 r->rq_svc_thread->t_data = NULL;
748         }
749 }
750
751 static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
752 {
753         struct ptlrpc_bulk_desc *desc = NULL;
754         struct obd_export *exp = req->rq_export;
755         struct niobuf_remote *remote_nb;
756         struct niobuf_local *local_nb;
757         struct obd_ioobj *ioo;
758         struct ost_body *body, *repbody;
759         struct lustre_capa *capa = NULL;
760         struct l_wait_info lwi;
761         struct lustre_handle lockh = { 0 };
762         int niocount, npages, nob = 0, rc, i;
763         int no_reply = 0;
764         struct ost_thread_local_cache *tls;
765         ENTRY;
766
767         req->rq_bulk_read = 1;
768
769         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
770                 GOTO(out, rc = -EIO);
771
772         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, (obd_timeout + 1) / 4);
773
774         /* Check if there is eviction in progress, and if so, wait for it to
775          * finish */
776         if (unlikely(cfs_atomic_read(&exp->exp_obd->obd_evict_inprogress))) {
777                 lwi = LWI_INTR(NULL, NULL); // We do not care how long it takes
778                 rc = l_wait_event(exp->exp_obd->obd_evict_inprogress_waitq,
779                         !cfs_atomic_read(&exp->exp_obd->obd_evict_inprogress),
780                         &lwi);
781         }
782         if (exp->exp_failed)
783                 GOTO(out, rc = -ENOTCONN);
784
785         /* ost_body, ioobj & noibuf_remote are verified and swabbed in
786          * ost_rw_hpreq_check(). */
787         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
788         if (body == NULL)
789                 GOTO(out, rc = -EFAULT);
790
791         /*
792          * A req_capsule_X_get_array(pill, field, ptr_to_element_count) function
793          * would be useful here and wherever we get &RMF_OBD_IOOBJ and
794          * &RMF_NIOBUF_REMOTE.
795          */
796         ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
797         if (ioo == NULL)
798                 GOTO(out, rc = -EFAULT);
799
800         rc = ost_validate_obdo(exp, &body->oa, ioo);
801         if (rc)
802                 RETURN(rc);
803
804         niocount = ioo->ioo_bufcnt;
805         remote_nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
806         if (remote_nb == NULL)
807                 GOTO(out, rc = -EFAULT);
808
809         if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
810                 capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
811                 if (capa == NULL) {
812                         CERROR("Missing capability for OST BRW READ");
813                         GOTO(out, rc = -EFAULT);
814                 }
815         }
816
817         rc = req_capsule_server_pack(&req->rq_pill);
818         if (rc)
819                 GOTO(out, rc);
820
821         tls = ost_tls_get(req);
822         if (tls == NULL)
823                 GOTO(out_bulk, rc = -ENOMEM);
824         local_nb = tls->local;
825
826         rc = ost_brw_lock_get(LCK_PR, exp, ioo, remote_nb, &lockh);
827         if (rc != 0)
828                 GOTO(out_tls, rc);
829
830         /*
831          * If getting the lock took more time than
832          * client was willing to wait, drop it. b=11330
833          */
834         if (cfs_time_current_sec() > req->rq_deadline ||
835             OBD_FAIL_CHECK(OBD_FAIL_OST_DROP_REQ)) {
836                 no_reply = 1;
837                 CERROR("Dropping timed-out read from %s because locking"
838                        "object "LPX64" took %ld seconds (limit was %ld).\n",
839                        libcfs_id2str(req->rq_peer), ioo->ioo_id,
840                        cfs_time_current_sec() - req->rq_arrival_time.tv_sec,
841                        req->rq_deadline - req->rq_arrival_time.tv_sec);
842                 GOTO(out_lock, rc = -ETIMEDOUT);
843         }
844
845         npages = OST_THREAD_POOL_SIZE;
846         rc = obd_preprw(OBD_BRW_READ, exp, &body->oa, 1, ioo,
847                         remote_nb, &npages, local_nb, oti, capa);
848         if (rc != 0)
849                 GOTO(out_lock, rc);
850
851         desc = ptlrpc_prep_bulk_exp(req, npages,
852                                      BULK_PUT_SOURCE, OST_BULK_PORTAL);
853         if (desc == NULL)
854                 GOTO(out_commitrw, rc = -ENOMEM);
855
856         if (!lustre_handle_is_used(&lockh))
857                 /* no needs to try to prolong lock if server is asked
858                  * to handle locking (= OBD_BRW_SRVLOCK) */
859                 ost_rw_prolong_locks(req, ioo, remote_nb, &body->oa,
860                                      LCK_PW | LCK_PR);
861
862         nob = 0;
863         for (i = 0; i < npages; i++) {
864                 int page_rc = local_nb[i].rc;
865
866                 if (page_rc < 0) {              /* error */
867                         rc = page_rc;
868                         break;
869                 }
870
871                 nob += page_rc;
872                 if (page_rc != 0) {             /* some data! */
873                         LASSERT (local_nb[i].page != NULL);
874                         ptlrpc_prep_bulk_page(desc, local_nb[i].page,
875                                               local_nb[i].offset & ~CFS_PAGE_MASK,
876                                               page_rc);
877                 }
878
879                 if (page_rc != local_nb[i].len) { /* short read */
880                         /* All subsequent pages should be 0 */
881                         while(++i < npages)
882                                 LASSERT(local_nb[i].rc == 0);
883                         break;
884                 }
885         }
886
887         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
888                 cksum_type_t cksum_type = OBD_CKSUM_CRC32;
889
890                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
891                         cksum_type = cksum_type_unpack(body->oa.o_flags);
892                 body->oa.o_flags = cksum_type_pack(cksum_type);
893                 body->oa.o_valid = OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
894                 body->oa.o_cksum = ost_checksum_bulk(desc, OST_READ, cksum_type);
895                 CDEBUG(D_PAGE,"checksum at read origin: %x\n",body->oa.o_cksum);
896         } else {
897                 body->oa.o_valid = 0;
898         }
899         /* We're finishing using body->oa as an input variable */
900
901         /* Check if client was evicted while we were doing i/o before touching
902            network */
903         if (rc == 0) {
904                 rc = target_bulk_io(exp, desc, &lwi);
905                 no_reply = rc != 0;
906         }
907
908 out_commitrw:
909         /* Must commit after prep above in all cases */
910         rc = obd_commitrw(OBD_BRW_READ, exp, &body->oa, 1, ioo,
911                           remote_nb, npages, local_nb, oti, rc);
912
913         if (rc == 0) {
914                 repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
915                 memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
916                 ost_drop_id(exp, &repbody->oa);
917         }
918
919 out_lock:
920         ost_brw_lock_put(LCK_PR, ioo, remote_nb, &lockh);
921 out_tls:
922         ost_tls_put(req);
923 out_bulk:
924         if (desc)
925                 ptlrpc_free_bulk(desc);
926 out:
927         LASSERT(rc <= 0);
928         if (rc == 0) {
929                 req->rq_status = nob;
930                 ptlrpc_lprocfs_brw(req, nob);
931                 target_committed_to_req(req);
932                 ptlrpc_reply(req);
933         } else if (!no_reply) {
934                 /* Only reply if there was no comms problem with bulk */
935                 target_committed_to_req(req);
936                 req->rq_status = rc;
937                 ptlrpc_error(req);
938         } else {
939                 /* reply out callback would free */
940                 ptlrpc_req_drop_rs(req);
941                 CWARN("%s: ignoring bulk IO comm error with %s@%s id %s - "
942                       "client will retry\n",
943                       exp->exp_obd->obd_name,
944                       exp->exp_client_uuid.uuid,
945                       exp->exp_connection->c_remote_uuid.uuid,
946                       libcfs_id2str(req->rq_peer));
947         }
948
949         RETURN(rc);
950 }
951
952 static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
953 {
954         struct ptlrpc_bulk_desc *desc = NULL;
955         struct obd_export       *exp = req->rq_export;
956         struct niobuf_remote    *remote_nb;
957         struct niobuf_local     *local_nb;
958         struct obd_ioobj        *ioo;
959         struct ost_body         *body, *repbody;
960         struct l_wait_info       lwi;
961         struct lustre_handle     lockh = {0};
962         struct lustre_capa      *capa = NULL;
963         __u32                   *rcs;
964         int objcount, niocount, npages;
965         int rc, i, j;
966         obd_count                client_cksum = 0, server_cksum = 0;
967         cksum_type_t             cksum_type = OBD_CKSUM_CRC32;
968         int                      no_reply = 0, mmap = 0;
969         __u32                    o_uid = 0, o_gid = 0;
970         struct ost_thread_local_cache *tls;
971         ENTRY;
972
973         req->rq_bulk_write = 1;
974
975         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
976                 GOTO(out, rc = -EIO);
977         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK2))
978                 GOTO(out, rc = -EFAULT);
979
980         /* pause before transaction has been started */
981         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, (obd_timeout + 1) / 4);
982
983         /* ost_body, ioobj & noibuf_remote are verified and swabbed in
984          * ost_rw_hpreq_check(). */
985         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
986         if (body == NULL)
987                 GOTO(out, rc = -EFAULT);
988
989         objcount = req_capsule_get_size(&req->rq_pill, &RMF_OBD_IOOBJ,
990                                         RCL_CLIENT) / sizeof(*ioo);
991         ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
992         if (ioo == NULL)
993                 GOTO(out, rc = -EFAULT);
994
995         rc = ost_validate_obdo(exp, &body->oa, ioo);
996         if (rc)
997                 RETURN(rc);
998
999         for (niocount = i = 0; i < objcount; i++)
1000                 niocount += ioo[i].ioo_bufcnt;
1001
1002         /*
1003          * It'd be nice to have a capsule function to indicate how many elements
1004          * there were in a buffer for an RMF that's declared to be an array.
1005          * It's easy enough to compute the number of elements here though.
1006          */
1007         remote_nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
1008         if (remote_nb == NULL || niocount != (req_capsule_get_size(&req->rq_pill,
1009             &RMF_NIOBUF_REMOTE, RCL_CLIENT) / sizeof(*remote_nb)))
1010                 GOTO(out, rc = -EFAULT);
1011
1012         if ((remote_nb[0].flags & OBD_BRW_MEMALLOC) &&
1013             (exp->exp_connection->c_peer.nid == exp->exp_connection->c_self))
1014                 cfs_memory_pressure_set();
1015
1016         if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
1017                 capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
1018                 if (capa == NULL) {
1019                         CERROR("Missing capability for OST BRW WRITE");
1020                         GOTO(out, rc = -EFAULT);
1021                 }
1022         }
1023
1024         req_capsule_set_size(&req->rq_pill, &RMF_RCS, RCL_SERVER,
1025                              niocount * sizeof(*rcs));
1026         rc = req_capsule_server_pack(&req->rq_pill);
1027         if (rc != 0)
1028                 GOTO(out, rc);
1029         CFS_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_PACK, cfs_fail_val);
1030         rcs = req_capsule_server_get(&req->rq_pill, &RMF_RCS);
1031
1032         tls = ost_tls_get(req);
1033         if (tls == NULL)
1034                 GOTO(out_bulk, rc = -ENOMEM);
1035         local_nb = tls->local;
1036
1037         rc = ost_brw_lock_get(LCK_PW, exp, ioo, remote_nb, &lockh);
1038         if (rc != 0)
1039                 GOTO(out_tls, rc);
1040
1041         /*
1042          * If getting the lock took more time than
1043          * client was willing to wait, drop it. b=11330
1044          */
1045         if (cfs_time_current_sec() > req->rq_deadline ||
1046             OBD_FAIL_CHECK(OBD_FAIL_OST_DROP_REQ)) {
1047                 no_reply = 1;
1048                 CERROR("Dropping timed-out write from %s because locking "
1049                        "object "LPX64" took %ld seconds (limit was %ld).\n",
1050                        libcfs_id2str(req->rq_peer), ioo->ioo_id,
1051                        cfs_time_current_sec() - req->rq_arrival_time.tv_sec,
1052                        req->rq_deadline - req->rq_arrival_time.tv_sec);
1053                 GOTO(out_lock, rc = -ETIMEDOUT);
1054         }
1055
1056         if (!lustre_handle_is_used(&lockh))
1057                 /* no needs to try to prolong lock if server is asked
1058                  * to handle locking (= OBD_BRW_SRVLOCK) */
1059                 ost_rw_prolong_locks(req, ioo, remote_nb,&body->oa,  LCK_PW);
1060
1061         /* obd_preprw clobbers oa->valid, so save what we need */
1062         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1063                 client_cksum = body->oa.o_cksum;
1064                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1065                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1066         }
1067         if (body->oa.o_valid & OBD_MD_FLFLAGS && body->oa.o_flags & OBD_FL_MMAP)
1068                 mmap = 1;
1069
1070         /* Because we already sync grant info with client when reconnect,
1071          * grant info will be cleared for resent req, then fed_grant and
1072          * total_grant will not be modified in following preprw_write */
1073         if (lustre_msg_get_flags(req->rq_reqmsg) & (MSG_RESENT | MSG_REPLAY)) {
1074                 DEBUG_REQ(D_CACHE, req, "clear resent/replay req grant info");
1075                 body->oa.o_valid &= ~OBD_MD_FLGRANT;
1076         }
1077
1078         if (exp_connect_rmtclient(exp)) {
1079                 o_uid = body->oa.o_uid;
1080                 o_gid = body->oa.o_gid;
1081         }
1082         npages = OST_THREAD_POOL_SIZE;
1083         rc = obd_preprw(OBD_BRW_WRITE, exp, &body->oa, objcount,
1084                         ioo, remote_nb, &npages, local_nb, oti, capa);
1085         if (rc != 0)
1086                 GOTO(out_lock, rc);
1087
1088         desc = ptlrpc_prep_bulk_exp(req, npages,
1089                                      BULK_GET_SINK, OST_BULK_PORTAL);
1090         if (desc == NULL)
1091                 GOTO(skip_transfer, rc = -ENOMEM);
1092
1093         /* NB Having prepped, we must commit... */
1094
1095         for (i = 0; i < npages; i++)
1096                 ptlrpc_prep_bulk_page(desc, local_nb[i].page,
1097                                       local_nb[i].offset & ~CFS_PAGE_MASK,
1098                                       local_nb[i].len);
1099
1100         rc = sptlrpc_svc_prep_bulk(req, desc);
1101         if (rc != 0)
1102                 GOTO(out_lock, rc);
1103
1104         rc = target_bulk_io(exp, desc, &lwi);
1105         no_reply = rc != 0;
1106
1107 skip_transfer:
1108         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1109         memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
1110
1111         if (unlikely(client_cksum != 0 && rc == 0)) {
1112                 static int cksum_counter;
1113                 repbody->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1114                 repbody->oa.o_flags &= ~OBD_FL_CKSUM_ALL;
1115                 repbody->oa.o_flags |= cksum_type_pack(cksum_type);
1116                 server_cksum = ost_checksum_bulk(desc, OST_WRITE, cksum_type);
1117                 repbody->oa.o_cksum = server_cksum;
1118                 cksum_counter++;
1119                 if (unlikely(client_cksum != server_cksum)) {
1120                         CDEBUG_LIMIT(mmap ? D_INFO : D_ERROR,
1121                                      "client csum %x, server csum %x\n",
1122                                      client_cksum, server_cksum);
1123                         cksum_counter = 0;
1124                 } else if ((cksum_counter & (-cksum_counter)) == cksum_counter){
1125                         CDEBUG(D_INFO, "Checksum %u from %s OK: %x\n",
1126                                cksum_counter, libcfs_id2str(req->rq_peer),
1127                                server_cksum);
1128                 }
1129         }
1130
1131         /* Must commit after prep above in all cases */
1132         rc = obd_commitrw(OBD_BRW_WRITE, exp, &repbody->oa, objcount, ioo,
1133                           remote_nb, npages, local_nb, oti, rc);
1134         if (rc == -ENOTCONN)
1135                 /* quota acquire process has been given up because
1136                  * either the client has been evicted or the client
1137                  * has timed out the request already */
1138                 no_reply = 1;
1139
1140         if (exp_connect_rmtclient(exp)) {
1141                 repbody->oa.o_uid = o_uid;
1142                 repbody->oa.o_gid = o_gid;
1143         }
1144
1145         /*
1146          * Disable sending mtime back to the client. If the client locked the
1147          * whole object, then it has already updated the mtime on its side,
1148          * otherwise it will have to glimpse anyway (see bug 21489, comment 32)
1149          */
1150         repbody->oa.o_valid &= ~(OBD_MD_FLMTIME | OBD_MD_FLATIME);
1151
1152         if (unlikely(client_cksum != server_cksum && rc == 0 &&  !mmap)) {
1153                 int  new_cksum = ost_checksum_bulk(desc, OST_WRITE, cksum_type);
1154                 char *msg;
1155                 char *via;
1156                 char *router;
1157
1158                 if (new_cksum == server_cksum)
1159                         msg = "changed in transit before arrival at OST";
1160                 else if (new_cksum == client_cksum)
1161                         msg = "initial checksum before message complete";
1162                 else
1163                         msg = "changed in transit AND after initial checksum";
1164
1165                 if (req->rq_peer.nid == desc->bd_sender) {
1166                         via = router = "";
1167                 } else {
1168                         via = " via ";
1169                         router = libcfs_nid2str(desc->bd_sender);
1170                 }
1171
1172                 LCONSOLE_ERROR_MSG(0x168, "%s: BAD WRITE CHECKSUM: %s from "
1173                                    "%s%s%s inode "DFID" object "
1174                                    LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1175                                    exp->exp_obd->obd_name, msg,
1176                                    libcfs_id2str(req->rq_peer),
1177                                    via, router,
1178                                    body->oa.o_valid & OBD_MD_FLFID ?
1179                                                 body->oa.o_parent_seq : (__u64)0,
1180                                    body->oa.o_valid & OBD_MD_FLFID ?
1181                                                 body->oa.o_parent_oid : 0,
1182                                    body->oa.o_valid & OBD_MD_FLFID ?
1183                                                 body->oa.o_parent_ver : 0,
1184                                    body->oa.o_id,
1185                                    body->oa.o_valid & OBD_MD_FLGROUP ?
1186                                                 body->oa.o_seq : (__u64)0,
1187                                    local_nb[0].offset,
1188                                    local_nb[npages-1].offset +
1189                                    local_nb[npages-1].len - 1 );
1190                 CERROR("client csum %x, original server csum %x, "
1191                        "server csum now %x\n",
1192                        client_cksum, server_cksum, new_cksum);
1193         }
1194
1195         if (rc == 0) {
1196                 int nob = 0;
1197
1198                 /* set per-requested niobuf return codes */
1199                 for (i = j = 0; i < niocount; i++) {
1200                         int len = remote_nb[i].len;
1201
1202                         nob += len;
1203                         rcs[i] = 0;
1204                         do {
1205                                 LASSERT(j < npages);
1206                                 if (local_nb[j].rc < 0)
1207                                         rcs[i] = local_nb[j].rc;
1208                                 len -= local_nb[j].len;
1209                                 j++;
1210                         } while (len > 0);
1211                         LASSERT(len == 0);
1212                 }
1213                 LASSERT(j == npages);
1214                 ptlrpc_lprocfs_brw(req, nob);
1215         }
1216
1217 out_lock:
1218         ost_brw_lock_put(LCK_PW, ioo, remote_nb, &lockh);
1219 out_tls:
1220         ost_tls_put(req);
1221 out_bulk:
1222         if (desc)
1223                 ptlrpc_free_bulk(desc);
1224 out:
1225         if (rc == 0) {
1226                 oti_to_request(oti, req);
1227                 target_committed_to_req(req);
1228                 rc = ptlrpc_reply(req);
1229         } else if (!no_reply) {
1230                 /* Only reply if there was no comms problem with bulk */
1231                 target_committed_to_req(req);
1232                 req->rq_status = rc;
1233                 ptlrpc_error(req);
1234         } else {
1235                 /* reply out callback would free */
1236                 ptlrpc_req_drop_rs(req);
1237                 CWARN("%s: ignoring bulk IO comm error with %s@%s id %s - "
1238                       "client will retry\n",
1239                       exp->exp_obd->obd_name,
1240                       exp->exp_client_uuid.uuid,
1241                       exp->exp_connection->c_remote_uuid.uuid,
1242                       libcfs_id2str(req->rq_peer));
1243         }
1244         cfs_memory_pressure_clr();
1245         RETURN(rc);
1246 }
1247
1248 /**
1249  * Implementation of OST_SET_INFO.
1250  *
1251  * OST_SET_INFO is like ioctl(): heavily overloaded.  Specifically, it takes a
1252  * "key" and a value RPC buffers as arguments, with the value's contents
1253  * interpreted according to the key.
1254  *
1255  * Value types that need swabbing have swabbing done explicitly, either here or
1256  * in functions called from here.  This should be corrected: all swabbing should
1257  * be done in the capsule abstraction, as that will then allow us to move
1258  * swabbing exclusively to the client without having to modify server code
1259  * outside the capsule abstraction's implementation itself.  To correct this
1260  * will require minor changes to the capsule abstraction; see the comments for
1261  * req_capsule_extend() in layout.c.
1262  */
1263 static int ost_set_info(struct obd_export *exp, struct ptlrpc_request *req)
1264 {
1265         struct ost_body *body = NULL, *repbody;
1266         char *key, *val = NULL;
1267         int keylen, vallen, rc = 0;
1268         int is_grant_shrink = 0;
1269         ENTRY;
1270
1271         key = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
1272         if (key == NULL) {
1273                 DEBUG_REQ(D_HA, req, "no set_info key");
1274                 RETURN(-EFAULT);
1275         }
1276         keylen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_KEY,
1277                                       RCL_CLIENT);
1278
1279         vallen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_VAL,
1280                                       RCL_CLIENT);
1281
1282         if ((is_grant_shrink = KEY_IS(KEY_GRANT_SHRINK)))
1283                 /* In this case the value is actually an RMF_OST_BODY, so we
1284                  * transmutate the type of this PTLRPC */
1285                 req_capsule_extend(&req->rq_pill, &RQF_OST_SET_GRANT_INFO);
1286
1287         rc = req_capsule_server_pack(&req->rq_pill);
1288         if (rc)
1289                 RETURN(rc);
1290
1291         if (vallen) {
1292                 if (is_grant_shrink) {
1293                         body = req_capsule_client_get(&req->rq_pill,
1294                                                       &RMF_OST_BODY);
1295                         if (!body)
1296                                 RETURN(-EFAULT);
1297
1298                         repbody = req_capsule_server_get(&req->rq_pill,
1299                                                          &RMF_OST_BODY);
1300                         memcpy(repbody, body, sizeof(*body));
1301                         val = (char*)repbody;
1302                 } else {
1303                         val = req_capsule_client_get(&req->rq_pill,
1304                                                      &RMF_SETINFO_VAL);
1305                 }
1306         }
1307
1308         if (KEY_IS(KEY_EVICT_BY_NID)) {
1309                 if (val && vallen)
1310                         obd_export_evict_by_nid(exp->exp_obd, val);
1311                 GOTO(out, rc = 0);
1312         } else if (KEY_IS(KEY_MDS_CONN) && ptlrpc_req_need_swab(req)) {
1313                 if (vallen < sizeof(__u32))
1314                         RETURN(-EFAULT);
1315                 __swab32s((__u32 *)val);
1316         }
1317
1318         /* OBD will also check if KEY_IS(KEY_GRANT_SHRINK), and will cast val to
1319          * a struct ost_body * value */
1320         rc = obd_set_info_async(exp, keylen, key, vallen, val, NULL);
1321 out:
1322         lustre_msg_set_status(req->rq_repmsg, 0);
1323         RETURN(rc);
1324 }
1325
1326 static int ost_get_info(struct obd_export *exp, struct ptlrpc_request *req)
1327 {
1328         void *key, *reply;
1329         int keylen, replylen, rc = 0;
1330         struct req_capsule *pill = &req->rq_pill;
1331         ENTRY;
1332
1333         /* this common part for get_info rpc */
1334         key = req_capsule_client_get(pill, &RMF_SETINFO_KEY);
1335         if (key == NULL) {
1336                 DEBUG_REQ(D_HA, req, "no get_info key");
1337                 RETURN(-EFAULT);
1338         }
1339         keylen = req_capsule_get_size(pill, &RMF_SETINFO_KEY, RCL_CLIENT);
1340
1341         if (KEY_IS(KEY_FIEMAP)) {
1342                 struct ll_fiemap_info_key *fm_key = key;
1343                 int rc;
1344
1345                 rc = ost_validate_obdo(exp, &fm_key->oa, NULL);
1346                 if (rc)
1347                         RETURN(rc);
1348         }
1349
1350         rc = obd_get_info(exp, keylen, key, &replylen, NULL, NULL);
1351         if (rc)
1352                 RETURN(rc);
1353
1354         req_capsule_set_size(pill, &RMF_GENERIC_DATA,
1355                              RCL_SERVER, replylen);
1356
1357         rc = req_capsule_server_pack(pill);
1358         if (rc)
1359                 RETURN(rc);
1360
1361         reply = req_capsule_server_get(pill, &RMF_GENERIC_DATA);
1362         if (reply == NULL)
1363                 RETURN(-ENOMEM);
1364
1365         /* call again to fill in the reply buffer */
1366         rc = obd_get_info(exp, keylen, key, &replylen, reply, NULL);
1367
1368         lustre_msg_set_status(req->rq_repmsg, 0);
1369         RETURN(rc);
1370 }
1371
1372 #ifdef HAVE_QUOTA_SUPPORT
1373 static int ost_handle_quotactl(struct ptlrpc_request *req)
1374 {
1375         struct obd_quotactl *oqctl, *repoqc;
1376         int rc;
1377         ENTRY;
1378
1379         oqctl = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
1380         if (oqctl == NULL)
1381                 GOTO(out, rc = -EPROTO);
1382
1383         rc = req_capsule_server_pack(&req->rq_pill);
1384         if (rc)
1385                 GOTO(out, rc);
1386
1387         repoqc = req_capsule_server_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
1388         req->rq_status = obd_quotactl(req->rq_export, oqctl);
1389         *repoqc = *oqctl;
1390
1391 out:
1392         RETURN(rc);
1393 }
1394
1395 static int ost_handle_quotacheck(struct ptlrpc_request *req)
1396 {
1397         struct obd_quotactl *oqctl;
1398         int rc;
1399         ENTRY;
1400
1401         oqctl = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
1402         if (oqctl == NULL)
1403                 RETURN(-EPROTO);
1404
1405         rc = req_capsule_server_pack(&req->rq_pill);
1406         if (rc)
1407                 RETURN(-ENOMEM);
1408
1409         req->rq_status = obd_quotacheck(req->rq_export, oqctl);
1410         RETURN(0);
1411 }
1412
1413 static int ost_handle_quota_adjust_qunit(struct ptlrpc_request *req)
1414 {
1415         struct quota_adjust_qunit *oqaq, *repoqa;
1416         struct lustre_quota_ctxt *qctxt;
1417         int rc;
1418         ENTRY;
1419
1420         qctxt = &req->rq_export->exp_obd->u.obt.obt_qctxt;
1421         oqaq = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_ADJUST_QUNIT);
1422         if (oqaq == NULL)
1423                 GOTO(out, rc = -EPROTO);
1424
1425         rc = req_capsule_server_pack(&req->rq_pill);
1426         if (rc)
1427                 GOTO(out, rc);
1428
1429         repoqa = req_capsule_server_get(&req->rq_pill, &RMF_QUOTA_ADJUST_QUNIT);
1430         req->rq_status = obd_quota_adjust_qunit(req->rq_export, oqaq, qctxt, NULL);
1431         *repoqa = *oqaq;
1432
1433  out:
1434         RETURN(rc);
1435 }
1436 #endif
1437
1438 static int ost_llog_handle_connect(struct obd_export *exp,
1439                                    struct ptlrpc_request *req)
1440 {
1441         struct llogd_conn_body *body;
1442         int rc;
1443         ENTRY;
1444
1445         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_CONN_BODY);
1446         rc = obd_llog_connect(exp, body);
1447         RETURN(rc);
1448 }
1449
/*
 * Clear the remote-client and OSS-capability bits from the reply's connect
 * flags and store the result in the export's connect flags under exp_lock.
 *
 * Both arguments are pointer expressions; they are parenthesized in the
 * expansion, but each is still evaluated more than once, so do not pass
 * expressions with side effects.
 */
#define ost_init_sec_none(reply, exp)                                   \
do {                                                                    \
        (reply)->ocd_connect_flags &= ~(OBD_CONNECT_RMT_CLIENT |        \
                                        OBD_CONNECT_RMT_CLIENT_FORCE |  \
                                        OBD_CONNECT_OSS_CAPA);          \
        cfs_spin_lock(&(exp)->exp_lock);                                \
        (exp)->exp_connect_flags = (reply)->ocd_connect_flags;          \
        cfs_spin_unlock(&(exp)->exp_lock);                              \
} while (0)
1459
1460 static int ost_init_sec_level(struct ptlrpc_request *req)
1461 {
1462         struct obd_export *exp = req->rq_export;
1463         struct req_capsule *pill = &req->rq_pill;
1464         struct obd_device *obd = exp->exp_obd;
1465         struct filter_obd *filter = &obd->u.filter;
1466         char *client = libcfs_nid2str(req->rq_peer.nid);
1467         struct obd_connect_data *data, *reply;
1468         int rc = 0, remote;
1469         ENTRY;
1470
1471         data = req_capsule_client_get(pill, &RMF_CONNECT_DATA);
1472         reply = req_capsule_server_get(pill, &RMF_CONNECT_DATA);
1473         if (data == NULL || reply == NULL)
1474                 RETURN(-EFAULT);
1475
1476         /* connection from MDT is always trusted */
1477         if (req->rq_auth_usr_mdt) {
1478                 ost_init_sec_none(reply, exp);
1479                 RETURN(0);
1480         }
1481
1482         /* no GSS support case */
1483         if (!req->rq_auth_gss) {
1484                 if (filter->fo_sec_level > LUSTRE_SEC_NONE) {
1485                         CWARN("client %s -> target %s does not user GSS, "
1486                               "can not run under security level %d.\n",
1487                               client, obd->obd_name, filter->fo_sec_level);
1488                         RETURN(-EACCES);
1489                 } else {
1490                         ost_init_sec_none(reply, exp);
1491                         RETURN(0);
1492                 }
1493         }
1494
1495         /* old version case */
1496         if (unlikely(!(data->ocd_connect_flags & OBD_CONNECT_RMT_CLIENT) ||
1497                      !(data->ocd_connect_flags & OBD_CONNECT_OSS_CAPA))) {
1498                 if (filter->fo_sec_level > LUSTRE_SEC_NONE) {
1499                         CWARN("client %s -> target %s uses old version, "
1500                               "can not run under security level %d.\n",
1501                               client, obd->obd_name, filter->fo_sec_level);
1502                         RETURN(-EACCES);
1503                 } else {
1504                         CWARN("client %s -> target %s uses old version, "
1505                               "run under security level %d.\n",
1506                               client, obd->obd_name, filter->fo_sec_level);
1507                         ost_init_sec_none(reply, exp);
1508                         RETURN(0);
1509                 }
1510         }
1511
1512         remote = data->ocd_connect_flags & OBD_CONNECT_RMT_CLIENT_FORCE;
1513         if (remote) {
1514                 if (!req->rq_auth_remote)
1515                         CDEBUG(D_SEC, "client (local realm) %s -> target %s "
1516                                "asked to be remote.\n", client, obd->obd_name);
1517         } else if (req->rq_auth_remote) {
1518                 remote = 1;
1519                 CDEBUG(D_SEC, "client (remote realm) %s -> target %s is set "
1520                        "as remote by default.\n", client, obd->obd_name);
1521         }
1522
1523         if (remote) {
1524                 if (!filter->fo_fl_oss_capa) {
1525                         CDEBUG(D_SEC, "client %s -> target %s is set as remote,"
1526                                " but OSS capabilities are not enabled: %d.\n",
1527                                client, obd->obd_name, filter->fo_fl_oss_capa);
1528                         RETURN(-EACCES);
1529                 }
1530         }
1531
1532         switch (filter->fo_sec_level) {
1533         case LUSTRE_SEC_NONE:
1534                 if (!remote) {
1535                         ost_init_sec_none(reply, exp);
1536                         break;
1537                 } else {
1538                         CDEBUG(D_SEC, "client %s -> target %s is set as remote, "
1539                                "can not run under security level %d.\n",
1540                                client, obd->obd_name, filter->fo_sec_level);
1541                         RETURN(-EACCES);
1542                 }
1543         case LUSTRE_SEC_REMOTE:
1544                 if (!remote)
1545                         ost_init_sec_none(reply, exp);
1546                 break;
1547         case LUSTRE_SEC_ALL:
1548                 if (!remote) {
1549                         reply->ocd_connect_flags &= ~(OBD_CONNECT_RMT_CLIENT |
1550                                                       OBD_CONNECT_RMT_CLIENT_FORCE);
1551                         if (!filter->fo_fl_oss_capa)
1552                                 reply->ocd_connect_flags &= ~OBD_CONNECT_OSS_CAPA;
1553
1554                         cfs_spin_lock(&exp->exp_lock);
1555                         exp->exp_connect_flags = reply->ocd_connect_flags;
1556                         cfs_spin_unlock(&exp->exp_lock);
1557                 }
1558                 break;
1559         default:
1560                 RETURN(-EINVAL);
1561         }
1562
1563         RETURN(rc);
1564 }
1565
1566 /*
1567  * FIXME
1568  * this should be done in filter_connect()/filter_reconnect(), but
1569  * we can't obtain information like NID, which stored in incoming
1570  * request, thus can't decide what flavor to use. so we do it here.
1571  *
1572  * This hack should be removed after the OST stack be rewritten, just
1573  * like what we are doing in mdt_obd_connect()/mdt_obd_reconnect().
1574  */
1575 static int ost_connect_check_sptlrpc(struct ptlrpc_request *req)
1576 {
1577         struct obd_export     *exp = req->rq_export;
1578         struct filter_obd     *filter = &exp->exp_obd->u.filter;
1579         struct sptlrpc_flavor  flvr;
1580         int                    rc = 0;
1581
1582         if (unlikely(strcmp(exp->exp_obd->obd_type->typ_name,
1583                             LUSTRE_ECHO_NAME) == 0)) {
1584                 exp->exp_flvr.sf_rpc = SPTLRPC_FLVR_ANY;
1585                 return 0;
1586         }
1587
1588         if (exp->exp_flvr.sf_rpc == SPTLRPC_FLVR_INVALID) {
1589                 cfs_read_lock(&filter->fo_sptlrpc_lock);
1590                 sptlrpc_target_choose_flavor(&filter->fo_sptlrpc_rset,
1591                                              req->rq_sp_from,
1592                                              req->rq_peer.nid,
1593                                              &flvr);
1594                 cfs_read_unlock(&filter->fo_sptlrpc_lock);
1595
1596                 cfs_spin_lock(&exp->exp_lock);
1597
1598                 exp->exp_sp_peer = req->rq_sp_from;
1599                 exp->exp_flvr = flvr;
1600
1601                 if (exp->exp_flvr.sf_rpc != SPTLRPC_FLVR_ANY &&
1602                     exp->exp_flvr.sf_rpc != req->rq_flvr.sf_rpc) {
1603                         CERROR("unauthorized rpc flavor %x from %s, "
1604                                "expect %x\n", req->rq_flvr.sf_rpc,
1605                                libcfs_nid2str(req->rq_peer.nid),
1606                                exp->exp_flvr.sf_rpc);
1607                         rc = -EACCES;
1608                 }
1609
1610                 cfs_spin_unlock(&exp->exp_lock);
1611         } else {
1612                 if (exp->exp_sp_peer != req->rq_sp_from) {
1613                         CERROR("RPC source %s doesn't match %s\n",
1614                                sptlrpc_part2name(req->rq_sp_from),
1615                                sptlrpc_part2name(exp->exp_sp_peer));
1616                         rc = -EACCES;
1617                 } else {
1618                         rc = sptlrpc_target_export_check(exp, req);
1619                 }
1620         }
1621
1622         return rc;
1623 }
1624
1625 /* Ensure that data and metadata are synced to the disk when lock is cancelled
1626  * (if requested) */
1627 int ost_blocking_ast(struct ldlm_lock *lock,
1628                              struct ldlm_lock_desc *desc,
1629                              void *data, int flag)
1630 {
1631         __u32 sync_lock_cancel = 0;
1632         __u32 len = sizeof(sync_lock_cancel);
1633         int rc = 0;
1634         ENTRY;
1635
1636         rc = obd_get_info(lock->l_export, sizeof(KEY_SYNC_LOCK_CANCEL),
1637                           KEY_SYNC_LOCK_CANCEL, &len, &sync_lock_cancel, NULL);
1638
1639         if (!rc && flag == LDLM_CB_CANCELING &&
1640             (lock->l_granted_mode & (LCK_PW|LCK_GROUP)) &&
1641             (sync_lock_cancel == ALWAYS_SYNC_ON_CANCEL ||
1642              (sync_lock_cancel == BLOCKING_SYNC_ON_CANCEL &&
1643               lock->l_flags & LDLM_FL_CBPENDING))) {
1644                 struct obd_info *oinfo;
1645                 struct obdo *oa;
1646                 int rc;
1647
1648                 OBD_ALLOC_PTR(oinfo);
1649                 if (!oinfo)
1650                         RETURN(-ENOMEM);
1651                 OBDO_ALLOC(oa);
1652                 if (!oa) {
1653                         OBD_FREE_PTR(oinfo);
1654                         RETURN(-ENOMEM);
1655                 }
1656                 oa->o_id = lock->l_resource->lr_name.name[0];
1657                 oa->o_seq = lock->l_resource->lr_name.name[1];
1658                 oa->o_valid = OBD_MD_FLID|OBD_MD_FLGROUP;
1659                 oinfo->oi_oa = oa;
1660
1661                 rc = obd_sync(lock->l_export, oinfo,
1662                               lock->l_policy_data.l_extent.start,
1663                               lock->l_policy_data.l_extent.end, NULL);
1664                 if (rc)
1665                         CERROR("Error %d syncing data on lock cancel\n", rc);
1666
1667                 OBDO_FREE(oa);
1668                 OBD_FREE_PTR(oinfo);
1669         }
1670
1671         rc = ldlm_server_blocking_ast(lock, desc, data, flag);
1672         RETURN(rc);
1673 }
1674
1675 static int ost_filter_recovery_request(struct ptlrpc_request *req,
1676                                        struct obd_device *obd, int *process)
1677 {
1678         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1679         case OST_CONNECT: /* This will never get here, but for completeness. */
1680         case OST_DISCONNECT:
1681                *process = 1;
1682                RETURN(0);
1683
1684         case OBD_PING:
1685         case OST_CREATE:
1686         case OST_DESTROY:
1687         case OST_PUNCH:
1688         case OST_SETATTR:
1689         case OST_SYNC:
1690         case OST_WRITE:
1691         case OBD_LOG_CANCEL:
1692         case LDLM_ENQUEUE:
1693                 *process = target_queue_recovery_request(req, obd);
1694                 RETURN(0);
1695
1696         default:
1697                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
1698                 *process = -EAGAIN;
1699                 RETURN(0);
1700         }
1701 }
1702
1703 int ost_msg_check_version(struct lustre_msg *msg)
1704 {
1705         int rc;
1706
1707         switch(lustre_msg_get_opc(msg)) {
1708         case OST_CONNECT:
1709         case OST_DISCONNECT:
1710         case OBD_PING:
1711         case SEC_CTX_INIT:
1712         case SEC_CTX_INIT_CONT:
1713         case SEC_CTX_FINI:
1714                 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
1715                 if (rc)
1716                         CERROR("bad opc %u version %08x, expecting %08x\n",
1717                                lustre_msg_get_opc(msg),
1718                                lustre_msg_get_version(msg),
1719                                LUSTRE_OBD_VERSION);
1720                 break;
1721         case OST_CREATE:
1722         case OST_DESTROY:
1723         case OST_GETATTR:
1724         case OST_SETATTR:
1725         case OST_WRITE:
1726         case OST_READ:
1727         case OST_PUNCH:
1728         case OST_STATFS:
1729         case OST_SYNC:
1730         case OST_SET_INFO:
1731         case OST_GET_INFO:
1732 #ifdef HAVE_QUOTA_SUPPORT
1733         case OST_QUOTACHECK:
1734         case OST_QUOTACTL:
1735         case OST_QUOTA_ADJUST_QUNIT:
1736 #endif
1737                 rc = lustre_msg_check_version(msg, LUSTRE_OST_VERSION);
1738                 if (rc)
1739                         CERROR("bad opc %u version %08x, expecting %08x\n",
1740                                lustre_msg_get_opc(msg),
1741                                lustre_msg_get_version(msg),
1742                                LUSTRE_OST_VERSION);
1743                 break;
1744         case LDLM_ENQUEUE:
1745         case LDLM_CONVERT:
1746         case LDLM_CANCEL:
1747         case LDLM_BL_CALLBACK:
1748         case LDLM_CP_CALLBACK:
1749                 rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
1750                 if (rc)
1751                         CERROR("bad opc %u version %08x, expecting %08x\n",
1752                                lustre_msg_get_opc(msg),
1753                                lustre_msg_get_version(msg),
1754                                LUSTRE_DLM_VERSION);
1755                 break;
1756         case LLOG_ORIGIN_CONNECT:
1757         case OBD_LOG_CANCEL:
1758                 rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
1759                 if (rc)
1760                         CERROR("bad opc %u version %08x, expecting %08x\n",
1761                                lustre_msg_get_opc(msg),
1762                                lustre_msg_get_version(msg),
1763                                LUSTRE_LOG_VERSION);
1764                 break;
1765         default:
1766                 CERROR("Unexpected opcode %d\n", lustre_msg_get_opc(msg));
1767                 rc = -ENOTSUPP;
1768         }
1769         return rc;
1770 }
1771
1772 /**
1773  * Returns 1 if the given PTLRPC matches the given LDLM locks, or 0 if it does
1774  * not.
1775  */
1776 static int ost_rw_hpreq_lock_match(struct ptlrpc_request *req,
1777                                    struct ldlm_lock *lock)
1778 {
1779         struct niobuf_remote *nb;
1780         struct obd_ioobj *ioo;
1781         struct ost_body *body;
1782         int objcount, niocount;
1783         int mode, opc, i, rc;
1784         __u64 start, end;
1785         ENTRY;
1786
1787         opc = lustre_msg_get_opc(req->rq_reqmsg);
1788         LASSERT(opc == OST_READ || opc == OST_WRITE);
1789
1790         /* As the request may be covered by several locks, do not look at
1791          * o_handle, look at the RPC IO region. */
1792         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1793         if (body == NULL)
1794                 RETURN(0);
1795
1796         objcount = req_capsule_get_size(&req->rq_pill, &RMF_OBD_IOOBJ,
1797                                         RCL_CLIENT) / sizeof(*ioo);
1798         ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
1799         if (ioo == NULL)
1800                 RETURN(0);
1801
1802         rc = ost_validate_obdo(req->rq_export, &body->oa, ioo);
1803         if (rc)
1804                 RETURN(rc);
1805
1806         for (niocount = i = 0; i < objcount; i++)
1807                 niocount += ioo[i].ioo_bufcnt;
1808
1809         nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
1810         if (nb == NULL ||
1811             niocount != (req_capsule_get_size(&req->rq_pill, &RMF_NIOBUF_REMOTE,
1812             RCL_CLIENT) / sizeof(*nb)))
1813                 RETURN(0);
1814
1815         mode = LCK_PW;
1816         if (opc == OST_READ)
1817                 mode |= LCK_PR;
1818
1819         start = nb[0].offset & CFS_PAGE_MASK;
1820         end = (nb[ioo->ioo_bufcnt - 1].offset +
1821                nb[ioo->ioo_bufcnt - 1].len - 1) | ~CFS_PAGE_MASK;
1822
1823         LASSERT(lock->l_resource != NULL);
1824         if (!osc_res_name_eq(ioo->ioo_id, ioo->ioo_seq,
1825                              &lock->l_resource->lr_name))
1826                 RETURN(0);
1827
1828         if (!(lock->l_granted_mode & mode))
1829                 RETURN(0);
1830
1831         if (lock->l_policy_data.l_extent.end < start ||
1832             lock->l_policy_data.l_extent.start > end)
1833                 RETURN(0);
1834
1835         RETURN(1);
1836 }
1837
1838 /**
1839  * High-priority queue request check for whether the given PTLRPC request (\a
1840  * req) is blocking an LDLM lock cancel.
1841  *
1842  * Returns 1 if the given given PTLRPC request (\a req) is blocking an LDLM lock
1843  * cancel, 0 if it is not, and -EFAULT if the request is malformed.
1844  *
1845  * Only OST_READs, OST_WRITEs and OST_PUNCHes go on the h-p RPC queue.  This
1846  * function looks only at OST_READs and OST_WRITEs.
1847  */
1848 static int ost_rw_hpreq_check(struct ptlrpc_request *req)
1849 {
1850         struct niobuf_remote *nb;
1851         struct obd_ioobj *ioo;
1852         struct ost_body *body;
1853         int objcount, niocount;
1854         int mode, opc, i, rc;
1855         ENTRY;
1856
1857         opc = lustre_msg_get_opc(req->rq_reqmsg);
1858         LASSERT(opc == OST_READ || opc == OST_WRITE);
1859
1860         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1861         if (body == NULL)
1862                 RETURN(-EFAULT);
1863
1864         objcount = req_capsule_get_size(&req->rq_pill, &RMF_OBD_IOOBJ,
1865                                         RCL_CLIENT) / sizeof(*ioo);
1866         ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
1867         if (ioo == NULL)
1868                 RETURN(-EFAULT);
1869
1870         rc = ost_validate_obdo(req->rq_export, &body->oa, ioo);
1871         if (rc)
1872                 RETURN(rc);
1873
1874         for (niocount = i = 0; i < objcount; i++)
1875                 niocount += ioo[i].ioo_bufcnt;
1876         nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
1877         if (nb == NULL ||
1878             niocount != (req_capsule_get_size(&req->rq_pill, &RMF_NIOBUF_REMOTE,
1879             RCL_CLIENT) / sizeof(*nb)))
1880                 RETURN(-EFAULT);
1881         if (niocount != 0 && (nb[0].flags & OBD_BRW_SRVLOCK))
1882                 RETURN(-EFAULT);
1883
1884         mode = LCK_PW;
1885         if (opc == OST_READ)
1886                 mode |= LCK_PR;
1887         RETURN(ost_rw_prolong_locks(req, ioo, nb, &body->oa, mode));
1888 }
1889
1890 static int ost_punch_prolong_locks(struct ptlrpc_request *req, struct obdo *oa)
1891 {
1892         struct ldlm_res_id res_id = { .name = { oa->o_id } };
1893         struct ost_prolong_data opd = { 0 };
1894         __u64 start, end;
1895         ENTRY;
1896
1897         start = oa->o_size;
1898         end = start + oa->o_blocks;
1899
1900         opd.opd_mode = LCK_PW;
1901         opd.opd_exp = req->rq_export;
1902         opd.opd_policy.l_extent.start = start & CFS_PAGE_MASK;
1903         if (oa->o_blocks == OBD_OBJECT_EOF || end < start)
1904                 opd.opd_policy.l_extent.end = OBD_OBJECT_EOF;
1905         else
1906                 opd.opd_policy.l_extent.end = end | ~CFS_PAGE_MASK;
1907
1908         /* prolong locks for the current service time of the corresponding
1909          * portal (= OST_IO_PORTAL) */
1910         opd.opd_timeout = AT_OFF ? obd_timeout / 2:
1911                           max(at_est2timeout(at_get(&req->rq_rqbd->
1912                               rqbd_service->srv_at_estimate)), ldlm_timeout);
1913
1914         CDEBUG(D_DLMTRACE,"refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
1915                res_id.name[0], res_id.name[1], opd.opd_policy.l_extent.start,
1916                opd.opd_policy.l_extent.end);
1917
1918         opd.opd_oa = oa;
1919         ldlm_resource_iterate(req->rq_export->exp_obd->obd_namespace, &res_id,
1920                               ost_prolong_locks_iter, &opd);
1921         RETURN(opd.opd_lock_match);
1922 }
1923
1924 /**
1925  * Like ost_rw_hpreq_lock_match(), but for OST_PUNCH RPCs.
1926  */
1927 static int ost_punch_hpreq_lock_match(struct ptlrpc_request *req,
1928                                       struct ldlm_lock *lock)
1929 {
1930         struct ost_body *body;
1931         int rc;
1932         ENTRY;
1933
1934         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1935         if (body == NULL)
1936                 RETURN(0);  /* can't return -EFAULT here */
1937
1938         rc = ost_validate_obdo(req->rq_export, &body->oa, NULL);
1939         if (rc)
1940                 RETURN(rc);
1941
1942         if (body->oa.o_valid & OBD_MD_FLHANDLE &&
1943             body->oa.o_handle.cookie == lock->l_handle.h_cookie)
1944                 RETURN(1);
1945         RETURN(0);
1946 }
1947
1948 /**
1949  * Like ost_rw_hpreq_check(), but for OST_PUNCH RPCs.
1950  */
1951 static int ost_punch_hpreq_check(struct ptlrpc_request *req)
1952 {
1953         struct ost_body *body;
1954         int rc;
1955
1956         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1957         if (body == NULL)
1958                 RETURN(-EFAULT);
1959
1960         rc = ost_validate_obdo(req->rq_export, &body->oa, NULL);
1961         if (rc)
1962                 RETURN(rc);
1963
1964         LASSERT(!(body->oa.o_valid & OBD_MD_FLFLAGS) ||
1965                 !(body->oa.o_flags & OBD_FL_SRVLOCK));
1966
1967         RETURN(ost_punch_prolong_locks(req, &body->oa));
1968 }
1969
1970 struct ptlrpc_hpreq_ops ost_hpreq_rw = {
1971         .hpreq_lock_match  = ost_rw_hpreq_lock_match,
1972         .hpreq_check       = ost_rw_hpreq_check,
1973 };
1974
1975 struct ptlrpc_hpreq_ops ost_hpreq_punch = {
1976         .hpreq_lock_match  = ost_punch_hpreq_lock_match,
1977         .hpreq_check       = ost_punch_hpreq_check,
1978 };
1979
1980 /** Assign high priority operations to the request if needed. */
1981 static int ost_hpreq_handler(struct ptlrpc_request *req)
1982 {
1983         ENTRY;
1984         if (req->rq_export) {
1985                 int opc = lustre_msg_get_opc(req->rq_reqmsg);
1986                 struct ost_body *body;
1987
1988                 if (opc == OST_READ || opc == OST_WRITE) {
1989                         struct niobuf_remote *nb;
1990                         struct obd_ioobj *ioo;
1991                         int objcount, niocount;
1992                         int i;
1993
1994                         /* RPCs on the H-P queue can be inspected before
1995                          * ost_handler() initializes their pills, so we
1996                          * initialize that here.  Capsule initialization is
1997                          * idempotent, as is setting the pill's format (provided
1998                          * it doesn't change).
1999                          */
2000                         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
2001                         if (opc == OST_READ)
2002                                 req_capsule_set(&req->rq_pill,
2003                                                 &RQF_OST_BRW_READ);
2004                         else
2005                                 req_capsule_set(&req->rq_pill,
2006                                                 &RQF_OST_BRW_WRITE);
2007
2008                         body = req_capsule_client_get(&req->rq_pill,
2009                                                       &RMF_OST_BODY);
2010                         if (body == NULL) {
2011                                 CERROR("Missing/short ost_body\n");
2012                                 RETURN(-EFAULT);
2013                         }
2014
2015                         objcount = req_capsule_get_size(&req->rq_pill,
2016                                                         &RMF_OBD_IOOBJ,
2017                                                         RCL_CLIENT) /
2018                                                         sizeof(*ioo);
2019                         if (objcount == 0) {
2020                                 CERROR("Missing/short ioobj\n");
2021                                 RETURN(-EFAULT);
2022                         }
2023                         if (objcount > 1) {
2024                                 CERROR("too many ioobjs (%d)\n", objcount);
2025                                 RETURN(-EFAULT);
2026                         }
2027
2028                         ioo = req_capsule_client_get(&req->rq_pill,
2029                                                      &RMF_OBD_IOOBJ);
2030                         if (ioo == NULL) {
2031                                 CERROR("Missing/short ioobj\n");
2032                                 RETURN(-EFAULT);
2033                         }
2034
2035                         for (niocount = i = 0; i < objcount; i++) {
2036                                 if (ioo[i].ioo_bufcnt == 0) {
2037                                         CERROR("ioo[%d] has zero bufcnt\n", i);
2038                                         RETURN(-EFAULT);
2039                                 }
2040                                 niocount += ioo[i].ioo_bufcnt;
2041                         }
2042                         if (niocount > PTLRPC_MAX_BRW_PAGES) {
2043                                 DEBUG_REQ(D_RPCTRACE, req,
2044                                           "bulk has too many pages (%d)",
2045                                           niocount);
2046                                 RETURN(-EFAULT);
2047                         }
2048
2049                         nb = req_capsule_client_get(&req->rq_pill,
2050                                                     &RMF_NIOBUF_REMOTE);
2051                         if (nb == NULL) {
2052                                 CERROR("Missing/short niobuf\n");
2053                                 RETURN(-EFAULT);
2054                         }
2055
2056                         if (niocount == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
2057                                 req->rq_ops = &ost_hpreq_rw;
2058                 } else if (opc == OST_PUNCH) {
2059                         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
2060                         req_capsule_set(&req->rq_pill, &RQF_OST_PUNCH);
2061
2062                         body = req_capsule_client_get(&req->rq_pill,
2063                                                       &RMF_OST_BODY);
2064                         if (body == NULL) {
2065                                 CERROR("Missing/short ost_body\n");
2066                                 RETURN(-EFAULT);
2067                         }
2068
2069                         if (!(body->oa.o_valid & OBD_MD_FLFLAGS) ||
2070                             !(body->oa.o_flags & OBD_FL_SRVLOCK))
2071                                 req->rq_ops = &ost_hpreq_punch;
2072                 }
2073         }
2074         RETURN(0);
2075 }
2076
2077 /* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
2078 int ost_handle(struct ptlrpc_request *req)
2079 {
2080         struct obd_trans_info trans_info = { 0, };
2081         struct obd_trans_info *oti = &trans_info;
2082         int should_process, fail = OBD_FAIL_OST_ALL_REPLY_NET, rc = 0;
2083         struct obd_device *obd = NULL;
2084         ENTRY;
2085
2086         LASSERT(current->journal_info == NULL);
2087
2088         /* primordial rpcs don't affect server recovery */
2089         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
2090         case SEC_CTX_INIT:
2091         case SEC_CTX_INIT_CONT:
2092         case SEC_CTX_FINI:
2093                 GOTO(out, rc = 0);
2094         }
2095
2096         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
2097
2098         if (lustre_msg_get_opc(req->rq_reqmsg) != OST_CONNECT) {
2099                 if (!class_connected_export(req->rq_export)) {
2100                         CDEBUG(D_HA,"operation %d on unconnected OST from %s\n",
2101                                lustre_msg_get_opc(req->rq_reqmsg),
2102                                libcfs_id2str(req->rq_peer));
2103                         req->rq_status = -ENOTCONN;
2104                         GOTO(out, rc = -ENOTCONN);
2105                 }
2106
2107                 obd = req->rq_export->exp_obd;
2108
2109                 /* Check for aborted recovery. */
2110                 if (obd->obd_recovering) {
2111                         rc = ost_filter_recovery_request(req, obd,
2112                                                          &should_process);
2113                         if (rc || !should_process)
2114                                 RETURN(rc);
2115                         else if (should_process < 0) {
2116                                 req->rq_status = should_process;
2117                                 rc = ptlrpc_error(req);
2118                                 RETURN(rc);
2119                         }
2120                 }
2121         }
2122
2123         oti_init(oti, req);
2124
2125         rc = ost_msg_check_version(req->rq_reqmsg);
2126         if (rc)
2127                 RETURN(rc);
2128
2129         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
2130         case OST_CONNECT: {
2131                 CDEBUG(D_INODE, "connect\n");
2132                 req_capsule_set(&req->rq_pill, &RQF_OST_CONNECT);
2133                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_CONNECT_NET))
2134                         RETURN(0);
2135                 rc = target_handle_connect(req);
2136                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_CONNECT_NET2))
2137                         RETURN(0);
2138                 if (!rc) {
2139                         rc = ost_init_sec_level(req);
2140                         if (!rc)
2141                                 rc = ost_connect_check_sptlrpc(req);
2142                 }
2143                 break;
2144         }
2145         case OST_DISCONNECT:
2146                 CDEBUG(D_INODE, "disconnect\n");
2147                 req_capsule_set(&req->rq_pill, &RQF_OST_DISCONNECT);
2148                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_DISCONNECT_NET))
2149                         RETURN(0);
2150                 rc = target_handle_disconnect(req);
2151                 break;
2152         case OST_CREATE:
2153                 CDEBUG(D_INODE, "create\n");
2154                 req_capsule_set(&req->rq_pill, &RQF_OST_CREATE);
2155                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_CREATE_NET))
2156                         RETURN(0);
2157                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
2158                         GOTO(out, rc = -EROFS);
2159                 rc = ost_create(req->rq_export, req, oti);
2160                 break;
2161         case OST_DESTROY:
2162                 CDEBUG(D_INODE, "destroy\n");
2163                 req_capsule_set(&req->rq_pill, &RQF_OST_DESTROY);
2164                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_DESTROY_NET))
2165                         RETURN(0);
2166                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
2167                         GOTO(out, rc = -EROFS);
2168                 rc = ost_destroy(req->rq_export, req, oti);
2169                 break;
2170         case OST_GETATTR:
2171                 CDEBUG(D_INODE, "getattr\n");
2172                 req_capsule_set(&req->rq_pill, &RQF_OST_GETATTR);
2173                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_GETATTR_NET))
2174                         RETURN(0);
2175                 rc = ost_getattr(req->rq_export, req);
2176                 break;
2177         case OST_SETATTR:
2178                 CDEBUG(D_INODE, "setattr\n");
2179                 req_capsule_set(&req->rq_pill, &RQF_OST_SETATTR);
2180                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_SETATTR_NET))
2181                         RETURN(0);
2182                 rc = ost_setattr(req->rq_export, req, oti);
2183                 break;
2184         case OST_WRITE:
2185                 req_capsule_set(&req->rq_pill, &RQF_OST_BRW_WRITE);
2186                 CDEBUG(D_INODE, "write\n");
2187                 /* req->rq_request_portal would be nice, if it was set */
2188                 if (req->rq_rqbd->rqbd_service->srv_req_portal !=OST_IO_PORTAL){
2189                         CERROR("%s: deny write request from %s to portal %u\n",
2190                                req->rq_export->exp_obd->obd_name,
2191                                obd_export_nid2str(req->rq_export),
2192                                req->rq_rqbd->rqbd_service->srv_req_portal);
2193                         GOTO(out, rc = -EPROTO);
2194                 }
2195                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_NET))
2196                         RETURN(0);
2197                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOSPC))
2198                         GOTO(out, rc = -ENOSPC);
2199                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
2200                         GOTO(out, rc = -EROFS);
2201                 rc = ost_brw_write(req, oti);
2202                 LASSERT(current->journal_info == NULL);
2203                 /* ost_brw_write sends its own replies */
2204                 RETURN(rc);
2205         case OST_READ:
2206                 req_capsule_set(&req->rq_pill, &RQF_OST_BRW_READ);
2207                 CDEBUG(D_INODE, "read\n");
2208                 /* req->rq_request_portal would be nice, if it was set */
2209                 if (req->rq_rqbd->rqbd_service->srv_req_portal !=OST_IO_PORTAL){
2210                         CERROR("%s: deny read request from %s to portal %u\n",
2211                                req->rq_export->exp_obd->obd_name,
2212                                obd_export_nid2str(req->rq_export),
2213                                req->rq_rqbd->rqbd_service->srv_req_portal);
2214                         GOTO(out, rc = -EPROTO);
2215                 }
2216                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_NET))
2217                         RETURN(0);
2218                 rc = ost_brw_read(req, oti);
2219                 LASSERT(current->journal_info == NULL);
2220                 /* ost_brw_read sends its own replies */
2221                 RETURN(rc);
2222         case OST_PUNCH:
2223                 CDEBUG(D_INODE, "punch\n");
2224                 req_capsule_set(&req->rq_pill, &RQF_OST_PUNCH);
2225                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_PUNCH_NET))
2226                         RETURN(0);
2227                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
2228                         GOTO(out, rc = -EROFS);
2229                 rc = ost_punch(req->rq_export, req, oti);
2230                 break;
2231         case OST_STATFS:
2232                 CDEBUG(D_INODE, "statfs\n");
2233                 req_capsule_set(&req->rq_pill, &RQF_OST_STATFS);
2234                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_STATFS_NET))
2235                         RETURN(0);
2236                 rc = ost_statfs(req);
2237                 break;
2238         case OST_SYNC:
2239                 CDEBUG(D_INODE, "sync\n");
2240                 req_capsule_set(&req->rq_pill, &RQF_OST_SYNC);
2241                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_SYNC_NET))
2242                         RETURN(0);
2243                 rc = ost_sync(req->rq_export, req);
2244                 break;
2245         case OST_SET_INFO:
2246                 DEBUG_REQ(D_INODE, req, "set_info");
2247                 req_capsule_set(&req->rq_pill, &RQF_OBD_SET_INFO);
2248                 rc = ost_set_info(req->rq_export, req);
2249                 break;
2250         case OST_GET_INFO:
2251                 DEBUG_REQ(D_INODE, req, "get_info");
2252                 req_capsule_set(&req->rq_pill, &RQF_OST_GET_INFO_GENERIC);
2253                 rc = ost_get_info(req->rq_export, req);
2254                 break;
2255 #ifdef HAVE_QUOTA_SUPPORT
2256         case OST_QUOTACHECK:
2257                 CDEBUG(D_INODE, "quotacheck\n");
2258                 req_capsule_set(&req->rq_pill, &RQF_OST_QUOTACHECK);
2259                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_QUOTACHECK_NET))
2260                         RETURN(0);
2261                 rc = ost_handle_quotacheck(req);
2262                 break;
2263         case OST_QUOTACTL:
2264                 CDEBUG(D_INODE, "quotactl\n");
2265                 req_capsule_set(&req->rq_pill, &RQF_OST_QUOTACTL);
2266                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_QUOTACTL_NET))
2267                         RETURN(0);
2268                 rc = ost_handle_quotactl(req);
2269                 break;
2270         case OST_QUOTA_ADJUST_QUNIT:
2271                 CDEBUG(D_INODE, "quota_adjust_qunit\n");
2272                 req_capsule_set(&req->rq_pill, &RQF_OST_QUOTA_ADJUST_QUNIT);
2273                 rc = ost_handle_quota_adjust_qunit(req);
2274                 break;
2275 #endif
2276         case OBD_PING:
2277                 DEBUG_REQ(D_INODE, req, "ping");
2278                 req_capsule_set(&req->rq_pill, &RQF_OBD_PING);
2279                 rc = target_handle_ping(req);
2280                 break;
2281         /* FIXME - just reply status */
2282         case LLOG_ORIGIN_CONNECT:
2283                 DEBUG_REQ(D_INODE, req, "log connect");
2284                 req_capsule_set(&req->rq_pill, &RQF_LLOG_ORIGIN_CONNECT);
2285                 rc = ost_llog_handle_connect(req->rq_export, req);
2286                 req->rq_status = rc;
2287                 rc = req_capsule_server_pack(&req->rq_pill);
2288                 if (rc)
2289                         RETURN(rc);
2290                 RETURN(ptlrpc_reply(req));
2291         case OBD_LOG_CANCEL:
2292                 CDEBUG(D_INODE, "log cancel\n");
2293                 req_capsule_set(&req->rq_pill, &RQF_LOG_CANCEL);
2294                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_NET))
2295                         RETURN(0);
2296                 rc = llog_origin_handle_cancel(req);
2297                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_REP))
2298                         RETURN(0);
2299                 req->rq_status = rc;
2300                 rc = req_capsule_server_pack(&req->rq_pill);
2301                 if (rc)
2302                         RETURN(rc);
2303                 RETURN(ptlrpc_reply(req));
2304         case LDLM_ENQUEUE:
2305                 CDEBUG(D_INODE, "enqueue\n");
2306                 req_capsule_set(&req->rq_pill, &RQF_LDLM_ENQUEUE);
2307                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE))
2308                         RETURN(0);
2309                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
2310                                          ost_blocking_ast,
2311                                          ldlm_server_glimpse_ast);
2312                 fail = OBD_FAIL_OST_LDLM_REPLY_NET;
2313                 break;
2314         case LDLM_CONVERT:
2315                 CDEBUG(D_INODE, "convert\n");
2316                 req_capsule_set(&req->rq_pill, &RQF_LDLM_CONVERT);
2317                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CONVERT))
2318                         RETURN(0);
2319                 rc = ldlm_handle_convert(req);
2320                 break;
2321         case LDLM_CANCEL:
2322                 CDEBUG(D_INODE, "cancel\n");
2323                 req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL);
2324                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL))
2325                         RETURN(0);
2326                 rc = ldlm_handle_cancel(req);
2327                 break;
2328         case LDLM_BL_CALLBACK:
2329         case LDLM_CP_CALLBACK:
2330                 CDEBUG(D_INODE, "callback\n");
2331                 CERROR("callbacks should not happen on OST\n");
2332                 /* fall through */
2333         default:
2334                 CERROR("Unexpected opcode %d\n",
2335                        lustre_msg_get_opc(req->rq_reqmsg));
2336                 req->rq_status = -ENOTSUPP;
2337                 rc = ptlrpc_error(req);
2338                 RETURN(rc);
2339         }
2340
2341         LASSERT(current->journal_info == NULL);
2342
2343         EXIT;
2344         /* If we're DISCONNECTing, the export_data is already freed */
2345         if (!rc && lustre_msg_get_opc(req->rq_reqmsg) != OST_DISCONNECT)
2346                 target_committed_to_req(req);
2347
2348 out:
2349         if (!rc)
2350                 oti_to_request(oti, req);
2351
2352         target_send_reply(req, rc, fail);
2353         return 0;
2354 }
2355 EXPORT_SYMBOL(ost_handle);
2356 /*
2357  * free per-thread pool created by ost_thread_init().
2358  */
2359 static void ost_thread_done(struct ptlrpc_thread *thread)
2360 {
2361         struct ost_thread_local_cache *tls; /* TLS stands for Thread-Local
2362                                              * Storage */
2363
2364         ENTRY;
2365
2366         LASSERT(thread != NULL);
2367
2368         /*
2369          * be prepared to handle partially-initialized pools (because this is
2370          * called from ost_thread_init() for cleanup.
2371          */
2372         tls = thread->t_data;
2373         if (tls != NULL) {
2374                 OBD_FREE_PTR(tls);
2375                 thread->t_data = NULL;
2376         }
2377         EXIT;
2378 }
2379
2380 /*
2381  * initialize per-thread page pool (bug 5137).
2382  */
2383 static int ost_thread_init(struct ptlrpc_thread *thread)
2384 {
2385         struct ost_thread_local_cache *tls;
2386
2387         ENTRY;
2388
2389         LASSERT(thread != NULL);
2390         LASSERT(thread->t_data == NULL);
2391         LASSERTF(thread->t_id <= OSS_THREADS_MAX, "%u\n", thread->t_id);
2392
2393         OBD_ALLOC_PTR(tls);
2394         if (tls == NULL)
2395                 RETURN(-ENOMEM);
2396         thread->t_data = tls;
2397         RETURN(0);
2398 }
2399
/*
 * Watchdog timeout derived from obd_timeout, scaled by 1000
 * (presumably seconds to milliseconds -- confirm against the units of
 * obd_timeout before relying on this).
 */
#define OST_WATCHDOG_TIMEOUT (obd_timeout * 1000)
2401
2402 /* Sigh - really, this is an OSS, the _server_, not the _target_ */
2403 static int ost_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
2404 {
2405         struct ost_obd *ost = &obd->u.ost;
2406         struct lprocfs_static_vars lvars;
2407         int oss_min_threads;
2408         int oss_max_threads;
2409         int oss_min_create_threads;
2410         int oss_max_create_threads;
2411         int rc;
2412         ENTRY;
2413
2414         rc = cfs_cleanup_group_info();
2415         if (rc)
2416                 RETURN(rc);
2417
2418         lprocfs_ost_init_vars(&lvars);
2419         lprocfs_obd_setup(obd, lvars.obd_vars);
2420
2421         cfs_sema_init(&ost->ost_health_sem, 1);
2422
2423         if (oss_num_threads) {
2424                 /* If oss_num_threads is set, it is the min and the max. */
2425                 if (oss_num_threads > OSS_THREADS_MAX)
2426                         oss_num_threads = OSS_THREADS_MAX;
2427                 if (oss_num_threads < OSS_THREADS_MIN)
2428                         oss_num_threads = OSS_THREADS_MIN;
2429                 oss_max_threads = oss_min_threads = oss_num_threads;
2430         } else {
2431                 /* Base min threads on memory and cpus */
2432                 oss_min_threads =
2433                         cfs_num_possible_cpus() * CFS_NUM_CACHEPAGES >>
2434                         (27 - CFS_PAGE_SHIFT);
2435                 if (oss_min_threads < OSS_THREADS_MIN)
2436                         oss_min_threads = OSS_THREADS_MIN;
2437                 /* Insure a 4x range for dynamic threads */
2438                 if (oss_min_threads > OSS_THREADS_MAX / 4)
2439                         oss_min_threads = OSS_THREADS_MAX / 4;
2440                 oss_max_threads = min(OSS_THREADS_MAX, oss_min_threads * 4 + 1);
2441         }
2442
2443         ost->ost_service =
2444                 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
2445                                 OST_MAXREPSIZE, OST_REQUEST_PORTAL,
2446                                 OSC_REPLY_PORTAL, OSS_SERVICE_WATCHDOG_FACTOR,
2447                                 ost_handle, LUSTRE_OSS_NAME,
2448                                 obd->obd_proc_entry, target_print_req,
2449                                 oss_min_threads, oss_max_threads,
2450                                 "ll_ost", LCT_DT_THREAD, NULL);
2451         if (ost->ost_service == NULL) {
2452                 CERROR("failed to start service\n");
2453                 GOTO(out_lprocfs, rc = -ENOMEM);
2454         }
2455
2456         rc = ptlrpc_start_threads(ost->ost_service);
2457         if (rc)
2458                 GOTO(out_service, rc = -EINVAL);
2459
2460         if (oss_num_create_threads) {
2461                 if (oss_num_create_threads > OSS_MAX_CREATE_THREADS)
2462                         oss_num_create_threads = OSS_MAX_CREATE_THREADS;
2463                 if (oss_num_create_threads < OSS_MIN_CREATE_THREADS)
2464                         oss_num_create_threads = OSS_MIN_CREATE_THREADS;
2465                 oss_min_create_threads = oss_max_create_threads =
2466                         oss_num_create_threads;
2467         } else {
2468                 oss_min_create_threads = OSS_MIN_CREATE_THREADS;
2469                 oss_max_create_threads = OSS_MAX_CREATE_THREADS;
2470         }
2471
2472         ost->ost_create_service =
2473                 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
2474                                 OST_MAXREPSIZE, OST_CREATE_PORTAL,
2475                                 OSC_REPLY_PORTAL, OSS_SERVICE_WATCHDOG_FACTOR,
2476                                 ost_handle, "ost_create",
2477                                 obd->obd_proc_entry, target_print_req,
2478                                 oss_min_create_threads, oss_max_create_threads,
2479                                 "ll_ost_creat", LCT_DT_THREAD, NULL);
2480         if (ost->ost_create_service == NULL) {
2481                 CERROR("failed to start OST create service\n");
2482                 GOTO(out_service, rc = -ENOMEM);
2483         }
2484
2485         rc = ptlrpc_start_threads(ost->ost_create_service);
2486         if (rc)
2487                 GOTO(out_create, rc = -EINVAL);
2488
2489         ost->ost_io_service =
2490                 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
2491                                 OST_MAXREPSIZE, OST_IO_PORTAL,
2492                                 OSC_REPLY_PORTAL, OSS_SERVICE_WATCHDOG_FACTOR,
2493                                 ost_handle, "ost_io",
2494                                 obd->obd_proc_entry, target_print_req,
2495                                 oss_min_threads, oss_max_threads,
2496                                 "ll_ost_io", LCT_DT_THREAD, ost_hpreq_handler);
2497         if (ost->ost_io_service == NULL) {
2498                 CERROR("failed to start OST I/O service\n");
2499                 GOTO(out_create, rc = -ENOMEM);
2500         }
2501
2502         ost->ost_io_service->srv_init = ost_thread_init;
2503         ost->ost_io_service->srv_done = ost_thread_done;
2504         ost->ost_io_service->srv_cpu_affinity = 1;
2505         rc = ptlrpc_start_threads(ost->ost_io_service);
2506         if (rc)
2507                 GOTO(out_io, rc = -EINVAL);
2508
2509         ping_evictor_start();
2510
2511         RETURN(0);
2512
2513 out_io:
2514         ptlrpc_unregister_service(ost->ost_io_service);
2515         ost->ost_io_service = NULL;
2516 out_create:
2517         ptlrpc_unregister_service(ost->ost_create_service);
2518         ost->ost_create_service = NULL;
2519 out_service:
2520         ptlrpc_unregister_service(ost->ost_service);
2521         ost->ost_service = NULL;
2522 out_lprocfs:
2523         lprocfs_obd_cleanup(obd);
2524         RETURN(rc);
2525 }
2526
2527 static int ost_cleanup(struct obd_device *obd)
2528 {
2529         struct ost_obd *ost = &obd->u.ost;
2530         int err = 0;
2531         ENTRY;
2532
2533         ping_evictor_stop();
2534
2535         /* there is no recovery for OST OBD, all recovery is controlled by
2536          * obdfilter OBD */
2537         LASSERT(obd->obd_recovering == 0);
2538         cfs_down(&ost->ost_health_sem);
2539         ptlrpc_unregister_service(ost->ost_service);
2540         ptlrpc_unregister_service(ost->ost_create_service);
2541         ptlrpc_unregister_service(ost->ost_io_service);
2542         ost->ost_service = NULL;
2543         ost->ost_create_service = NULL;
2544         cfs_up(&ost->ost_health_sem);
2545
2546         lprocfs_obd_cleanup(obd);
2547
2548         RETURN(err);
2549 }
2550
2551 static int ost_health_check(struct obd_device *obd)
2552 {
2553         struct ost_obd *ost = &obd->u.ost;
2554         int rc = 0;
2555
2556         cfs_down(&ost->ost_health_sem);
2557         rc |= ptlrpc_service_health_check(ost->ost_service);
2558         rc |= ptlrpc_service_health_check(ost->ost_create_service);
2559         rc |= ptlrpc_service_health_check(ost->ost_io_service);
2560         cfs_up(&ost->ost_health_sem);
2561
2562         /*
2563          * health_check to return 0 on healthy
2564          * and 1 on unhealthy.
2565          */
2566         if( rc != 0)
2567                 rc = 1;
2568
2569         return rc;
2570 }
2571
2572 struct ost_thread_local_cache *ost_tls(struct ptlrpc_request *r)
2573 {
2574         return (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
2575 }
2576
2577 /* use obd ops to offer management infrastructure */
2578 static struct obd_ops ost_obd_ops = {
2579         .o_owner        = THIS_MODULE,
2580         .o_setup        = ost_setup,
2581         .o_cleanup      = ost_cleanup,
2582         .o_health_check = ost_health_check,
2583 };
2584
2585
2586 static int __init ost_init(void)
2587 {
2588         struct lprocfs_static_vars lvars;
2589         int rc;
2590         ENTRY;
2591
2592         lprocfs_ost_init_vars(&lvars);
2593         rc = class_register_type(&ost_obd_ops, NULL, lvars.module_vars,
2594                                  LUSTRE_OSS_NAME, NULL);
2595
2596         if (ost_num_threads != 0 && oss_num_threads == 0) {
2597                 LCONSOLE_INFO("ost_num_threads module parameter is deprecated, "
2598                               "use oss_num_threads instead or unset both for "
2599                               "dynamic thread startup\n");
2600                 oss_num_threads = ost_num_threads;
2601         }
2602
2603         RETURN(rc);
2604 }
2605
2606 static void /*__exit*/ ost_exit(void)
2607 {
2608         class_unregister_type(LUSTRE_OSS_NAME);
2609 }
2610
2611 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2612 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
2613 MODULE_LICENSE("GPL");
2614
2615 module_init(ost_init);
2616 module_exit(ost_exit);