Whamcloud - gitweb
b979851e4e45f6d3da5310fa0484bc811abd1e56
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ost/ost_handler.c
37  *
38  * Author: Peter J. Braam <braam@clusterfs.com>
39  * Author: Phil Schwan <phil@clusterfs.com>
40  */
41
42 #ifndef EXPORT_SYMTAB
43 # define EXPORT_SYMTAB
44 #endif
45 #define DEBUG_SUBSYSTEM S_OST
46
47 #include <linux/module.h>
48 #include <obd_cksum.h>
49 #include <obd_ost.h>
50 #include <lustre_net.h>
51 #include <lustre_dlm.h>
52 #include <lustre_export.h>
53 #include <lustre_debug.h>
54 #include <linux/init.h>
55 #include <lprocfs_status.h>
56 #include <libcfs/list.h>
57 #include <lustre_quota.h>
58 #include "ost_internal.h"
59
60 static int oss_num_threads;
61 CFS_MODULE_PARM(oss_num_threads, "i", int, 0444,
62                 "number of OSS service threads to start");
63
64 static int ost_num_threads;
65 CFS_MODULE_PARM(ost_num_threads, "i", int, 0444,
66                 "number of OST service threads to start (deprecated)");
67
68 static int oss_num_create_threads;
69 CFS_MODULE_PARM(oss_num_create_threads, "i", int, 0444,
70                 "number of OSS create threads to start");
71
72 /**
73  * Do not return server-side uid/gid to remote client
74  */
75 static void ost_drop_id(struct obd_export *exp, struct  obdo *oa)
76 {
77         if (exp_connect_rmtclient(exp)) {
78                 oa->o_uid = -1;
79                 oa->o_gid = -1;
80                 oa->o_valid &= ~(OBD_MD_FLUID | OBD_MD_FLGID);
81         }
82 }
83
84 void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
85 {
86         struct oti_req_ack_lock *ack_lock;
87         int i;
88
89         if (oti == NULL)
90                 return;
91
92         if (req->rq_repmsg)
93                 lustre_msg_set_transno(req->rq_repmsg, oti->oti_transno);
94         req->rq_transno = oti->oti_transno;
95
96         /* XXX 4 == entries in oti_ack_locks??? */
97         for (ack_lock = oti->oti_ack_locks, i = 0; i < 4; i++, ack_lock++) {
98                 if (!ack_lock->mode)
99                         break;
100                 /* XXX not even calling target_send_reply in some cases... */
101                 ptlrpc_save_lock (req, &ack_lock->lock, ack_lock->mode, 0);
102         }
103 }
104
105 static int ost_destroy(struct obd_export *exp, struct ptlrpc_request *req,
106                        struct obd_trans_info *oti)
107 {
108         struct ost_body *body, *repbody;
109         struct lustre_capa *capa = NULL;
110         int rc;
111         ENTRY;
112
113         /* Get the request body */
114         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
115         if (body == NULL)
116                 RETURN(-EFAULT);
117
118         if (body->oa.o_id == 0)
119                 RETURN(-EPROTO);
120
121         /* If there's a DLM request, cancel the locks mentioned in it*/
122         if (req_capsule_field_present(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT)) {
123                 struct ldlm_request *dlm;
124
125                 dlm = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
126                 if (dlm == NULL)
127                         RETURN (-EFAULT);
128                 ldlm_request_cancel(req, dlm, 0);
129         }
130
131         /* If there's a capability, get it */
132         if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
133                 capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
134                 if (capa == NULL) {
135                         CERROR("Missing capability for OST DESTROY");
136                         RETURN (-EFAULT);
137                 }
138         }
139
140         /* Prepare the reply */
141         rc = req_capsule_server_pack(&req->rq_pill);
142         if (rc)
143                 RETURN(rc);
144
145         /* Get the log cancellation cookie */
146         if (body->oa.o_valid & OBD_MD_FLCOOKIE)
147                 oti->oti_logcookies = &body->oa.o_lcookie;
148
149         /* Finish the reply */
150         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
151         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
152
153         /* Do the destroy and set the reply status accordingly  */
154         req->rq_status = obd_destroy(exp, &body->oa, NULL, oti, NULL, capa);
155         RETURN(0);
156 }
157
158 static int ost_getattr(struct obd_export *exp, struct ptlrpc_request *req)
159 {
160         struct ost_body *body, *repbody;
161         struct obd_info oinfo = { { { 0 } } };
162         int rc;
163         ENTRY;
164
165         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
166         if (body == NULL)
167                 RETURN(-EFAULT);
168
169         rc = req_capsule_server_pack(&req->rq_pill);
170         if (rc)
171                 RETURN(rc);
172
173         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
174         repbody->oa = body->oa;
175
176         oinfo.oi_oa = &repbody->oa;
177         if (oinfo.oi_oa->o_valid & OBD_MD_FLOSSCAPA) {
178                 oinfo.oi_capa = req_capsule_client_get(&req->rq_pill,
179                                                        &RMF_CAPA1);
180                 if (oinfo.oi_capa == NULL) {
181                         CERROR("Missing capability for OST GETATTR");
182                         RETURN (-EFAULT);
183                 }
184         }
185
186         req->rq_status = obd_getattr(exp, &oinfo);
187         ost_drop_id(exp, &repbody->oa);
188         RETURN(0);
189 }
190
191 static int ost_statfs(struct ptlrpc_request *req)
192 {
193         struct obd_statfs *osfs;
194         int rc;
195         ENTRY;
196
197         rc = req_capsule_server_pack(&req->rq_pill);
198         if (rc)
199                 RETURN(rc);
200
201         osfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
202
203         req->rq_status = obd_statfs(req->rq_export->exp_obd, osfs,
204                                     cfs_time_current_64() - HZ, 0);
205         if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOSPC))
206                 osfs->os_bfree = osfs->os_bavail = 64;
207         if (req->rq_status != 0)
208                 CERROR("ost: statfs failed: rc %d\n", req->rq_status);
209
210         RETURN(0);
211 }
212
213 static int ost_create(struct obd_export *exp, struct ptlrpc_request *req,
214                       struct obd_trans_info *oti)
215 {
216         struct ost_body *body, *repbody;
217         int rc;
218         ENTRY;
219
220         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
221         if (body == NULL)
222                 RETURN(-EFAULT);
223
224         rc = req_capsule_server_pack(&req->rq_pill);
225         if (rc)
226                 RETURN(rc);
227
228         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
229         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
230         oti->oti_logcookies = &repbody->oa.o_lcookie;
231
232         req->rq_status = obd_create(exp, &repbody->oa, NULL, oti);
233         //obd_log_cancel(conn, NULL, 1, oti->oti_logcookies, 0);
234         RETURN(0);
235 }
236
237 /**
238  * Helper function for ost_punch(): if asked by client, acquire [size, EOF]
239  * lock on the file being truncated.
240  */
241 static int ost_punch_lock_get(struct obd_export *exp, struct obdo *oa,
242                               struct lustre_handle *lh)
243 {
244         int flags;
245         struct ldlm_res_id res_id;
246         ldlm_policy_data_t policy;
247         __u64 start;
248         __u64 finis;
249
250         ENTRY;
251
252         osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
253         LASSERT(!lustre_handle_is_used(lh));
254
255         if (!(oa->o_valid & OBD_MD_FLFLAGS) ||
256             !(oa->o_flags & OBD_FL_TRUNCLOCK))
257                 RETURN(0);
258
259         CDEBUG(D_INODE, "OST-side truncate lock.\n");
260
261         start = oa->o_size;
262         finis = start + oa->o_blocks;
263
264         /*
265          * standard truncate optimization: if file body is completely
266          * destroyed, don't send data back to the server.
267          */
268         flags = (start == 0) ? LDLM_AST_DISCARD_DATA : 0;
269
270         policy.l_extent.start = start & CFS_PAGE_MASK;
271
272         /*
273          * If ->o_blocks is EOF it means "lock till the end of the
274          * file". Otherwise, it's size of a hole being punched (in bytes)
275          */
276         if (oa->o_blocks == OBD_OBJECT_EOF || finis < start)
277                 policy.l_extent.end = OBD_OBJECT_EOF;
278         else
279                 policy.l_extent.end = finis | ~CFS_PAGE_MASK;
280
281         RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
282                                       LDLM_EXTENT, &policy, LCK_PW, &flags,
283                                       ldlm_blocking_ast, ldlm_completion_ast,
284                                       ldlm_glimpse_ast, NULL, 0, NULL, lh));
285 }
286
287 /**
288  * Helper function for ost_punch(): release lock acquired by
289  * ost_punch_lock_get(), if any.
290  */
291 static void ost_punch_lock_put(struct obd_export *exp, struct obdo *oa,
292                                struct lustre_handle *lh)
293 {
294         ENTRY;
295         if (lustre_handle_is_used(lh))
296                 ldlm_lock_decref(lh, LCK_PW);
297         EXIT;
298 }
299
300 static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req,
301                      struct obd_trans_info *oti)
302 {
303         struct obd_info oinfo = { { { 0 } } };
304         struct ost_body *body, *repbody;
305         int rc;
306         struct lustre_handle lh = {0,};
307         ENTRY;
308
309         /* check that we do support OBD_CONNECT_TRUNCLOCK. */
310         CLASSERT(OST_CONNECT_SUPPORTED & OBD_CONNECT_TRUNCLOCK);
311
312         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
313         if (body == NULL)
314                 RETURN(-EFAULT);
315
316         oinfo.oi_oa = &body->oa;
317         oinfo.oi_policy.l_extent.start = oinfo.oi_oa->o_size;
318         oinfo.oi_policy.l_extent.end = oinfo.oi_oa->o_blocks;
319
320         if ((oinfo.oi_oa->o_valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) !=
321             (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
322                 RETURN(-EPROTO);
323
324         rc = req_capsule_server_pack(&req->rq_pill);
325         if (rc)
326                 RETURN(rc);
327
328         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
329         rc = ost_punch_lock_get(exp, oinfo.oi_oa, &lh);
330         if (rc == 0) {
331                 if (oinfo.oi_oa->o_valid & OBD_MD_FLFLAGS &&
332                     oinfo.oi_oa->o_flags == OBD_FL_TRUNCLOCK)
333                         /*
334                          * If OBD_FL_TRUNCLOCK is the only bit set in
335                          * ->o_flags, clear OBD_MD_FLFLAGS to avoid falling
336                          * through filter_setattr() to filter_iocontrol().
337                          */
338                         oinfo.oi_oa->o_valid &= ~OBD_MD_FLFLAGS;
339
340                 if (oinfo.oi_oa->o_valid & OBD_MD_FLOSSCAPA) {
341                         oinfo.oi_capa = req_capsule_client_get(&req->rq_pill,
342                                                                &RMF_CAPA1);
343                         if (oinfo.oi_capa == NULL) {
344                                 CERROR("Missing capability for OST PUNCH");
345                                 RETURN (-EFAULT);
346                         }
347                 }
348                 req->rq_status = obd_punch(exp, &oinfo, oti, NULL);
349                 ost_punch_lock_put(exp, oinfo.oi_oa, &lh);
350         }
351         repbody->oa = *oinfo.oi_oa;
352         ost_drop_id(exp, &repbody->oa);
353         RETURN(rc);
354 }
355
356 static int ost_sync(struct obd_export *exp, struct ptlrpc_request *req)
357 {
358         struct ost_body *body, *repbody;
359         struct lustre_capa *capa = NULL;
360         int rc;
361         ENTRY;
362
363         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
364         if (body == NULL)
365                 RETURN(-EFAULT);
366
367         if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
368                 capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
369                 if (capa == NULL) {
370                         CERROR("Missing capability for OST SYNC");
371                         RETURN (-EFAULT);
372                 }
373         }
374
375         rc = req_capsule_server_pack(&req->rq_pill);
376         if (rc)
377                 RETURN(rc);
378
379         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
380         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
381         req->rq_status = obd_sync(exp, &repbody->oa, NULL, repbody->oa.o_size,
382                                   repbody->oa.o_blocks, capa);
383         ost_drop_id(exp, &repbody->oa);
384         RETURN(0);
385 }
386
387 static int ost_setattr(struct obd_export *exp, struct ptlrpc_request *req,
388                        struct obd_trans_info *oti)
389 {
390         struct ost_body *body, *repbody;
391         int rc;
392         struct obd_info oinfo = { { { 0 } } };
393         ENTRY;
394
395         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
396         if (body == NULL)
397                 RETURN(-EFAULT);
398
399         rc = req_capsule_server_pack(&req->rq_pill);
400         if (rc)
401                 RETURN(rc);
402
403         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
404         repbody->oa = body->oa;
405
406         oinfo.oi_oa = &repbody->oa;
407         if (oinfo.oi_oa->o_valid & OBD_MD_FLOSSCAPA) {
408                 oinfo.oi_capa = req_capsule_client_get(&req->rq_pill,
409                                                        &RMF_CAPA1);
410                 if (oinfo.oi_capa == NULL) {
411                         CERROR("Missing capability for OST SETATTR");
412                         RETURN (-EFAULT);
413                 }
414         }
415         req->rq_status = obd_setattr(exp, &oinfo, oti);
416         ost_drop_id(exp, &repbody->oa);
417         RETURN(0);
418 }
419
420 static int ost_bulk_timeout(void *data)
421 {
422         ENTRY;
423         /* We don't fail the connection here, because having the export
424          * killed makes the (vital) call to commitrw very sad.
425          */
426         RETURN(1);
427 }
428
429 static __u32 ost_checksum_bulk(struct ptlrpc_bulk_desc *desc, int opc,
430                                cksum_type_t cksum_type)
431 {
432         __u32 cksum;
433         int i;
434
435         cksum = init_checksum(cksum_type);
436         for (i = 0; i < desc->bd_iov_count; i++) {
437                 struct page *page = desc->bd_iov[i].kiov_page;
438                 int off = desc->bd_iov[i].kiov_offset & ~CFS_PAGE_MASK;
439                 char *ptr = kmap(page) + off;
440                 int len = desc->bd_iov[i].kiov_len;
441
442                 /* corrupt the data before we compute the checksum, to
443                  * simulate a client->OST data error */
444                 if (i == 0 && opc == OST_WRITE &&
445                     OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_RECEIVE))
446                         memcpy(ptr, "bad3", min(4, len));
447                 cksum = compute_checksum(cksum, ptr, len, cksum_type);
448                 /* corrupt the data after we compute the checksum, to
449                  * simulate an OST->client data error */
450                 if (i == 0 && opc == OST_READ &&
451                     OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_SEND)) {
452                         memcpy(ptr, "bad4", min(4, len));
453                         /* nobody should use corrupted page again */
454                         ClearPageUptodate(page);
455                 }
456                 kunmap(page);
457         }
458
459         return cksum;
460 }
461
462 static int ost_brw_lock_get(int mode, struct obd_export *exp,
463                             struct obd_ioobj *obj, struct niobuf_remote *nb,
464                             struct lustre_handle *lh)
465 {
466         int flags                 = 0;
467         int nrbufs                = obj->ioo_bufcnt;
468         struct ldlm_res_id res_id;
469         ldlm_policy_data_t policy;
470         int i;
471         ENTRY;
472
473         osc_build_res_name(obj->ioo_id, obj->ioo_gr, &res_id);
474         LASSERT(mode == LCK_PR || mode == LCK_PW);
475         LASSERT(!lustre_handle_is_used(lh));
476
477         if (nrbufs == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
478                 RETURN(0);
479
480         for (i = 1; i < nrbufs; i ++)
481                 if ((nb[0].flags & OBD_BRW_SRVLOCK) !=
482                     (nb[i].flags & OBD_BRW_SRVLOCK))
483                         RETURN(-EFAULT);
484
485         policy.l_extent.start = nb[0].offset & CFS_PAGE_MASK;
486         policy.l_extent.end   = (nb[nrbufs - 1].offset +
487                                  nb[nrbufs - 1].len - 1) | ~CFS_PAGE_MASK;
488
489         RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
490                                       LDLM_EXTENT, &policy, mode, &flags,
491                                       ldlm_blocking_ast, ldlm_completion_ast,
492                                       ldlm_glimpse_ast, NULL, 0, NULL, lh));
493 }
494
495 static void ost_brw_lock_put(int mode,
496                              struct obd_ioobj *obj, struct niobuf_remote *niob,
497                              struct lustre_handle *lh)
498 {
499         ENTRY;
500         LASSERT(mode == LCK_PR || mode == LCK_PW);
501         LASSERT((obj->ioo_bufcnt > 0 && (niob[0].flags & OBD_BRW_SRVLOCK)) ==
502                 lustre_handle_is_used(lh));
503         if (lustre_handle_is_used(lh))
504                 ldlm_lock_decref(lh, mode);
505         EXIT;
506 }
507
508 struct ost_prolong_data {
509         struct obd_export *opd_exp;
510         ldlm_policy_data_t opd_policy;
511         struct obdo *opd_oa;
512         ldlm_mode_t opd_mode;
513         int opd_lock_match;
514         int opd_timeout;
515 };
516
517 static int ost_prolong_locks_iter(struct ldlm_lock *lock, void *data)
518 {
519         struct ost_prolong_data *opd = data;
520
521         LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
522
523         if (lock->l_req_mode != lock->l_granted_mode) {
524                 /* scan granted locks only */
525                 return LDLM_ITER_STOP;
526         }
527
528         if (lock->l_export != opd->opd_exp) {
529                 /* prolong locks only for given client */
530                 return LDLM_ITER_CONTINUE;
531         }
532
533         if (!(lock->l_granted_mode & opd->opd_mode)) {
534                 /* we aren't interesting in all type of locks */
535                 return LDLM_ITER_CONTINUE;
536         }
537
538         if (lock->l_policy_data.l_extent.end < opd->opd_policy.l_extent.start ||
539             lock->l_policy_data.l_extent.start > opd->opd_policy.l_extent.end) {
540                 /* the request doesn't cross the lock, skip it */
541                 return LDLM_ITER_CONTINUE;
542         }
543
544         /* Fill the obdo with the matched lock handle.
545          * XXX: it is possible in some cases the IO RPC is covered by several
546          * locks, even for the write case, so it may need to be a lock list. */
547         if (opd->opd_oa && !(opd->opd_oa->o_valid & OBD_MD_FLHANDLE)) {
548                 opd->opd_oa->o_handle.cookie = lock->l_handle.h_cookie;
549                 opd->opd_oa->o_valid |= OBD_MD_FLHANDLE;
550         }
551
552         if (!(lock->l_flags & LDLM_FL_AST_SENT)) {
553                 /* ignore locks not being cancelled */
554                 return LDLM_ITER_CONTINUE;
555         }
556
557         CDEBUG(D_DLMTRACE,"refresh lock: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
558                lock->l_resource->lr_name.name[0],
559                lock->l_resource->lr_name.name[1],
560                opd->opd_policy.l_extent.start, opd->opd_policy.l_extent.end);
561         /* OK. this is a possible lock the user holds doing I/O
562          * let's refresh eviction timer for it */
563         ldlm_refresh_waiting_lock(lock, opd->opd_timeout);
564         opd->opd_lock_match = 1;
565
566         return LDLM_ITER_CONTINUE;
567 }
568
569 static int ost_rw_prolong_locks(struct ptlrpc_request *req, struct obd_ioobj *obj,
570                                 struct niobuf_remote *nb, struct obdo *oa,
571                                 ldlm_mode_t mode)
572 {
573         struct ldlm_res_id res_id;
574         int nrbufs = obj->ioo_bufcnt;
575         struct ost_prolong_data opd = { 0 };
576         ENTRY;
577
578         osc_build_res_name(obj->ioo_id, obj->ioo_gr, &res_id);
579
580         opd.opd_mode = mode;
581         opd.opd_exp = req->rq_export;
582         opd.opd_policy.l_extent.start = nb[0].offset & CFS_PAGE_MASK;
583         opd.opd_policy.l_extent.end = (nb[nrbufs - 1].offset +
584                                        nb[nrbufs - 1].len - 1) | ~CFS_PAGE_MASK;
585
586         /* prolong locks for the current service time of the corresponding
587          * portal (= OST_IO_PORTAL) */
588         opd.opd_timeout = AT_OFF ? obd_timeout / 2:
589                           max(at_est2timeout(at_get(&req->rq_rqbd->
590                               rqbd_service->srv_at_estimate)), ldlm_timeout);
591
592         CDEBUG(D_INFO,"refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
593                res_id.name[0], res_id.name[1], opd.opd_policy.l_extent.start,
594                opd.opd_policy.l_extent.end);
595
596         if (oa->o_valid & OBD_MD_FLHANDLE) {
597                 struct ldlm_lock *lock;
598
599                 lock = ldlm_handle2lock(&oa->o_handle);
600                 if (lock != NULL) {
601                         ost_prolong_locks_iter(lock, &opd);
602                         if (opd.opd_lock_match) {
603                                 LDLM_LOCK_PUT(lock);
604                                 RETURN(1);
605                         }
606
607                         /* Check if the lock covers the whole IO region,
608                          * otherwise iterate through the resource. */
609                         if (lock->l_policy_data.l_extent.end >=
610                             opd.opd_policy.l_extent.end &&
611                             lock->l_policy_data.l_extent.start <=
612                             opd.opd_policy.l_extent.start) {
613                                 LDLM_LOCK_PUT(lock);
614                                 RETURN(0);
615                         }
616                         LDLM_LOCK_PUT(lock);
617                 }
618         }
619
620         opd.opd_oa = oa;
621         ldlm_resource_iterate(req->rq_export->exp_obd->obd_namespace, &res_id,
622                               ost_prolong_locks_iter, &opd);
623         RETURN(opd.opd_lock_match);
624 }
625
626 static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
627 {
628         struct ptlrpc_bulk_desc *desc = NULL;
629         struct obd_export *exp = req->rq_export;
630         struct niobuf_remote *remote_nb;
631         struct niobuf_local *local_nb;
632         struct obd_ioobj *ioo;
633         struct ost_body *body, *repbody;
634         struct lustre_capa *capa = NULL;
635         struct l_wait_info lwi;
636         struct lustre_handle lockh = { 0 };
637         int niocount, npages, nob = 0, rc, i;
638         int no_reply = 0;
639         ENTRY;
640
641         req->rq_bulk_read = 1;
642
643         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
644                 GOTO(out, rc = -EIO);
645
646         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, (obd_timeout + 1) / 4);
647
648         /* Check if there is eviction in progress, and if so, wait for it to
649          * finish */
650         if (unlikely(atomic_read(&exp->exp_obd->obd_evict_inprogress))) {
651                 lwi = LWI_INTR(NULL, NULL); // We do not care how long it takes
652                 rc = l_wait_event(exp->exp_obd->obd_evict_inprogress_waitq,
653                         !atomic_read(&exp->exp_obd->obd_evict_inprogress),
654                         &lwi);
655         }
656         if (exp->exp_failed)
657                 GOTO(out, rc = -ENOTCONN);
658
659         /* ost_body, ioobj & noibuf_remote are verified and swabbed in
660          * ost_rw_hpreq_check(). */
661         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
662         if (body == NULL)
663                 GOTO(out, rc = -EFAULT);
664
665         /*
666          * A req_capsule_X_get_array(pill, field, ptr_to_element_count) function
667          * would be useful here and wherever we get &RMF_OBD_IOOBJ and
668          * &RMF_NIOBUF_REMOTE.
669          */
670         ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
671         if (ioo == NULL)
672                 GOTO(out, rc = -EFAULT);
673
674         niocount = ioo->ioo_bufcnt;
675         remote_nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
676         if (remote_nb == NULL)
677                 GOTO(out, rc = -EFAULT);
678
679         if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
680                 capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
681                 if (capa == NULL) {
682                         CERROR("Missing capability for OST BRW READ");
683                         GOTO(out, rc = -EFAULT);
684                 }
685         }
686
687         req_capsule_set_size(&req->rq_pill, &RMF_RCS, RCL_SERVER, 0);
688         rc = req_capsule_server_pack(&req->rq_pill);
689         if (rc)
690                 GOTO(out, rc);
691
692         /*
693          * Per-thread array of struct niobuf_{local,remote}'s was allocated by
694          * ost_thread_init().
695          */
696         local_nb = ost_tls(req)->local;
697
698         rc = ost_brw_lock_get(LCK_PR, exp, ioo, remote_nb, &lockh);
699         if (rc != 0)
700                 GOTO(out_bulk, rc);
701
702         /*
703          * If getting the lock took more time than
704          * client was willing to wait, drop it. b=11330
705          */
706         if (cfs_time_current_sec() > req->rq_deadline ||
707             OBD_FAIL_CHECK(OBD_FAIL_OST_DROP_REQ)) {
708                 no_reply = 1;
709                 CERROR("Dropping timed-out read from %s because locking"
710                        "object "LPX64" took %ld seconds (limit was %ld).\n",
711                        libcfs_id2str(req->rq_peer), ioo->ioo_id,
712                        cfs_time_current_sec() - req->rq_arrival_time.tv_sec,
713                        req->rq_deadline - req->rq_arrival_time.tv_sec);
714                 GOTO(out_lock, rc = -ETIMEDOUT);
715         }
716
717         npages = OST_THREAD_POOL_SIZE;
718         rc = obd_preprw(OBD_BRW_READ, exp, &body->oa, 1, ioo,
719                         remote_nb, &npages, local_nb, oti, capa);
720         if (rc != 0)
721                 GOTO(out_lock, rc);
722
723         desc = ptlrpc_prep_bulk_exp(req, npages,
724                                      BULK_PUT_SOURCE, OST_BULK_PORTAL);
725         if (desc == NULL)
726                 GOTO(out_lock, rc = -ENOMEM);
727
728         if (!lustre_handle_is_used(&lockh))
729                 /* no needs to try to prolong lock if server is asked
730                  * to handle locking (= OBD_BRW_SRVLOCK) */
731                 ost_rw_prolong_locks(req, ioo, remote_nb, &body->oa,
732                                      LCK_PW | LCK_PR);
733
734         nob = 0;
735         for (i = 0; i < npages; i++) {
736                 int page_rc = local_nb[i].rc;
737
738                 if (page_rc < 0) {              /* error */
739                         rc = page_rc;
740                         break;
741                 }
742
743                 nob += page_rc;
744                 if (page_rc != 0) {             /* some data! */
745                         LASSERT (local_nb[i].page != NULL);
746                         ptlrpc_prep_bulk_page(desc, local_nb[i].page,
747                                               local_nb[i].offset & ~CFS_PAGE_MASK,
748                                               page_rc);
749                 }
750
751                 if (page_rc != local_nb[i].len) { /* short read */
752                         /* All subsequent pages should be 0 */
753                         while(++i < npages)
754                                 LASSERT(local_nb[i].rc == 0);
755                         break;
756                 }
757         }
758
759         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
760                 cksum_type_t cksum_type = OBD_CKSUM_CRC32;
761
762                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
763                         cksum_type = cksum_type_unpack(body->oa.o_flags);
764                 body->oa.o_flags = cksum_type_pack(cksum_type);
765                 body->oa.o_valid = OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
766                 body->oa.o_cksum = ost_checksum_bulk(desc, OST_READ, cksum_type);
767                 CDEBUG(D_PAGE,"checksum at read origin: %x\n",body->oa.o_cksum);
768         } else {
769                 body->oa.o_valid = 0;
770         }
771         /* We're finishing using body->oa as an input variable */
772
773         /* Check if client was evicted while we were doing i/o before touching
774            network */
775         if (rc == 0) {
776                 /* Check if there is eviction in progress, and if so, wait for
777                  * it to finish */
778                 if (unlikely(atomic_read(&exp->exp_obd->
779                                                 obd_evict_inprogress))) {
780                         lwi = LWI_INTR(NULL, NULL);
781                         rc = l_wait_event(exp->exp_obd->
782                                                 obd_evict_inprogress_waitq,
783                                           !atomic_read(&exp->exp_obd->
784                                                         obd_evict_inprogress),
785                                           &lwi);
786                 }
787                 /* Check if client was evicted or tried to reconnect already */
788                 if (exp->exp_failed || exp->exp_abort_active_req)
789                         rc = -ENOTCONN;
790                 else {
791                         rc = sptlrpc_svc_wrap_bulk(req, desc);
792                         if (rc == 0)
793                                 rc = ptlrpc_start_bulk_transfer(desc);
794                 }
795
796                 if (rc == 0) {
797                         time_t start = cfs_time_current_sec();
798                         do {
799                                 long timeoutl = req->rq_deadline -
800                                         cfs_time_current_sec();
801                                 cfs_duration_t timeout = timeoutl <= 0 ?
802                                         CFS_TICK : cfs_time_seconds(timeoutl);
803                                 lwi = LWI_TIMEOUT_INTERVAL(timeout,
804                                                            cfs_time_seconds(1),
805                                                            ost_bulk_timeout,
806                                                            desc);
807                                 rc = l_wait_event(desc->bd_waitq,
808                                                   !ptlrpc_server_bulk_active(desc) ||
809                                                   exp->exp_failed ||
810                                                   exp->exp_abort_active_req,
811                                                   &lwi);
812                                 LASSERT(rc == 0 || rc == -ETIMEDOUT);
813                                 /* Wait again if we changed deadline */
814                         } while ((rc == -ETIMEDOUT) &&
815                                  (req->rq_deadline > cfs_time_current_sec()));
816
817                         if (rc == -ETIMEDOUT) {
818                                 DEBUG_REQ(D_ERROR, req,
819                                           "timeout on bulk PUT after %ld%+lds",
820                                           req->rq_deadline - start,
821                                           cfs_time_current_sec() -
822                                           req->rq_deadline);
823                                 ptlrpc_abort_bulk(desc);
824                         } else if (exp->exp_failed) {
825                                 DEBUG_REQ(D_ERROR, req, "Eviction on bulk PUT");
826                                 rc = -ENOTCONN;
827                                 ptlrpc_abort_bulk(desc);
828                         } else if (exp->exp_abort_active_req) {
829                                 DEBUG_REQ(D_ERROR, req, "Reconnect on bulk PUT");
830                                 /* we don't reply anyway */
831                                 rc = -ETIMEDOUT;
832                                 ptlrpc_abort_bulk(desc);
833                         } else if (!desc->bd_success ||
834                                    desc->bd_nob_transferred != desc->bd_nob) {
835                                 DEBUG_REQ(D_ERROR, req, "%s bulk PUT %d(%d)",
836                                           desc->bd_success ?
837                                           "truncated" : "network error on",
838                                           desc->bd_nob_transferred,
839                                           desc->bd_nob);
840                                 /* XXX should this be a different errno? */
841                                 rc = -ETIMEDOUT;
842                         }
843                 } else {
844                         DEBUG_REQ(D_ERROR, req, "bulk PUT failed: rc %d", rc);
845                 }
846                 no_reply = rc != 0;
847         }
848
849         /* Must commit after prep above in all cases */
850         rc = obd_commitrw(OBD_BRW_READ, exp, &body->oa, 1, ioo,
851                           remote_nb, npages, local_nb, oti, rc);
852
853         if (rc == 0) {
854                 repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
855                 memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
856                 ost_drop_id(exp, &repbody->oa);
857         }
858
859 out_lock:
860         ost_brw_lock_put(LCK_PR, ioo, remote_nb, &lockh);
861 out_bulk:
862         if (desc)
863                 ptlrpc_free_bulk(desc);
864 out:
865         LASSERT(rc <= 0);
866         if (rc == 0) {
867                 req->rq_status = nob;
868                 ptlrpc_lprocfs_brw(req, nob);
869                 target_committed_to_req(req);
870                 ptlrpc_reply(req);
871         } else if (!no_reply) {
872                 /* Only reply if there was no comms problem with bulk */
873                 target_committed_to_req(req);
874                 req->rq_status = rc;
875                 ptlrpc_error(req);
876         } else {
877                 /* reply out callback would free */
878                 ptlrpc_req_drop_rs(req);
879                 CWARN("%s: ignoring bulk IO comm error with %s@%s id %s - "
880                       "client will retry\n",
881                       exp->exp_obd->obd_name,
882                       exp->exp_client_uuid.uuid,
883                       exp->exp_connection->c_remote_uuid.uuid,
884                       libcfs_id2str(req->rq_peer));
885         }
886
887         RETURN(rc);
888 }
889
890 static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
891 {
892         struct ptlrpc_bulk_desc *desc = NULL;
893         struct obd_export       *exp = req->rq_export;
894         struct niobuf_remote    *remote_nb;
895         struct niobuf_local     *local_nb;
896         struct obd_ioobj        *ioo;
897         struct ost_body         *body, *repbody;
898         struct l_wait_info       lwi;
899         struct lustre_handle     lockh = {0};
900         struct lustre_capa      *capa = NULL;
901         __u32                   *rcs;
902         int objcount, niocount, npages;
903         int rc, i, j;
904         obd_count                client_cksum = 0, server_cksum = 0;
905         cksum_type_t             cksum_type = OBD_CKSUM_CRC32;
906         int                      no_reply = 0;
907         __u32                    o_uid = 0, o_gid = 0;
908         ENTRY;
909
910         req->rq_bulk_write = 1;
911
912         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
913                 GOTO(out, rc = -EIO);
914         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK2))
915                 GOTO(out, rc = -EFAULT);
916
917         /* pause before transaction has been started */
918         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, (obd_timeout + 1) / 4);
919
920         /* Check if there is eviction in progress, and if so, wait for it to
921          * finish */
922         if (unlikely(atomic_read(&exp->exp_obd->obd_evict_inprogress))) {
923                 lwi = LWI_INTR(NULL, NULL); // We do not care how long it takes
924                 rc = l_wait_event(exp->exp_obd->obd_evict_inprogress_waitq,
925                         !atomic_read(&exp->exp_obd->obd_evict_inprogress),
926                         &lwi);
927         }
928         if (exp->exp_failed)
929                 GOTO(out, rc = -ENOTCONN);
930
931         /* ost_body, ioobj & noibuf_remote are verified and swabbed in
932          * ost_rw_hpreq_check(). */
933         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
934         if (body == NULL)
935                 GOTO(out, rc = -EFAULT);
936
937         if ((body->oa.o_flags & OBD_BRW_MEMALLOC) &&
938             (exp->exp_connection->c_peer.nid == exp->exp_connection->c_self))
939                 libcfs_memory_pressure_set();
940
941         objcount = req_capsule_get_size(&req->rq_pill, &RMF_OBD_IOOBJ,
942                                         RCL_CLIENT) / sizeof(*ioo);
943         ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
944         if (ioo == NULL)
945                 GOTO(out, rc = -EFAULT);
946         for (niocount = i = 0; i < objcount; i++)
947                 niocount += ioo[i].ioo_bufcnt;
948
949         /*
950          * It'd be nice to have a capsule function to indicate how many elements
951          * there were in a buffer for an RMF that's declared to be an array.
952          * It's easy enough to compute the number of elements here though.
953          */
954         remote_nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
955         if (remote_nb == NULL || niocount != (req_capsule_get_size(&req->rq_pill,
956             &RMF_NIOBUF_REMOTE, RCL_CLIENT) / sizeof(*remote_nb)))
957                 GOTO(out, rc = -EFAULT);
958
959         if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
960                 capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
961                 if (capa == NULL) {
962                         CERROR("Missing capability for OST BRW WRITE");
963                         GOTO(out, rc = -EFAULT);
964                 }
965         }
966
967         req_capsule_set_size(&req->rq_pill, &RMF_RCS, RCL_SERVER,
968                              niocount * sizeof(*rcs));
969         rc = req_capsule_server_pack(&req->rq_pill);
970         if (rc != 0)
971                 GOTO(out, rc);
972         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_PACK, obd_fail_val);
973         rcs = req_capsule_server_get(&req->rq_pill, &RMF_RCS);
974
975         /*
976          * Per-thread array of struct niobuf_{local,remote}'s was allocated by
977          * ost_thread_init().
978          */
979         local_nb = ost_tls(req)->local;
980
981         rc = ost_brw_lock_get(LCK_PW, exp, ioo, remote_nb, &lockh);
982         if (rc != 0)
983                 GOTO(out_bulk, rc);
984
985         /*
986          * If getting the lock took more time than
987          * client was willing to wait, drop it. b=11330
988          */
989         if (cfs_time_current_sec() > req->rq_deadline ||
990             OBD_FAIL_CHECK(OBD_FAIL_OST_DROP_REQ)) {
991                 no_reply = 1;
992                 CERROR("Dropping timed-out write from %s because locking "
993                        "object "LPX64" took %ld seconds (limit was %ld).\n",
994                        libcfs_id2str(req->rq_peer), ioo->ioo_id,
995                        cfs_time_current_sec() - req->rq_arrival_time.tv_sec,
996                        req->rq_deadline - req->rq_arrival_time.tv_sec);
997                 GOTO(out_lock, rc = -ETIMEDOUT);
998         }
999
1000         if (!lustre_handle_is_used(&lockh))
1001                 /* no needs to try to prolong lock if server is asked
1002                  * to handle locking (= OBD_BRW_SRVLOCK) */
1003                 ost_rw_prolong_locks(req, ioo, remote_nb,&body->oa,  LCK_PW);
1004
1005         /* obd_preprw clobbers oa->valid, so save what we need */
1006         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1007                 client_cksum = body->oa.o_cksum;
1008                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1009                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1010         }
1011
1012         /* Because we already sync grant info with client when reconnect,
1013          * grant info will be cleared for resent req, then fed_grant and
1014          * total_grant will not be modified in following preprw_write */
1015         if (lustre_msg_get_flags(req->rq_reqmsg) & (MSG_RESENT | MSG_REPLAY)) {
1016                 DEBUG_REQ(D_CACHE, req, "clear resent/replay req grant info");
1017                 body->oa.o_valid &= ~OBD_MD_FLGRANT;
1018         }
1019
1020         if (exp_connect_rmtclient(exp)) {
1021                 o_uid = body->oa.o_uid;
1022                 o_gid = body->oa.o_gid;
1023         }
1024         npages = OST_THREAD_POOL_SIZE;
1025         rc = obd_preprw(OBD_BRW_WRITE, exp, &body->oa, objcount,
1026                         ioo, remote_nb, &npages, local_nb, oti, capa);
1027         if (rc != 0)
1028                 GOTO(out_lock, rc);
1029
1030         desc = ptlrpc_prep_bulk_exp(req, npages,
1031                                      BULK_GET_SINK, OST_BULK_PORTAL);
1032         if (desc == NULL)
1033                 GOTO(out_lock, rc = -ENOMEM);
1034
1035         /* NB Having prepped, we must commit... */
1036
1037         for (i = 0; i < npages; i++)
1038                 ptlrpc_prep_bulk_page(desc, local_nb[i].page,
1039                                       local_nb[i].offset & ~CFS_PAGE_MASK,
1040                                       local_nb[i].len);
1041
1042         rc = sptlrpc_svc_prep_bulk(req, desc);
1043         if (rc != 0)
1044                 GOTO(out_lock, rc);
1045
1046         /* Check if client was evicted or tried to reconnect while we
1047          * were doing i/o before touching network */
1048         if (desc->bd_export->exp_failed ||
1049             desc->bd_export->exp_abort_active_req)
1050                 rc = -ENOTCONN;
1051         else
1052                 rc = ptlrpc_start_bulk_transfer(desc);
1053         if (rc == 0) {
1054                 time_t start = cfs_time_current_sec();
1055                 do {
1056                         long timeoutl = req->rq_deadline -
1057                                 cfs_time_current_sec();
1058                         cfs_duration_t timeout = timeoutl <= 0 ?
1059                                 CFS_TICK : cfs_time_seconds(timeoutl);
1060                         lwi = LWI_TIMEOUT_INTERVAL(timeout, cfs_time_seconds(1),
1061                                                    ost_bulk_timeout, desc);
1062                         rc = l_wait_event(desc->bd_waitq,
1063                                           !ptlrpc_server_bulk_active(desc) ||
1064                                           desc->bd_export->exp_failed ||
1065                                           desc->bd_export->exp_abort_active_req,
1066                                           &lwi);
1067                         LASSERT(rc == 0 || rc == -ETIMEDOUT);
1068                         /* Wait again if we changed deadline */
1069                 } while ((rc == -ETIMEDOUT) &&
1070                          (req->rq_deadline > cfs_time_current_sec()));
1071
1072                 if (rc == -ETIMEDOUT) {
1073                         DEBUG_REQ(D_ERROR, req,
1074                                   "timeout on bulk GET after %ld%+lds",
1075                                   req->rq_deadline - start,
1076                                   cfs_time_current_sec() -
1077                                   req->rq_deadline);
1078                         ptlrpc_abort_bulk(desc);
1079                 } else if (desc->bd_export->exp_failed) {
1080                         DEBUG_REQ(D_ERROR, req, "Eviction on bulk GET");
1081                         rc = -ENOTCONN;
1082                         ptlrpc_abort_bulk(desc);
1083                 } else if (desc->bd_export->exp_abort_active_req) {
1084                         DEBUG_REQ(D_ERROR, req, "Reconnect on bulk GET");
1085                         /* we don't reply anyway */
1086                         rc = -ETIMEDOUT;
1087                         ptlrpc_abort_bulk(desc);
1088                 } else if (!desc->bd_success) {
1089                         DEBUG_REQ(D_ERROR, req, "network error on bulk GET");
1090                         /* XXX should this be a different errno? */
1091                         rc = -ETIMEDOUT;
1092                 } else {
1093                         rc = sptlrpc_svc_unwrap_bulk(req, desc);
1094                 }
1095         } else {
1096                 DEBUG_REQ(D_ERROR, req, "ptlrpc_bulk_get failed: rc %d", rc);
1097         }
1098         no_reply = rc != 0;
1099
1100         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1101         memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
1102
1103         if (unlikely(client_cksum != 0 && rc == 0)) {
1104                 static int cksum_counter;
1105                 repbody->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1106                 repbody->oa.o_flags &= ~OBD_FL_CKSUM_ALL;
1107                 repbody->oa.o_flags |= cksum_type_pack(cksum_type);
1108                 server_cksum = ost_checksum_bulk(desc, OST_WRITE, cksum_type);
1109                 repbody->oa.o_cksum = server_cksum;
1110                 cksum_counter++;
1111                 if (unlikely(client_cksum != server_cksum)) {
1112                         CERROR("client csum %x, server csum %x\n",
1113                                client_cksum, server_cksum);
1114                         cksum_counter = 0;
1115                 } else if ((cksum_counter & (-cksum_counter)) == cksum_counter){
1116                         CDEBUG(D_INFO, "Checksum %u from %s OK: %x\n",
1117                                cksum_counter, libcfs_id2str(req->rq_peer),
1118                                server_cksum);
1119                 }
1120         }
1121
1122         /* Must commit after prep above in all cases */
1123         rc = obd_commitrw(OBD_BRW_WRITE, exp, &repbody->oa, objcount, ioo,
1124                           remote_nb, npages, local_nb, oti, rc);
1125         if (rc == -ENOTCONN)
1126                 /* quota acquire process has been given up because
1127                  * either the client has been evicted or the client
1128                  * has timed out the request already */
1129                 no_reply = 1;
1130
1131         if (exp_connect_rmtclient(exp)) {
1132                 repbody->oa.o_uid = o_uid;
1133                 repbody->oa.o_gid = o_gid;
1134         }
1135
1136         if (unlikely(client_cksum != server_cksum && rc == 0)) {
1137                 int  new_cksum = ost_checksum_bulk(desc, OST_WRITE, cksum_type);
1138                 char *msg;
1139                 char *via;
1140                 char *router;
1141
1142                 if (new_cksum == server_cksum)
1143                         msg = "changed in transit before arrival at OST";
1144                 else if (new_cksum == client_cksum)
1145                         msg = "initial checksum before message complete";
1146                 else
1147                         msg = "changed in transit AND after initial checksum";
1148
1149                 if (req->rq_peer.nid == desc->bd_sender) {
1150                         via = router = "";
1151                 } else {
1152                         via = " via ";
1153                         router = libcfs_nid2str(desc->bd_sender);
1154                 }
1155
1156                 LCONSOLE_ERROR_MSG(0x168, "%s: BAD WRITE CHECKSUM: %s from "
1157                                    "%s%s%s inum "LPU64"/"LPU64" object "
1158                                    LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1159                                    exp->exp_obd->obd_name, msg,
1160                                    libcfs_id2str(req->rq_peer),
1161                                    via, router,
1162                                    body->oa.o_valid & OBD_MD_FLFID ?
1163                                                 body->oa.o_fid : (__u64)0,
1164                                    body->oa.o_valid & OBD_MD_FLFID ?
1165                                                 body->oa.o_generation :(__u64)0,
1166                                    body->oa.o_id,
1167                                    body->oa.o_valid & OBD_MD_FLGROUP ?
1168                                                 body->oa.o_gr : (__u64)0,
1169                                    local_nb[0].offset,
1170                                    local_nb[npages-1].offset +
1171                                    local_nb[npages-1].len - 1 );
1172                 CERROR("client csum %x, original server csum %x, "
1173                        "server csum now %x\n",
1174                        client_cksum, server_cksum, new_cksum);
1175         }
1176
1177         if (rc == 0) {
1178                 int nob = 0;
1179
1180                 /* set per-requested niobuf return codes */
1181                 for (i = j = 0; i < niocount; i++) {
1182                         int len = remote_nb[i].len;
1183
1184                         nob += len;
1185                         rcs[i] = 0;
1186                         do {
1187                                 LASSERT(j < npages);
1188                                 if (local_nb[j].rc < 0)
1189                                         rcs[i] = local_nb[j].rc;
1190                                 len -= local_nb[j].len;
1191                                 j++;
1192                         } while (len > 0);
1193                         LASSERT(len == 0);
1194                 }
1195                 LASSERT(j == npages);
1196                 ptlrpc_lprocfs_brw(req, nob);
1197         }
1198
1199 out_lock:
1200         ost_brw_lock_put(LCK_PW, ioo, remote_nb, &lockh);
1201 out_bulk:
1202         if (desc)
1203                 ptlrpc_free_bulk(desc);
1204 out:
1205         if (rc == 0) {
1206                 oti_to_request(oti, req);
1207                 target_committed_to_req(req);
1208                 rc = ptlrpc_reply(req);
1209         } else if (!no_reply) {
1210                 /* Only reply if there was no comms problem with bulk */
1211                 target_committed_to_req(req);
1212                 req->rq_status = rc;
1213                 ptlrpc_error(req);
1214         } else {
1215                 /* reply out callback would free */
1216                 ptlrpc_req_drop_rs(req);
1217                 CWARN("%s: ignoring bulk IO comm error with %s@%s id %s - "
1218                       "client will retry\n",
1219                       exp->exp_obd->obd_name,
1220                       exp->exp_client_uuid.uuid,
1221                       exp->exp_connection->c_remote_uuid.uuid,
1222                       libcfs_id2str(req->rq_peer));
1223         }
1224         libcfs_memory_pressure_clr();
1225         RETURN(rc);
1226 }
1227
1228 /**
1229  * Implementation of OST_SET_INFO.
1230  *
1231  * OST_SET_INFO is like ioctl(): heavily overloaded.  Specifically, it takes a
1232  * "key" and a value RPC buffers as arguments, with the value's contents
1233  * interpreted according to the key.
1234  *
1235  * Value types that need swabbing have swabbing done explicitly, either here or
1236  * in functions called from here.  This should be corrected: all swabbing should
1237  * be done in the capsule abstraction, as that will then allow us to move
1238  * swabbing exclusively to the client without having to modify server code
1239  * outside the capsule abstraction's implementation itself.  To correct this
1240  * will require minor changes to the capsule abstraction; see the comments for
1241  * req_capsule_extend() in layout.c.
1242  */
1243 static int ost_set_info(struct obd_export *exp, struct ptlrpc_request *req)
1244 {
1245         struct ost_body *body = NULL, *repbody;
1246         char *key, *val = NULL;
1247         int keylen, vallen, rc = 0;
1248         int is_grant_shrink = 0;
1249         ENTRY;
1250
1251         key = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
1252         if (key == NULL) {
1253                 DEBUG_REQ(D_HA, req, "no set_info key");
1254                 RETURN(-EFAULT);
1255         }
1256         keylen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_KEY,
1257                                       RCL_CLIENT);
1258
1259         vallen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_VAL,
1260                                       RCL_CLIENT);
1261
1262         if ((is_grant_shrink = KEY_IS(KEY_GRANT_SHRINK)))
1263                 /* In this case the value is actually an RMF_OST_BODY, so we
1264                  * transmutate the type of this PTLRPC */
1265                 req_capsule_extend(&req->rq_pill, &RQF_OST_SET_GRANT_INFO);
1266
1267         rc = req_capsule_server_pack(&req->rq_pill);
1268         if (rc)
1269                 RETURN(rc);
1270
1271         if (vallen) {
1272                 if (is_grant_shrink) {
1273                         body = req_capsule_client_get(&req->rq_pill,
1274                                                       &RMF_OST_BODY);
1275                         if (!body)
1276                                 RETURN(-EFAULT);
1277
1278                         repbody = req_capsule_server_get(&req->rq_pill,
1279                                                          &RMF_OST_BODY);
1280                         memcpy(repbody, body, sizeof(*body));
1281                         val = (char*)repbody;
1282                 } else {
1283                         val = req_capsule_client_get(&req->rq_pill,
1284                                                      &RMF_SETINFO_VAL);
1285                 }
1286         }
1287
1288         if (KEY_IS(KEY_EVICT_BY_NID)) {
1289                 if (val && vallen)
1290                         obd_export_evict_by_nid(exp->exp_obd, val);
1291                 GOTO(out, rc = 0);
1292         } else if (KEY_IS(KEY_MDS_CONN) && ptlrpc_req_need_swab(req)) {
1293                 if (vallen < sizeof(__u32))
1294                         RETURN(-EFAULT);
1295                 __swab32s((__u32 *)val);
1296         }
1297
1298         /* OBD will also check if KEY_IS(KEY_GRANT_SHRINK), and will cast val to
1299          * a struct ost_body * value */
1300         rc = obd_set_info_async(exp, keylen, key, vallen, val, NULL);
1301 out:
1302         lustre_msg_set_status(req->rq_repmsg, 0);
1303         RETURN(rc);
1304 }
1305
1306 static int ost_get_info(struct obd_export *exp, struct ptlrpc_request *req)
1307 {
1308         void *key, *reply;
1309         int keylen, replylen, rc = 0;
1310         struct req_capsule *pill = &req->rq_pill;
1311         ENTRY;
1312
1313         /* this common part for get_info rpc */
1314         key = req_capsule_client_get(pill, &RMF_SETINFO_KEY);
1315         if (key == NULL) {
1316                 DEBUG_REQ(D_HA, req, "no get_info key");
1317                 RETURN(-EFAULT);
1318         }
1319         keylen = req_capsule_get_size(pill, &RMF_SETINFO_KEY, RCL_CLIENT);
1320
1321         rc = obd_get_info(exp, keylen, key, &replylen, NULL, NULL);
1322         if (rc)
1323                 RETURN(rc);
1324
1325         req_capsule_set_size(pill, &RMF_GENERIC_DATA,
1326                              RCL_SERVER, replylen);
1327
1328         rc = req_capsule_server_pack(pill);
1329         if (rc)
1330                 RETURN(rc);
1331
1332         reply = req_capsule_server_get(pill, &RMF_GENERIC_DATA);
1333         if (reply == NULL)
1334                 RETURN(-ENOMEM);
1335
1336         /* call again to fill in the reply buffer */
1337         rc = obd_get_info(exp, keylen, key, &replylen, reply, NULL);
1338
1339         lustre_msg_set_status(req->rq_repmsg, 0);
1340         RETURN(rc);
1341 }
1342
1343 #ifdef HAVE_QUOTA_SUPPORT
1344 static int ost_handle_quotactl(struct ptlrpc_request *req)
1345 {
1346         struct obd_quotactl *oqctl, *repoqc;
1347         int rc;
1348         ENTRY;
1349
1350         oqctl = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
1351         if (oqctl == NULL)
1352                 GOTO(out, rc = -EPROTO);
1353
1354         rc = req_capsule_server_pack(&req->rq_pill);
1355         if (rc)
1356                 GOTO(out, rc);
1357
1358         repoqc = req_capsule_server_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
1359         req->rq_status = obd_quotactl(req->rq_export, oqctl);
1360         *repoqc = *oqctl;
1361
1362 out:
1363         RETURN(rc);
1364 }
1365
1366 static int ost_handle_quotacheck(struct ptlrpc_request *req)
1367 {
1368         struct obd_quotactl *oqctl;
1369         int rc;
1370         ENTRY;
1371
1372         oqctl = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
1373         if (oqctl == NULL)
1374                 RETURN(-EPROTO);
1375
1376         rc = req_capsule_server_pack(&req->rq_pill);
1377         if (rc)
1378                 RETURN(-ENOMEM);
1379
1380         req->rq_status = obd_quotacheck(req->rq_export, oqctl);
1381         RETURN(0);
1382 }
1383
1384 static int ost_handle_quota_adjust_qunit(struct ptlrpc_request *req)
1385 {
1386         struct quota_adjust_qunit *oqaq, *repoqa;
1387         struct lustre_quota_ctxt *qctxt;
1388         int rc;
1389         ENTRY;
1390
1391         qctxt = &req->rq_export->exp_obd->u.obt.obt_qctxt;
1392         oqaq = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_ADJUST_QUNIT);
1393         if (oqaq == NULL)
1394                 GOTO(out, rc = -EPROTO);
1395
1396         rc = req_capsule_server_pack(&req->rq_pill);
1397         if (rc)
1398                 GOTO(out, rc);
1399
1400         repoqa = req_capsule_server_get(&req->rq_pill, &RMF_QUOTA_ADJUST_QUNIT);
1401         req->rq_status = obd_quota_adjust_qunit(req->rq_export, oqaq, qctxt);
1402         *repoqa = *oqaq;
1403
1404  out:
1405         RETURN(rc);
1406 }
1407 #endif
1408
1409 static int ost_llog_handle_connect(struct obd_export *exp,
1410                                    struct ptlrpc_request *req)
1411 {
1412         struct llogd_conn_body *body;
1413         int rc;
1414         ENTRY;
1415
1416         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_CONN_BODY);
1417         rc = obd_llog_connect(exp, body);
1418         RETURN(rc);
1419 }
1420
/*
 * Clear the remote-client and OSS-capability connect flags in \a reply and
 * store the resulting flag set in \a exp under exp_lock.
 *
 * Both arguments are pointers and are evaluated more than once, so they
 * must be free of side effects.  They are parenthesized so that expression
 * arguments (e.g. "ptr + 1") expand correctly.
 */
#define ost_init_sec_none(reply, exp)                                   \
do {                                                                    \
        (reply)->ocd_connect_flags &= ~(OBD_CONNECT_RMT_CLIENT |        \
                                        OBD_CONNECT_RMT_CLIENT_FORCE |  \
                                        OBD_CONNECT_OSS_CAPA);          \
        spin_lock(&(exp)->exp_lock);                                    \
        (exp)->exp_connect_flags = (reply)->ocd_connect_flags;          \
        spin_unlock(&(exp)->exp_lock);                                  \
} while (0)
1430
1431 static int ost_init_sec_level(struct ptlrpc_request *req)
1432 {
1433         struct obd_export *exp = req->rq_export;
1434         struct req_capsule *pill = &req->rq_pill;
1435         struct obd_device *obd = exp->exp_obd;
1436         struct filter_obd *filter = &obd->u.filter;
1437         char *client = libcfs_nid2str(req->rq_peer.nid);
1438         struct obd_connect_data *data, *reply;
1439         int rc = 0, remote;
1440         ENTRY;
1441
1442         data = req_capsule_client_get(pill, &RMF_CONNECT_DATA);
1443         reply = req_capsule_server_get(pill, &RMF_CONNECT_DATA);
1444         if (data == NULL || reply == NULL)
1445                 RETURN(-EFAULT);
1446
1447         /* connection from MDT is always trusted */
1448         if (req->rq_auth_usr_mdt) {
1449                 ost_init_sec_none(reply, exp);
1450                 RETURN(0);
1451         }
1452
1453         /* no GSS support case */
1454         if (!req->rq_auth_gss) {
1455                 if (filter->fo_sec_level > LUSTRE_SEC_NONE) {
1456                         CWARN("client %s -> target %s does not user GSS, "
1457                               "can not run under security level %d.\n",
1458                               client, obd->obd_name, filter->fo_sec_level);
1459                         RETURN(-EACCES);
1460                 } else {
1461                         ost_init_sec_none(reply, exp);
1462                         RETURN(0);
1463                 }
1464         }
1465
1466         /* old version case */
1467         if (unlikely(!(data->ocd_connect_flags & OBD_CONNECT_RMT_CLIENT) ||
1468                      !(data->ocd_connect_flags & OBD_CONNECT_OSS_CAPA))) {
1469                 if (filter->fo_sec_level > LUSTRE_SEC_NONE) {
1470                         CWARN("client %s -> target %s uses old version, "
1471                               "can not run under security level %d.\n",
1472                               client, obd->obd_name, filter->fo_sec_level);
1473                         RETURN(-EACCES);
1474                 } else {
1475                         CWARN("client %s -> target %s uses old version, "
1476                               "run under security level %d.\n",
1477                               client, obd->obd_name, filter->fo_sec_level);
1478                         ost_init_sec_none(reply, exp);
1479                         RETURN(0);
1480                 }
1481         }
1482
1483         remote = data->ocd_connect_flags & OBD_CONNECT_RMT_CLIENT_FORCE;
1484         if (remote) {
1485                 if (!req->rq_auth_remote)
1486                         CDEBUG(D_SEC, "client (local realm) %s -> target %s "
1487                                "asked to be remote.\n", client, obd->obd_name);
1488         } else if (req->rq_auth_remote) {
1489                 remote = 1;
1490                 CDEBUG(D_SEC, "client (remote realm) %s -> target %s is set "
1491                        "as remote by default.\n", client, obd->obd_name);
1492         }
1493
1494         if (remote) {
1495                 if (!filter->fo_fl_oss_capa) {
1496                         CDEBUG(D_SEC, "client %s -> target %s is set as remote,"
1497                                " but OSS capabilities are not enabled: %d.\n",
1498                                client, obd->obd_name, filter->fo_fl_oss_capa);
1499                         RETURN(-EACCES);
1500                 }
1501         }
1502
1503         switch (filter->fo_sec_level) {
1504         case LUSTRE_SEC_NONE:
1505                 if (!remote) {
1506                         ost_init_sec_none(reply, exp);
1507                         break;
1508                 } else {
1509                         CDEBUG(D_SEC, "client %s -> target %s is set as remote, "
1510                                "can not run under security level %d.\n",
1511                                client, obd->obd_name, filter->fo_sec_level);
1512                         RETURN(-EACCES);
1513                 }
1514         case LUSTRE_SEC_REMOTE:
1515                 if (!remote)
1516                         ost_init_sec_none(reply, exp);
1517                 break;
1518         case LUSTRE_SEC_ALL:
1519                 if (!remote) {
1520                         reply->ocd_connect_flags &= ~(OBD_CONNECT_RMT_CLIENT |
1521                                                       OBD_CONNECT_RMT_CLIENT_FORCE);
1522                         if (!filter->fo_fl_oss_capa)
1523                                 reply->ocd_connect_flags &= ~OBD_CONNECT_OSS_CAPA;
1524
1525                         spin_lock(&exp->exp_lock);
1526                         exp->exp_connect_flags = reply->ocd_connect_flags;
1527                         spin_unlock(&exp->exp_lock);
1528                 }
1529                 break;
1530         default:
1531                 RETURN(-EINVAL);
1532         }
1533
1534         RETURN(rc);
1535 }
1536
1537 /*
1538  * FIXME
1539  * this should be done in filter_connect()/filter_reconnect(), but
1540  * we can't obtain information like NID, which stored in incoming
1541  * request, thus can't decide what flavor to use. so we do it here.
1542  *
1543  * This hack should be removed after the OST stack be rewritten, just
1544  * like what we are doing in mdt_obd_connect()/mdt_obd_reconnect().
1545  */
1546 static int ost_connect_check_sptlrpc(struct ptlrpc_request *req)
1547 {
1548         struct obd_export     *exp = req->rq_export;
1549         struct filter_obd     *filter = &exp->exp_obd->u.filter;
1550         struct sptlrpc_flavor  flvr;
1551         int                    rc = 0;
1552
1553         if (unlikely(strcmp(exp->exp_obd->obd_type->typ_name,
1554                             LUSTRE_ECHO_NAME) == 0)) {
1555                 exp->exp_flvr.sf_rpc = SPTLRPC_FLVR_ANY;
1556                 return 0;
1557         }
1558
1559         if (exp->exp_flvr.sf_rpc == SPTLRPC_FLVR_INVALID) {
1560                 read_lock(&filter->fo_sptlrpc_lock);
1561                 sptlrpc_target_choose_flavor(&filter->fo_sptlrpc_rset,
1562                                              req->rq_sp_from,
1563                                              req->rq_peer.nid,
1564                                              &flvr);
1565                 read_unlock(&filter->fo_sptlrpc_lock);
1566
1567                 spin_lock(&exp->exp_lock);
1568
1569                 exp->exp_sp_peer = req->rq_sp_from;
1570                 exp->exp_flvr = flvr;
1571
1572                 if (exp->exp_flvr.sf_rpc != SPTLRPC_FLVR_ANY &&
1573                     exp->exp_flvr.sf_rpc != req->rq_flvr.sf_rpc) {
1574                         CERROR("unauthorized rpc flavor %x from %s, "
1575                                "expect %x\n", req->rq_flvr.sf_rpc,
1576                                libcfs_nid2str(req->rq_peer.nid),
1577                                exp->exp_flvr.sf_rpc);
1578                         rc = -EACCES;
1579                 }
1580
1581                 spin_unlock(&exp->exp_lock);
1582         } else {
1583                 if (exp->exp_sp_peer != req->rq_sp_from) {
1584                         CERROR("RPC source %s doesn't match %s\n",
1585                                sptlrpc_part2name(req->rq_sp_from),
1586                                sptlrpc_part2name(exp->exp_sp_peer));
1587                         rc = -EACCES;
1588                 } else {
1589                         rc = sptlrpc_target_export_check(exp, req);
1590                 }
1591         }
1592
1593         return rc;
1594 }
1595
1596 static int ost_filter_recovery_request(struct ptlrpc_request *req,
1597                                        struct obd_device *obd, int *process)
1598 {
1599         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1600         case OST_CONNECT: /* This will never get here, but for completeness. */
1601         case OST_DISCONNECT:
1602                *process = 1;
1603                RETURN(0);
1604
1605         case OBD_PING:
1606         case OST_CREATE:
1607         case OST_DESTROY:
1608         case OST_PUNCH:
1609         case OST_SETATTR:
1610         case OST_SYNC:
1611         case OST_WRITE:
1612         case OBD_LOG_CANCEL:
1613         case LDLM_ENQUEUE:
1614                 *process = target_queue_recovery_request(req, obd);
1615                 RETURN(0);
1616
1617         default:
1618                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
1619                 *process = -EAGAIN;
1620                 RETURN(0);
1621         }
1622 }
1623
1624 int ost_msg_check_version(struct lustre_msg *msg)
1625 {
1626         int rc;
1627
1628         switch(lustre_msg_get_opc(msg)) {
1629         case OST_CONNECT:
1630         case OST_DISCONNECT:
1631         case OBD_PING:
1632         case SEC_CTX_INIT:
1633         case SEC_CTX_INIT_CONT:
1634         case SEC_CTX_FINI:
1635                 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
1636                 if (rc)
1637                         CERROR("bad opc %u version %08x, expecting %08x\n",
1638                                lustre_msg_get_opc(msg),
1639                                lustre_msg_get_version(msg),
1640                                LUSTRE_OBD_VERSION);
1641                 break;
1642         case OST_CREATE:
1643         case OST_DESTROY:
1644         case OST_GETATTR:
1645         case OST_SETATTR:
1646         case OST_WRITE:
1647         case OST_READ:
1648         case OST_PUNCH:
1649         case OST_STATFS:
1650         case OST_SYNC:
1651         case OST_SET_INFO:
1652         case OST_GET_INFO:
1653 #ifdef HAVE_QUOTA_SUPPORT
1654         case OST_QUOTACHECK:
1655         case OST_QUOTACTL:
1656         case OST_QUOTA_ADJUST_QUNIT:
1657 #endif
1658                 rc = lustre_msg_check_version(msg, LUSTRE_OST_VERSION);
1659                 if (rc)
1660                         CERROR("bad opc %u version %08x, expecting %08x\n",
1661                                lustre_msg_get_opc(msg),
1662                                lustre_msg_get_version(msg),
1663                                LUSTRE_OST_VERSION);
1664                 break;
1665         case LDLM_ENQUEUE:
1666         case LDLM_CONVERT:
1667         case LDLM_CANCEL:
1668         case LDLM_BL_CALLBACK:
1669         case LDLM_CP_CALLBACK:
1670                 rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
1671                 if (rc)
1672                         CERROR("bad opc %u version %08x, expecting %08x\n",
1673                                lustre_msg_get_opc(msg),
1674                                lustre_msg_get_version(msg),
1675                                LUSTRE_DLM_VERSION);
1676                 break;
1677         case LLOG_ORIGIN_CONNECT:
1678         case OBD_LOG_CANCEL:
1679                 rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
1680                 if (rc)
1681                         CERROR("bad opc %u version %08x, expecting %08x\n",
1682                                lustre_msg_get_opc(msg),
1683                                lustre_msg_get_version(msg),
1684                                LUSTRE_LOG_VERSION);
1685                 break;
1686         default:
1687                 CERROR("Unexpected opcode %d\n", lustre_msg_get_opc(msg));
1688                 rc = -ENOTSUPP;
1689         }
1690         return rc;
1691 }
1692
1693 /**
1694  * Returns 1 if the given PTLRPC matches the given LDLM locks, or 0 if it does
1695  * not.
1696  */
1697 static int ost_rw_hpreq_lock_match(struct ptlrpc_request *req,
1698                                    struct ldlm_lock *lock)
1699 {
1700         struct niobuf_remote *nb;
1701         struct obd_ioobj *ioo;
1702         struct ost_body *body;
1703         int objcount, niocount;
1704         int mode, opc, i;
1705         __u64 start, end;
1706         ENTRY;
1707
1708         opc = lustre_msg_get_opc(req->rq_reqmsg);
1709         LASSERT(opc == OST_READ || opc == OST_WRITE);
1710
1711         /* As the request may be covered by several locks, do not look at
1712          * o_handle, look at the RPC IO region. */
1713         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1714         if (body == NULL)
1715                 RETURN(0);
1716
1717         objcount = req_capsule_get_size(&req->rq_pill, &RMF_OBD_IOOBJ,
1718                                         RCL_CLIENT) / sizeof(*ioo);
1719         ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
1720         if (ioo == NULL)
1721                 RETURN(0);
1722
1723         for (niocount = i = 0; i < objcount; i++)
1724                 niocount += ioo[i].ioo_bufcnt;
1725
1726         nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
1727         if (nb == NULL ||
1728             niocount != (req_capsule_get_size(&req->rq_pill, &RMF_NIOBUF_REMOTE,
1729             RCL_CLIENT) / sizeof(*nb)))
1730                 RETURN(0);
1731
1732         mode = LCK_PW;
1733         if (opc == OST_READ)
1734                 mode |= LCK_PR;
1735
1736         start = nb[0].offset & CFS_PAGE_MASK;
1737         end = (nb[ioo->ioo_bufcnt - 1].offset +
1738                nb[ioo->ioo_bufcnt - 1].len - 1) | ~CFS_PAGE_MASK;
1739
1740         LASSERT(lock->l_resource != NULL);
1741         if (!osc_res_name_eq(ioo->ioo_id, ioo->ioo_gr,
1742                              &lock->l_resource->lr_name))
1743                 RETURN(0);
1744
1745         if (!(lock->l_granted_mode & mode))
1746                 RETURN(0);
1747
1748         if (lock->l_policy_data.l_extent.end < start ||
1749             lock->l_policy_data.l_extent.start > end)
1750                 RETURN(0);
1751
1752         RETURN(1);
1753 }
1754
1755 /**
1756  * High-priority queue request check for whether the given PTLRPC request (\a
1757  * req) is blocking an LDLM lock cancel.
1758  *
1759  * Returns 1 if the given given PTLRPC request (\a req) is blocking an LDLM lock
1760  * cancel, 0 if it is not, and -EFAULT if the request is malformed.
1761  *
1762  * Only OST_READs, OST_WRITEs and OST_PUNCHes go on the h-p RPC queue.  This
1763  * function looks only at OST_READs and OST_WRITEs.
1764  */
1765 static int ost_rw_hpreq_check(struct ptlrpc_request *req)
1766 {
1767         struct niobuf_remote *nb;
1768         struct obd_ioobj *ioo;
1769         struct ost_body *body;
1770         int objcount, niocount;
1771         int mode, opc, i;
1772         ENTRY;
1773
1774         opc = lustre_msg_get_opc(req->rq_reqmsg);
1775         LASSERT(opc == OST_READ || opc == OST_WRITE);
1776
1777         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1778         if (body == NULL)
1779                 RETURN(-EFAULT);
1780
1781         objcount = req_capsule_get_size(&req->rq_pill, &RMF_OBD_IOOBJ,
1782                                         RCL_CLIENT) / sizeof(*ioo);
1783         ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
1784         if (ioo == NULL)
1785                 RETURN(-EFAULT);
1786
1787         for (niocount = i = 0; i < objcount; i++)
1788                 niocount += ioo[i].ioo_bufcnt;
1789         nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
1790         if (nb == NULL ||
1791             niocount != (req_capsule_get_size(&req->rq_pill, &RMF_NIOBUF_REMOTE,
1792             RCL_CLIENT) / sizeof(*nb)))
1793                 RETURN(-EFAULT);
1794         if (niocount != 0 && (nb[0].flags & OBD_BRW_SRVLOCK))
1795                 RETURN(-EFAULT);
1796
1797         mode = LCK_PW;
1798         if (opc == OST_READ)
1799                 mode |= LCK_PR;
1800         RETURN(ost_rw_prolong_locks(req, ioo, nb, &body->oa, mode));
1801 }
1802
1803 static int ost_punch_prolong_locks(struct ptlrpc_request *req, struct obdo *oa)
1804 {
1805         struct ldlm_res_id res_id = { .name = { oa->o_id } };
1806         struct ost_prolong_data opd = { 0 };
1807         __u64 start, end;
1808         ENTRY;
1809
1810         start = oa->o_size;
1811         end = start + oa->o_blocks;
1812
1813         opd.opd_mode = LCK_PW;
1814         opd.opd_exp = req->rq_export;
1815         opd.opd_policy.l_extent.start = start & CFS_PAGE_MASK;
1816         if (oa->o_blocks == OBD_OBJECT_EOF || end < start)
1817                 opd.opd_policy.l_extent.end = OBD_OBJECT_EOF;
1818         else
1819                 opd.opd_policy.l_extent.end = end | ~CFS_PAGE_MASK;
1820
1821         /* prolong locks for the current service time of the corresponding
1822          * portal (= OST_IO_PORTAL) */
1823         opd.opd_timeout = AT_OFF ? obd_timeout / 2:
1824                           max(at_est2timeout(at_get(&req->rq_rqbd->
1825                               rqbd_service->srv_at_estimate)), ldlm_timeout);
1826
1827         CDEBUG(D_DLMTRACE,"refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
1828                res_id.name[0], res_id.name[1], opd.opd_policy.l_extent.start,
1829                opd.opd_policy.l_extent.end);
1830
1831         opd.opd_oa = oa;
1832         ldlm_resource_iterate(req->rq_export->exp_obd->obd_namespace, &res_id,
1833                               ost_prolong_locks_iter, &opd);
1834         RETURN(opd.opd_lock_match);
1835 }
1836
1837 /**
1838  * Like ost_rw_hpreq_lock_match(), but for OST_PUNCH RPCs.
1839  */
1840 static int ost_punch_hpreq_lock_match(struct ptlrpc_request *req,
1841                                       struct ldlm_lock *lock)
1842 {
1843         struct ost_body *body;
1844         ENTRY;
1845
1846         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1847         if (body == NULL)
1848                 RETURN(0);  /* can't return -EFAULT here */
1849
1850         if (body->oa.o_valid & OBD_MD_FLHANDLE &&
1851             body->oa.o_handle.cookie == lock->l_handle.h_cookie)
1852                 RETURN(1);
1853         RETURN(0);
1854 }
1855
1856 /**
1857  * Like ost_rw_hpreq_check(), but for OST_PUNCH RPCs.
1858  */
1859 static int ost_punch_hpreq_check(struct ptlrpc_request *req)
1860 {
1861         struct ost_body *body;
1862
1863         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1864         if (body == NULL)
1865                 RETURN(-EFAULT);
1866
1867         LASSERT(!(body->oa.o_valid & OBD_MD_FLFLAGS) ||
1868                 !(body->oa.o_flags & OBD_FL_TRUNCLOCK));
1869
1870         RETURN(ost_punch_prolong_locks(req, &body->oa));
1871 }
1872
1873 struct ptlrpc_hpreq_ops ost_hpreq_rw = {
1874         .hpreq_lock_match  = ost_rw_hpreq_lock_match,
1875         .hpreq_check       = ost_rw_hpreq_check,
1876 };
1877
1878 struct ptlrpc_hpreq_ops ost_hpreq_punch = {
1879         .hpreq_lock_match  = ost_punch_hpreq_lock_match,
1880         .hpreq_check       = ost_punch_hpreq_check,
1881 };
1882
1883 /** Assign high priority operations to the request if needed. */
1884 static int ost_hpreq_handler(struct ptlrpc_request *req)
1885 {
1886         ENTRY;
1887         if (req->rq_export) {
1888                 int opc = lustre_msg_get_opc(req->rq_reqmsg);
1889                 struct ost_body *body;
1890
1891                 if (opc == OST_READ || opc == OST_WRITE) {
1892                         struct niobuf_remote *nb;
1893                         struct obd_ioobj *ioo;
1894                         int objcount, niocount;
1895                         int i;
1896
1897                         /* RPCs on the H-P queue can be inspected before
1898                          * ost_handler() initializes their pills, so we
1899                          * initialize that here.  Capsule initialization is
1900                          * idempotent, as is setting the pill's format (provided
1901                          * it doesn't change).
1902                          */
1903                         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
1904                         req_capsule_set(&req->rq_pill, &RQF_OST_BRW);
1905
1906                         body = req_capsule_client_get(&req->rq_pill,
1907                                                       &RMF_OST_BODY);
1908                         if (body == NULL) {
1909                                 CERROR("Missing/short ost_body\n");
1910                                 RETURN(-EFAULT);
1911                         }
1912                         objcount = req_capsule_get_size(&req->rq_pill,
1913                                                         &RMF_OBD_IOOBJ,
1914                                                         RCL_CLIENT) /
1915                                                         sizeof(*ioo);
1916                         if (objcount == 0) {
1917                                 CERROR("Missing/short ioobj\n");
1918                                 RETURN(-EFAULT);
1919                         }
1920                         if (objcount > 1) {
1921                                 CERROR("too many ioobjs (%d)\n", objcount);
1922                                 RETURN(-EFAULT);
1923                         }
1924
1925                         ioo = req_capsule_client_get(&req->rq_pill,
1926                                                      &RMF_OBD_IOOBJ);
1927                         if (ioo == NULL) {
1928                                 CERROR("Missing/short ioobj\n");
1929                                 RETURN(-EFAULT);
1930                         }
1931
1932                         for (niocount = i = 0; i < objcount; i++) {
1933                                 if (ioo[i].ioo_bufcnt == 0) {
1934                                         CERROR("ioo[%d] has zero bufcnt\n", i);
1935                                         RETURN(-EFAULT);
1936                                 }
1937                                 niocount += ioo[i].ioo_bufcnt;
1938                         }
1939                         if (niocount > PTLRPC_MAX_BRW_PAGES) {
1940                                 DEBUG_REQ(D_RPCTRACE, req,
1941                                           "bulk has too many pages (%d)",
1942                                           niocount);
1943                                 RETURN(-EFAULT);
1944                         }
1945
1946                         nb = req_capsule_client_get(&req->rq_pill,
1947                                                     &RMF_NIOBUF_REMOTE);
1948                         if (nb == NULL) {
1949                                 CERROR("Missing/short niobuf\n");
1950                                 RETURN(-EFAULT);
1951                         }
1952
1953                         if (niocount == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
1954                                 req->rq_ops = &ost_hpreq_rw;
1955                 } else if (opc == OST_PUNCH) {
1956                         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
1957                         req_capsule_set(&req->rq_pill, &RQF_OST_PUNCH);
1958
1959                         body = req_capsule_client_get(&req->rq_pill,
1960                                                       &RMF_OST_BODY);
1961                         if (body == NULL) {
1962                                 CERROR("Missing/short ost_body\n");
1963                                 RETURN(-EFAULT);
1964                         }
1965
1966                         if (!(body->oa.o_valid & OBD_MD_FLFLAGS) ||
1967                             !(body->oa.o_flags & OBD_FL_TRUNCLOCK))
1968                                 req->rq_ops = &ost_hpreq_punch;
1969                 }
1970         }
1971         RETURN(0);
1972 }
1973
1974 /* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
1975 int ost_handle(struct ptlrpc_request *req)
1976 {
1977         struct obd_trans_info trans_info = { 0, };
1978         struct obd_trans_info *oti = &trans_info;
1979         int should_process, fail = OBD_FAIL_OST_ALL_REPLY_NET, rc = 0;
1980         struct obd_device *obd = NULL;
1981         ENTRY;
1982
1983         LASSERT(current->journal_info == NULL);
1984
1985         /* primordial rpcs don't affect server recovery */
1986         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1987         case SEC_CTX_INIT:
1988         case SEC_CTX_INIT_CONT:
1989         case SEC_CTX_FINI:
1990                 GOTO(out, rc = 0);
1991         }
1992
1993         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
1994
1995         /* XXX identical to MDS */
1996         if (lustre_msg_get_opc(req->rq_reqmsg) != OST_CONNECT) {
1997                 int recovering;
1998
1999                 if (req->rq_export == NULL) {
2000                         CDEBUG(D_HA,"operation %d on unconnected OST from %s\n",
2001                                lustre_msg_get_opc(req->rq_reqmsg),
2002                                libcfs_id2str(req->rq_peer));
2003                         req->rq_status = -ENOTCONN;
2004                         GOTO(out, rc = -ENOTCONN);
2005                 }
2006
2007                 obd = req->rq_export->exp_obd;
2008
2009                 /* Check for aborted recovery. */
2010                 spin_lock_bh(&obd->obd_processing_task_lock);
2011                 recovering = obd->obd_recovering;
2012                 spin_unlock_bh(&obd->obd_processing_task_lock);
2013                 if (recovering) {
2014                         rc = ost_filter_recovery_request(req, obd,
2015                                                          &should_process);
2016                         if (rc || !should_process)
2017                                 RETURN(rc);
2018                         else if (should_process < 0) {
2019                                 req->rq_status = should_process;
2020                                 rc = ptlrpc_error(req);
2021                                 RETURN(rc);
2022                         }
2023                 }
2024         }
2025
2026         oti_init(oti, req);
2027
2028         rc = ost_msg_check_version(req->rq_reqmsg);
2029         if (rc)
2030                 RETURN(rc);
2031
2032         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
2033         case OST_CONNECT: {
2034                 CDEBUG(D_INODE, "connect\n");
2035                 req_capsule_set(&req->rq_pill, &RQF_OST_CONNECT);
2036                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_CONNECT_NET))
2037                         RETURN(0);
2038                 rc = target_handle_connect(req);
2039                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_CONNECT_NET2))
2040                         RETURN(0);
2041                 if (!rc) {
2042                         rc = ost_init_sec_level(req);
2043                         if (!rc)
2044                                 rc = ost_connect_check_sptlrpc(req);
2045                 }
2046                 break;
2047         }
2048         case OST_DISCONNECT:
2049                 CDEBUG(D_INODE, "disconnect\n");
2050                 req_capsule_set(&req->rq_pill, &RQF_OST_DISCONNECT);
2051                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_DISCONNECT_NET))
2052                         RETURN(0);
2053                 rc = target_handle_disconnect(req);
2054                 break;
2055         case OST_CREATE:
2056                 CDEBUG(D_INODE, "create\n");
2057                 req_capsule_set(&req->rq_pill, &RQF_OST_CREATE);
2058                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_CREATE_NET))
2059                         RETURN(0);
2060                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOSPC))
2061                         GOTO(out, rc = -ENOSPC);
2062                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
2063                         GOTO(out, rc = -EROFS);
2064                 rc = ost_create(req->rq_export, req, oti);
2065                 break;
2066         case OST_DESTROY:
2067                 CDEBUG(D_INODE, "destroy\n");
2068                 req_capsule_set(&req->rq_pill, &RQF_OST_DESTROY);
2069                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_DESTROY_NET))
2070                         RETURN(0);
2071                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
2072                         GOTO(out, rc = -EROFS);
2073                 rc = ost_destroy(req->rq_export, req, oti);
2074                 break;
2075         case OST_GETATTR:
2076                 CDEBUG(D_INODE, "getattr\n");
2077                 req_capsule_set(&req->rq_pill, &RQF_OST_GETATTR);
2078                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_GETATTR_NET))
2079                         RETURN(0);
2080                 rc = ost_getattr(req->rq_export, req);
2081                 break;
2082         case OST_SETATTR:
2083                 CDEBUG(D_INODE, "setattr\n");
2084                 req_capsule_set(&req->rq_pill, &RQF_OST_SETATTR);
2085                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_SETATTR_NET))
2086                         RETURN(0);
2087                 rc = ost_setattr(req->rq_export, req, oti);
2088                 break;
2089         case OST_WRITE:
2090                 req_capsule_set(&req->rq_pill, &RQF_OST_BRW);
2091                 CDEBUG(D_INODE, "write\n");
2092                 /* req->rq_request_portal would be nice, if it was set */
2093                 if (req->rq_rqbd->rqbd_service->srv_req_portal !=OST_IO_PORTAL){
2094                         CERROR("%s: deny write request from %s to portal %u\n",
2095                                req->rq_export->exp_obd->obd_name,
2096                                obd_export_nid2str(req->rq_export),
2097                                req->rq_rqbd->rqbd_service->srv_req_portal);
2098                         GOTO(out, rc = -EPROTO);
2099                 }
2100                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_NET))
2101                         RETURN(0);
2102                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOSPC))
2103                         GOTO(out, rc = -ENOSPC);
2104                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
2105                         GOTO(out, rc = -EROFS);
2106                 rc = ost_brw_write(req, oti);
2107                 LASSERT(current->journal_info == NULL);
2108                 /* ost_brw_write sends its own replies */
2109                 RETURN(rc);
2110         case OST_READ:
2111                 req_capsule_set(&req->rq_pill, &RQF_OST_BRW);
2112                 CDEBUG(D_INODE, "read\n");
2113                 /* req->rq_request_portal would be nice, if it was set */
2114                 if (req->rq_rqbd->rqbd_service->srv_req_portal !=OST_IO_PORTAL){
2115                         CERROR("%s: deny read request from %s to portal %u\n",
2116                                req->rq_export->exp_obd->obd_name,
2117                                obd_export_nid2str(req->rq_export),
2118                                req->rq_rqbd->rqbd_service->srv_req_portal);
2119                         GOTO(out, rc = -EPROTO);
2120                 }
2121                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_NET))
2122                         RETURN(0);
2123                 rc = ost_brw_read(req, oti);
2124                 LASSERT(current->journal_info == NULL);
2125                 /* ost_brw_read sends its own replies */
2126                 RETURN(rc);
2127         case OST_PUNCH:
2128                 CDEBUG(D_INODE, "punch\n");
2129                 req_capsule_set(&req->rq_pill, &RQF_OST_PUNCH);
2130                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_PUNCH_NET))
2131                         RETURN(0);
2132                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
2133                         GOTO(out, rc = -EROFS);
2134                 rc = ost_punch(req->rq_export, req, oti);
2135                 break;
2136         case OST_STATFS:
2137                 CDEBUG(D_INODE, "statfs\n");
2138                 req_capsule_set(&req->rq_pill, &RQF_OST_STATFS);
2139                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_STATFS_NET))
2140                         RETURN(0);
2141                 rc = ost_statfs(req);
2142                 break;
2143         case OST_SYNC:
2144                 CDEBUG(D_INODE, "sync\n");
2145                 req_capsule_set(&req->rq_pill, &RQF_OST_SYNC);
2146                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_SYNC_NET))
2147                         RETURN(0);
2148                 rc = ost_sync(req->rq_export, req);
2149                 break;
2150         case OST_SET_INFO:
2151                 DEBUG_REQ(D_INODE, req, "set_info");
2152                 req_capsule_set(&req->rq_pill, &RQF_OBD_SET_INFO);
2153                 rc = ost_set_info(req->rq_export, req);
2154                 break;
2155         case OST_GET_INFO:
2156                 DEBUG_REQ(D_INODE, req, "get_info");
2157                 req_capsule_set(&req->rq_pill, &RQF_OST_GET_INFO_GENERIC);
2158                 rc = ost_get_info(req->rq_export, req);
2159                 break;
2160 #ifdef HAVE_QUOTA_SUPPORT
2161         case OST_QUOTACHECK:
2162                 CDEBUG(D_INODE, "quotacheck\n");
2163                 req_capsule_set(&req->rq_pill, &RQF_OST_QUOTACHECK);
2164                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_QUOTACHECK_NET))
2165                         RETURN(0);
2166                 rc = ost_handle_quotacheck(req);
2167                 break;
2168         case OST_QUOTACTL:
2169                 CDEBUG(D_INODE, "quotactl\n");
2170                 req_capsule_set(&req->rq_pill, &RQF_OST_QUOTACTL);
2171                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_QUOTACTL_NET))
2172                         RETURN(0);
2173                 rc = ost_handle_quotactl(req);
2174                 break;
2175         case OST_QUOTA_ADJUST_QUNIT:
2176                 CDEBUG(D_INODE, "quota_adjust_qunit\n");
2177                 req_capsule_set(&req->rq_pill, &RQF_OST_QUOTA_ADJUST_QUNIT);
2178                 rc = ost_handle_quota_adjust_qunit(req);
2179                 break;
2180 #endif
2181         case OBD_PING:
2182                 DEBUG_REQ(D_INODE, req, "ping");
2183                 req_capsule_set(&req->rq_pill, &RQF_OBD_PING);
2184                 rc = target_handle_ping(req);
2185                 break;
2186         /* FIXME - just reply status */
2187         case LLOG_ORIGIN_CONNECT:
2188                 DEBUG_REQ(D_INODE, req, "log connect");
2189                 req_capsule_set(&req->rq_pill, &RQF_LLOG_ORIGIN_CONNECT);
2190                 rc = ost_llog_handle_connect(req->rq_export, req);
2191                 req->rq_status = rc;
2192                 rc = req_capsule_server_pack(&req->rq_pill);
2193                 if (rc)
2194                         RETURN(rc);
2195                 RETURN(ptlrpc_reply(req));
2196         case OBD_LOG_CANCEL:
2197                 CDEBUG(D_INODE, "log cancel\n");
2198                 req_capsule_set(&req->rq_pill, &RQF_LOG_CANCEL);
2199                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_NET))
2200                         RETURN(0);
2201                 rc = llog_origin_handle_cancel(req);
2202                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_REP))
2203                         RETURN(0);
2204                 req->rq_status = rc;
2205                 rc = req_capsule_server_pack(&req->rq_pill);
2206                 if (rc)
2207                         RETURN(rc);
2208                 RETURN(ptlrpc_reply(req));
2209         case LDLM_ENQUEUE:
2210                 CDEBUG(D_INODE, "enqueue\n");
2211                 req_capsule_set(&req->rq_pill, &RQF_LDLM_ENQUEUE);
2212                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE))
2213                         RETURN(0);
2214                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
2215                                          ldlm_server_blocking_ast,
2216                                          ldlm_server_glimpse_ast);
2217                 fail = OBD_FAIL_OST_LDLM_REPLY_NET;
2218                 break;
2219         case LDLM_CONVERT:
2220                 CDEBUG(D_INODE, "convert\n");
2221                 req_capsule_set(&req->rq_pill, &RQF_LDLM_CONVERT);
2222                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CONVERT))
2223                         RETURN(0);
2224                 rc = ldlm_handle_convert(req);
2225                 break;
2226         case LDLM_CANCEL:
2227                 CDEBUG(D_INODE, "cancel\n");
2228                 req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL);
2229                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL))
2230                         RETURN(0);
2231                 rc = ldlm_handle_cancel(req);
2232                 break;
2233         case LDLM_BL_CALLBACK:
2234         case LDLM_CP_CALLBACK:
2235                 CDEBUG(D_INODE, "callback\n");
2236                 CERROR("callbacks should not happen on OST\n");
2237                 /* fall through */
2238         default:
2239                 CERROR("Unexpected opcode %d\n",
2240                        lustre_msg_get_opc(req->rq_reqmsg));
2241                 req->rq_status = -ENOTSUPP;
2242                 rc = ptlrpc_error(req);
2243                 RETURN(rc);
2244         }
2245
2246         LASSERT(current->journal_info == NULL);
2247
2248         EXIT;
2249         /* If we're DISCONNECTing, the export_data is already freed */
2250         if (!rc && lustre_msg_get_opc(req->rq_reqmsg) != OST_DISCONNECT)
2251                 target_committed_to_req(req);
2252
2253 out:
2254         if (!rc)
2255                 oti_to_request(oti, req);
2256
2257         target_send_reply(req, rc, fail);
2258         return 0;
2259 }
2260 EXPORT_SYMBOL(ost_handle);
2261 /*
2262  * free per-thread pool created by ost_thread_init().
2263  */
2264 static void ost_thread_done(struct ptlrpc_thread *thread)
2265 {
2266         struct ost_thread_local_cache *tls; /* TLS stands for Thread-Local
2267                                              * Storage */
2268
2269         ENTRY;
2270
2271         LASSERT(thread != NULL);
2272
2273         /*
2274          * be prepared to handle partially-initialized pools (because this is
2275          * called from ost_thread_init() for cleanup.
2276          */
2277         tls = thread->t_data;
2278         if (tls != NULL) {
2279                 OBD_FREE_PTR(tls);
2280                 thread->t_data = NULL;
2281         }
2282         EXIT;
2283 }
2284
2285 /*
2286  * initialize per-thread page pool (bug 5137).
2287  */
2288 static int ost_thread_init(struct ptlrpc_thread *thread)
2289 {
2290         struct ost_thread_local_cache *tls;
2291
2292         ENTRY;
2293
2294         LASSERT(thread != NULL);
2295         LASSERT(thread->t_data == NULL);
2296         LASSERTF(thread->t_id <= OSS_THREADS_MAX, "%u\n", thread->t_id);
2297
2298         OBD_ALLOC_PTR(tls);
2299         if (tls == NULL)
2300                 RETURN(-ENOMEM);
2301         thread->t_data = tls;
2302         RETURN(0);
2303 }
2304
2305 #define OST_WATCHDOG_TIMEOUT (obd_timeout * 1000)
2306
2307 /* Sigh - really, this is an OSS, the _server_, not the _target_ */
2308 static int ost_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
2309 {
2310         struct ost_obd *ost = &obd->u.ost;
2311         struct lprocfs_static_vars lvars;
2312         int oss_min_threads;
2313         int oss_max_threads;
2314         int oss_min_create_threads;
2315         int oss_max_create_threads;
2316         int rc;
2317         ENTRY;
2318
2319         rc = cleanup_group_info();
2320         if (rc)
2321                 RETURN(rc);
2322
2323         lprocfs_ost_init_vars(&lvars);
2324         lprocfs_obd_setup(obd, lvars.obd_vars);
2325
2326         sema_init(&ost->ost_health_sem, 1);
2327
2328         if (oss_num_threads) {
2329                 /* If oss_num_threads is set, it is the min and the max. */
2330                 if (oss_num_threads > OSS_THREADS_MAX)
2331                         oss_num_threads = OSS_THREADS_MAX;
2332                 if (oss_num_threads < OSS_THREADS_MIN)
2333                         oss_num_threads = OSS_THREADS_MIN;
2334                 oss_max_threads = oss_min_threads = oss_num_threads;
2335         } else {
2336                 /* Base min threads on memory and cpus */
2337                 oss_min_threads = num_possible_cpus() * CFS_NUM_CACHEPAGES >>
2338                         (27 - CFS_PAGE_SHIFT);
2339                 if (oss_min_threads < OSS_THREADS_MIN)
2340                         oss_min_threads = OSS_THREADS_MIN;
2341                 /* Insure a 4x range for dynamic threads */
2342                 if (oss_min_threads > OSS_THREADS_MAX / 4)
2343                         oss_min_threads = OSS_THREADS_MAX / 4;
2344                 oss_max_threads = min(OSS_THREADS_MAX, oss_min_threads * 4 + 1);
2345         }
2346
2347         ost->ost_service =
2348                 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
2349                                 OST_MAXREPSIZE, OST_REQUEST_PORTAL,
2350                                 OSC_REPLY_PORTAL, OSS_SERVICE_WATCHDOG_FACTOR,
2351                                 ost_handle, LUSTRE_OSS_NAME,
2352                                 obd->obd_proc_entry, target_print_req,
2353                                 oss_min_threads, oss_max_threads,
2354                                 "ll_ost", LCT_DT_THREAD, NULL);
2355         if (ost->ost_service == NULL) {
2356                 CERROR("failed to start service\n");
2357                 GOTO(out_lprocfs, rc = -ENOMEM);
2358         }
2359
2360         rc = ptlrpc_start_threads(obd, ost->ost_service);
2361         if (rc)
2362                 GOTO(out_service, rc = -EINVAL);
2363
2364         if (oss_num_create_threads) {
2365                 if (oss_num_create_threads > OSS_MAX_CREATE_THREADS)
2366                         oss_num_create_threads = OSS_MAX_CREATE_THREADS;
2367                 if (oss_num_create_threads < OSS_MIN_CREATE_THREADS)
2368                         oss_num_create_threads = OSS_MIN_CREATE_THREADS;
2369                 oss_min_create_threads = oss_max_create_threads =
2370                         oss_num_create_threads;
2371         } else {
2372                 oss_min_create_threads = OSS_MIN_CREATE_THREADS;
2373                 oss_max_create_threads = OSS_MAX_CREATE_THREADS;
2374         }
2375
2376         ost->ost_create_service =
2377                 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
2378                                 OST_MAXREPSIZE, OST_CREATE_PORTAL,
2379                                 OSC_REPLY_PORTAL, OSS_SERVICE_WATCHDOG_FACTOR,
2380                                 ost_handle, "ost_create",
2381                                 obd->obd_proc_entry, target_print_req,
2382                                 oss_min_create_threads, oss_max_create_threads,
2383                                 "ll_ost_creat", LCT_DT_THREAD, NULL);
2384         if (ost->ost_create_service == NULL) {
2385                 CERROR("failed to start OST create service\n");
2386                 GOTO(out_service, rc = -ENOMEM);
2387         }
2388
2389         rc = ptlrpc_start_threads(obd, ost->ost_create_service);
2390         if (rc)
2391                 GOTO(out_create, rc = -EINVAL);
2392
2393         ost->ost_io_service =
2394                 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
2395                                 OST_MAXREPSIZE, OST_IO_PORTAL,
2396                                 OSC_REPLY_PORTAL, OSS_SERVICE_WATCHDOG_FACTOR,
2397                                 ost_handle, "ost_io",
2398                                 obd->obd_proc_entry, target_print_req,
2399                                 oss_min_threads, oss_max_threads,
2400                                 "ll_ost_io", LCT_DT_THREAD, ost_hpreq_handler);
2401         if (ost->ost_io_service == NULL) {
2402                 CERROR("failed to start OST I/O service\n");
2403                 GOTO(out_create, rc = -ENOMEM);
2404         }
2405
2406         ost->ost_io_service->srv_init = ost_thread_init;
2407         ost->ost_io_service->srv_done = ost_thread_done;
2408         ost->ost_io_service->srv_cpu_affinity = 1;
2409         rc = ptlrpc_start_threads(obd, ost->ost_io_service);
2410         if (rc)
2411                 GOTO(out_io, rc = -EINVAL);
2412
2413         ping_evictor_start();
2414
2415         RETURN(0);
2416
2417 out_io:
2418         ptlrpc_unregister_service(ost->ost_io_service);
2419         ost->ost_io_service = NULL;
2420 out_create:
2421         ptlrpc_unregister_service(ost->ost_create_service);
2422         ost->ost_create_service = NULL;
2423 out_service:
2424         ptlrpc_unregister_service(ost->ost_service);
2425         ost->ost_service = NULL;
2426 out_lprocfs:
2427         lprocfs_obd_cleanup(obd);
2428         RETURN(rc);
2429 }
2430
2431 static int ost_cleanup(struct obd_device *obd)
2432 {
2433         struct ost_obd *ost = &obd->u.ost;
2434         int err = 0;
2435         ENTRY;
2436
2437         ping_evictor_stop();
2438
2439         spin_lock_bh(&obd->obd_processing_task_lock);
2440         if (obd->obd_recovering) {
2441                 target_cancel_recovery_timer(obd);
2442                 obd->obd_recovering = 0;
2443         }
2444         spin_unlock_bh(&obd->obd_processing_task_lock);
2445
2446         down(&ost->ost_health_sem);
2447         ptlrpc_unregister_service(ost->ost_service);
2448         ptlrpc_unregister_service(ost->ost_create_service);
2449         ptlrpc_unregister_service(ost->ost_io_service);
2450         ost->ost_service = NULL;
2451         ost->ost_create_service = NULL;
2452         up(&ost->ost_health_sem);
2453
2454         lprocfs_obd_cleanup(obd);
2455
2456         RETURN(err);
2457 }
2458
2459 static int ost_health_check(struct obd_device *obd)
2460 {
2461         struct ost_obd *ost = &obd->u.ost;
2462         int rc = 0;
2463
2464         down(&ost->ost_health_sem);
2465         rc |= ptlrpc_service_health_check(ost->ost_service);
2466         rc |= ptlrpc_service_health_check(ost->ost_create_service);
2467         rc |= ptlrpc_service_health_check(ost->ost_io_service);
2468         up(&ost->ost_health_sem);
2469
2470         /*
2471          * health_check to return 0 on healthy
2472          * and 1 on unhealthy.
2473          */
2474         if( rc != 0)
2475                 rc = 1;
2476
2477         return rc;
2478 }
2479
2480 struct ost_thread_local_cache *ost_tls(struct ptlrpc_request *r)
2481 {
2482         return (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
2483 }
2484
2485 /* use obd ops to offer management infrastructure */
2486 static struct obd_ops ost_obd_ops = {
2487         .o_owner        = THIS_MODULE,
2488         .o_setup        = ost_setup,
2489         .o_cleanup      = ost_cleanup,
2490         .o_health_check = ost_health_check,
2491 };
2492
2493
2494 static int __init ost_init(void)
2495 {
2496         struct lprocfs_static_vars lvars;
2497         int rc;
2498         ENTRY;
2499
2500         lprocfs_ost_init_vars(&lvars);
2501         rc = class_register_type(&ost_obd_ops, NULL, lvars.module_vars,
2502                                  LUSTRE_OSS_NAME, NULL);
2503
2504         if (ost_num_threads != 0 && oss_num_threads == 0) {
2505                 LCONSOLE_INFO("ost_num_threads module parameter is deprecated, "
2506                               "use oss_num_threads instead or unset both for "
2507                               "dynamic thread startup\n");
2508                 oss_num_threads = ost_num_threads;
2509         }
2510
2511         RETURN(rc);
2512 }
2513
2514 static void /*__exit*/ ost_exit(void)
2515 {
2516         class_unregister_type(LUSTRE_OSS_NAME);
2517 }
2518
2519 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2520 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
2521 MODULE_LICENSE("GPL");
2522
2523 module_init(ost_init);
2524 module_exit(ost_exit);