Whamcloud - gitweb
b=14109
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ost/ost_handler.c
37  *
38  * Author: Peter J. Braam <braam@clusterfs.com>
39  * Author: Phil Schwan <phil@clusterfs.com>
40  */
41
42 #ifndef EXPORT_SYMTAB
43 # define EXPORT_SYMTAB
44 #endif
45 #define DEBUG_SUBSYSTEM S_OST
46
47 #include <linux/module.h>
48 #include <obd_ost.h>
49 #include <lustre_net.h>
50 #include <lustre_dlm.h>
51 #include <lustre_export.h>
52 #include <lustre_debug.h>
53 #include <linux/init.h>
54 #include <lprocfs_status.h>
55 #include <libcfs/list.h>
56 #include <lustre_quota.h>
57 #include <lustre_log.h>
58 #include "ost_internal.h"
59
/*
 * Module parameters.  Each variable is registered through CFS_MODULE_PARM
 * with the description string shown; the values are consumed elsewhere in
 * this file (not visible in this section).
 */

/* Number of OSS service threads to start. */
60 static int oss_num_threads;
61 CFS_MODULE_PARM(oss_num_threads, "i", int, 0444,
62                 "number of OSS service threads to start");
63
/* Number of OST service threads to start; its description marks it deprecated. */
64 static int ost_num_threads;
65 CFS_MODULE_PARM(ost_num_threads, "i", int, 0444,
66                 "number of OST service threads to start (deprecated)");
67
/* Number of OSS create threads to start. */
68 static int oss_num_create_threads;
69 CFS_MODULE_PARM(oss_num_create_threads, "i", int, 0444,
70                 "number of OSS create threads to start");
71
72 void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
73 {
74         struct oti_req_ack_lock *ack_lock;
75         int i;
76
77         if (oti == NULL)
78                 return;
79
80         if (req->rq_repmsg) {
81                 __u64 versions[PTLRPC_NUM_VERSIONS] = { 0 };
82                 lustre_msg_set_transno(req->rq_repmsg, oti->oti_transno);
83                 versions[0] = oti->oti_pre_version;
84                 lustre_msg_set_versions(req->rq_repmsg, versions);
85         }
86         req->rq_transno = oti->oti_transno;
87
88         /* XXX 4 == entries in oti_ack_locks??? */
89         for (ack_lock = oti->oti_ack_locks, i = 0; i < 4; i++, ack_lock++) {
90                 if (!ack_lock->mode)
91                         break;
92                 /* XXX not even calling target_send_reply in some cases... */
93                 ptlrpc_save_lock (req, &ack_lock->lock, ack_lock->mode);
94         }
95 }
96
97 static int ost_destroy(struct obd_export *exp, struct ptlrpc_request *req,
98                        struct obd_trans_info *oti)
99 {
100         struct ost_body *body, *repbody;
101         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
102         int rc;
103         ENTRY;
104
105         body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
106                                   lustre_swab_ost_body);
107         if (body == NULL)
108                 RETURN(-EFAULT);
109
110         if (body->oa.o_id == 0)
111                 RETURN(-EPROTO);
112
113         if (lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1)) {
114                 struct ldlm_request *dlm;
115                 dlm = lustre_swab_reqbuf(req, REQ_REC_OFF + 1, sizeof(*dlm),
116                                          lustre_swab_ldlm_request);
117                 if (dlm == NULL)
118                         RETURN (-EFAULT);
119                 ldlm_request_cancel(req, dlm, 0);
120         }
121
122         rc = lustre_pack_reply(req, 2, size, NULL);
123         if (rc)
124                 RETURN(rc);
125
126         if (body->oa.o_valid & OBD_MD_FLCOOKIE)
127                 oti->oti_logcookies = &body->oa.o_lcookie;
128         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
129                                  sizeof(*repbody));
130         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
131         req->rq_status = obd_destroy(exp, &body->oa, NULL, oti, NULL);
132         RETURN(0);
133 }
134
135 static int ost_getattr(struct obd_export *exp, struct ptlrpc_request *req)
136 {
137         struct ost_body *body, *repbody;
138         struct obd_info oinfo = { { { 0 } } };
139         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
140         int rc;
141         ENTRY;
142
143         body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
144                                   lustre_swab_ost_body);
145         if (body == NULL)
146                 RETURN(-EFAULT);
147
148         rc = lustre_pack_reply(req, 2, size, NULL);
149         if (rc)
150                 RETURN(rc);
151
152         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
153                                  sizeof(*repbody));
154         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
155
156         oinfo.oi_oa = &repbody->oa;
157         req->rq_status = obd_getattr(exp, &oinfo);
158         RETURN(0);
159 }
160
161 static int ost_statfs(struct ptlrpc_request *req)
162 {
163         struct obd_statfs *osfs;
164         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
165         int rc;
166         ENTRY;
167
168         rc = lustre_pack_reply(req, 2, size, NULL);
169         if (rc)
170                 RETURN(rc);
171
172         osfs = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*osfs));
173
174         req->rq_status = obd_statfs(req->rq_export->exp_obd, osfs,
175                                     cfs_time_current_64() - HZ, 0);
176         if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_ENOSPC))
177                 osfs->os_bfree = osfs->os_bavail = 64;
178         if (req->rq_status != 0)
179                 CERROR("ost: statfs failed: rc %d\n", req->rq_status);
180
181         RETURN(0);
182 }
183
184 static int ost_create(struct obd_export *exp, struct ptlrpc_request *req,
185                       struct obd_trans_info *oti)
186 {
187         struct ost_body *body, *repbody;
188         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*repbody) };
189         int rc;
190         ENTRY;
191
192         body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
193                                   lustre_swab_ost_body);
194         if (body == NULL)
195                 RETURN(-EFAULT);
196
197         rc = lustre_pack_reply(req, 2, size, NULL);
198         if (rc)
199                 RETURN(rc);
200
201         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
202                                  sizeof(*repbody));
203         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
204         oti->oti_logcookies = &repbody->oa.o_lcookie;
205         req->rq_status = obd_create(exp, &repbody->oa, NULL, oti);
206         //obd_log_cancel(conn, NULL, 1, oti->oti_logcookies, 0);
207         RETURN(0);
208 }
209
210 /*
211  * Helper function for ost_punch(): if asked by client, acquire [size, EOF]
212  * lock on the file being truncated.
213  */
214 static int ost_punch_lock_get(struct obd_export *exp, struct obdo *oa,
215                               struct lustre_handle *lh)
216 {
217         int flags;
218         struct ldlm_res_id res_id = { .name = { oa->o_id } };
219         ldlm_policy_data_t policy;
220         __u64 start;
221         __u64 finis;
222
223         ENTRY;
224
225         LASSERT(!lustre_handle_is_used(lh));
226
227         if (!(oa->o_valid & OBD_MD_FLFLAGS) ||
228             !(oa->o_flags & OBD_FL_TRUNCLOCK))
229                 RETURN(0);
230
231         CDEBUG(D_INODE, "OST-side truncate lock.\n");
232
233         start = oa->o_size;
234         finis = start + oa->o_blocks;
235
236         /*
237          * standard truncate optimization: if file body is completely
238          * destroyed, don't send data back to the server.
239          */
240         flags = (start == 0) ? LDLM_AST_DISCARD_DATA : 0;
241
242         policy.l_extent.start = start & CFS_PAGE_MASK;
243
244         /*
245          * If ->o_blocks is EOF it means "lock till the end of the
246          * file". Otherwise, it's size of a hole being punched (in bytes)
247          */
248         if (oa->o_blocks == OBD_OBJECT_EOF || finis < start)
249                 policy.l_extent.end = OBD_OBJECT_EOF;
250         else
251                 policy.l_extent.end = finis | ~CFS_PAGE_MASK;
252
253         RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
254                                       LDLM_EXTENT, &policy, LCK_PW, &flags,
255                                       ldlm_blocking_ast, ldlm_completion_ast,
256                                       ldlm_glimpse_ast, NULL, 0, NULL, lh));
257 }
258
259 /*
260  * Helper function for ost_punch(): release lock acquired by
261  * ost_punch_lock_get(), if any.
262  */
263 static void ost_punch_lock_put(struct obd_export *exp, struct obdo *oa,
264                                struct lustre_handle *lh)
265 {
266         ENTRY;
267         if (lustre_handle_is_used(lh))
268                 ldlm_lock_decref(lh, LCK_PW);
269         EXIT;
270 }
271
272 static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req,
273                      struct obd_trans_info *oti)
274 {
275         struct obd_info oinfo = { { { 0 } } };
276         struct ost_body *body, *repbody;
277         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*repbody) };
278         int rc;
279         struct lustre_handle lh = {0,};
280         ENTRY;
281
282         /* check that we do support OBD_CONNECT_TRUNCLOCK. */
283         CLASSERT(OST_CONNECT_SUPPORTED & OBD_CONNECT_TRUNCLOCK);
284
285         /* ost_body is varified and swabbed in ost_hpreq_handler() */
286         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
287         LASSERT(body != NULL);
288
289         oinfo.oi_oa = &body->oa;
290         oinfo.oi_policy.l_extent.start = oinfo.oi_oa->o_size;
291         oinfo.oi_policy.l_extent.end = oinfo.oi_oa->o_blocks;
292
293         if ((oinfo.oi_oa->o_valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) !=
294             (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
295                 RETURN(-EINVAL);
296
297         rc = lustre_pack_reply(req, 2, size, NULL);
298         if (rc)
299                 RETURN(rc);
300
301         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
302                                  sizeof(*repbody));
303         rc = ost_punch_lock_get(exp, oinfo.oi_oa, &lh);
304         if (rc == 0) {
305                 if (oinfo.oi_oa->o_valid & OBD_MD_FLFLAGS &&
306                     oinfo.oi_oa->o_flags == OBD_FL_TRUNCLOCK)
307                         /*
308                          * If OBD_FL_TRUNCLOCK is the only bit set in
309                          * ->o_flags, clear OBD_MD_FLFLAGS to avoid falling
310                          * through filter_setattr() to filter_iocontrol().
311                          */
312                         oinfo.oi_oa->o_valid &= ~OBD_MD_FLFLAGS;
313
314                 req->rq_status = obd_punch(exp, &oinfo, oti, NULL);
315                 ost_punch_lock_put(exp, oinfo.oi_oa, &lh);
316         }
317         repbody->oa = *oinfo.oi_oa;
318         RETURN(rc);
319 }
320
321 static int ost_sync(struct obd_export *exp, struct ptlrpc_request *req)
322 {
323         struct obd_info oinfo = { { { 0 } } };
324         struct ost_body *body, *repbody;
325         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*repbody) };
326         int rc;
327         ENTRY;
328
329         body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
330                                   lustre_swab_ost_body);
331         if (body == NULL)
332                 RETURN(-EFAULT);
333
334         rc = lustre_pack_reply(req, 2, size, NULL);
335         if (rc)
336                 RETURN(rc);
337
338         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
339                                  sizeof(*repbody));
340
341         oinfo.oi_oa = &body->oa;
342         req->rq_status = obd_sync(exp, &oinfo, repbody->oa.o_size,
343                                   repbody->oa.o_blocks, NULL);
344         repbody->oa = *oinfo.oi_oa;
345         RETURN(0);
346 }
347
348 static int ost_setattr(struct obd_export *exp, struct ptlrpc_request *req,
349                        struct obd_trans_info *oti)
350 {
351         struct ost_body *body, *repbody;
352         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*repbody) };
353         int rc;
354         struct obd_info oinfo = { { { 0 } } };
355         ENTRY;
356
357         body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
358                                   lustre_swab_ost_body);
359         if (body == NULL)
360                 RETURN(-EFAULT);
361
362         rc = lustre_pack_reply(req, 2, size, NULL);
363         if (rc)
364                 RETURN(rc);
365
366         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
367                                  sizeof(*repbody));
368         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
369
370         oinfo.oi_oa = &repbody->oa;
371         req->rq_status = obd_setattr(exp, &oinfo, oti);
372         RETURN(0);
373 }
374
375 static int ost_bulk_timeout(void *data)
376 {
377         ENTRY;
378         /* We don't fail the connection here, because having the export
379          * killed makes the (vital) call to commitrw very sad.
380          */
381         RETURN(1);
382 }
383
384 static __u32 ost_checksum_bulk(struct ptlrpc_bulk_desc *desc, int opc,
385                                cksum_type_t cksum_type)
386 {
387         __u32 cksum;
388         int i;
389
390         cksum = init_checksum(cksum_type);
391         for (i = 0; i < desc->bd_iov_count; i++) {
392                 struct page *page = desc->bd_iov[i].kiov_page;
393                 int off = desc->bd_iov[i].kiov_offset & ~CFS_PAGE_MASK;
394                 char *ptr = kmap(page) + off;
395                 int len = desc->bd_iov[i].kiov_len;
396
397                 /* corrupt the data before we compute the checksum, to
398                  * simulate a client->OST data error */
399                 if (i == 0 && opc == OST_WRITE &&
400                     OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_CHECKSUM_RECEIVE))
401                         memcpy(ptr, "bad3", min(4, len));
402                 cksum = compute_checksum(cksum, ptr, len, cksum_type);
403                 /* corrupt the data after we compute the checksum, to
404                  * simulate an OST->client data error */
405                 if (i == 0 && opc == OST_READ &&
406                     OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_CHECKSUM_SEND)) {
407                         memcpy(ptr, "bad4", min(4, len));
408                         /* nobody should use corrupted page again */
409                         ClearPageUptodate(page);
410                 }
411                 kunmap(page);
412         }
413
414         return cksum;
415 }
416
417 static int ost_brw_lock_get(int mode, struct obd_export *exp,
418                             struct obd_ioobj *obj, struct niobuf_remote *nb,
419                             struct lustre_handle *lh)
420 {
421         int flags                 = 0;
422         int nrbufs                = obj->ioo_bufcnt;
423         struct ldlm_res_id res_id = { .name = { obj->ioo_id } };
424         ldlm_policy_data_t policy;
425         int i;
426
427         ENTRY;
428
429         LASSERT(mode == LCK_PR || mode == LCK_PW);
430         LASSERT(!lustre_handle_is_used(lh));
431
432         if (nrbufs == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
433                 RETURN(0);
434
435         /* EXPENSIVE ASSERTION */
436         for (i = 1; i < nrbufs; i ++)
437                 LASSERT((nb[0].flags & OBD_BRW_SRVLOCK) ==
438                         (nb[i].flags & OBD_BRW_SRVLOCK));
439
440         policy.l_extent.start = nb[0].offset & CFS_PAGE_MASK;
441         policy.l_extent.end   = (nb[nrbufs - 1].offset +
442                                  nb[nrbufs - 1].len - 1) | ~CFS_PAGE_MASK;
443
444         RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
445                                       LDLM_EXTENT, &policy, mode, &flags,
446                                       ldlm_blocking_ast, ldlm_completion_ast,
447                                       ldlm_glimpse_ast, NULL, 0, NULL, lh));
448 }
449
450 static void ost_brw_lock_put(int mode,
451                              struct obd_ioobj *obj, struct niobuf_remote *niob,
452                              struct lustre_handle *lh)
453 {
454         ENTRY;
455         LASSERT(mode == LCK_PR || mode == LCK_PW);
456         LASSERT((obj->ioo_bufcnt > 0 && (niob[0].flags & OBD_BRW_SRVLOCK)) ==
457                 lustre_handle_is_used(lh));
458         if (lustre_handle_is_used(lh))
459                 ldlm_lock_decref(lh, mode);
460         EXIT;
461 }
462
/*
 * Search criteria and results shared between ost_rw_prolong_locks() and the
 * per-lock callback ost_prolong_locks_iter().
 */
463 struct ost_prolong_data {
464         struct obd_export *opd_exp;     /* only locks whose l_export equals this are considered */
465         ldlm_policy_data_t opd_policy;  /* extent of the I/O; locks not overlapping it are skipped */
466         struct obdo *opd_oa;            /* if non-NULL and lacking OBD_MD_FLHANDLE, receives the matched lock's cookie */
467         ldlm_mode_t opd_mode;           /* bitmask of granted lock modes of interest */
468         int opd_lock_match;             /* set to 1 once a lock has had its timer refreshed */
469         int opd_timeout;                /* timeout passed to ldlm_refresh_waiting_lock() */
470 };
471
472 static int ost_prolong_locks_iter(struct ldlm_lock *lock, void *data)
473 {
474         struct ost_prolong_data *opd = data;
475
476         LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
477
478         if (lock->l_req_mode != lock->l_granted_mode) {
479                 /* scan granted locks only */
480                 return LDLM_ITER_STOP;
481         }
482
483         if (lock->l_export != opd->opd_exp) {
484                 /* prolong locks only for given client */
485                 return LDLM_ITER_CONTINUE;
486         }
487
488         if (!(lock->l_granted_mode & opd->opd_mode)) {
489                 /* we aren't interesting in all type of locks */
490                 return LDLM_ITER_CONTINUE;
491         }
492
493         if (lock->l_policy_data.l_extent.end < opd->opd_policy.l_extent.start ||
494             lock->l_policy_data.l_extent.start > opd->opd_policy.l_extent.end) {
495                 /* the request doesn't cross the lock, skip it */
496                 return LDLM_ITER_CONTINUE;
497         }
498
499         /* Fill the obdo with the matched lock handle.
500          * XXX: it is possible in some cases the IO RPC is covered by several
501          * locks, even for the write case, so it may need to be a lock list. */
502         if (opd->opd_oa && !(opd->opd_oa->o_valid & OBD_MD_FLHANDLE)) {
503                 opd->opd_oa->o_handle.cookie = lock->l_handle.h_cookie;
504                 opd->opd_oa->o_valid |= OBD_MD_FLHANDLE;
505         }
506
507         if (!(lock->l_flags & LDLM_FL_AST_SENT)) {
508                 /* ignore locks not being cancelled */
509                 return LDLM_ITER_CONTINUE;
510         }
511
512         CDEBUG(D_DLMTRACE,"refresh lock: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
513                lock->l_resource->lr_name.name[0],
514                lock->l_resource->lr_name.name[1],
515                opd->opd_policy.l_extent.start, opd->opd_policy.l_extent.end);
516         /* OK. this is a possible lock the user holds doing I/O
517          * let's refresh eviction timer for it */
518         ldlm_refresh_waiting_lock(lock, opd->opd_timeout);
519         opd->opd_lock_match = 1;
520
521         return LDLM_ITER_CONTINUE;
522 }
523
524 static int ost_rw_prolong_locks(struct ptlrpc_request *req, struct obd_ioobj *obj,
525                                 struct niobuf_remote *nb, struct obdo *oa,
526                                 ldlm_mode_t mode)
527
528
529 {
530         struct ldlm_res_id res_id = { .name = { obj->ioo_id } };
531         struct ost_prolong_data opd = { 0 };
532         int nrbufs = obj->ioo_bufcnt;
533
534         ENTRY;
535
536         opd.opd_mode = mode;
537         opd.opd_exp = req->rq_export;
538         opd.opd_policy.l_extent.start = nb[0].offset & CFS_PAGE_MASK;
539         opd.opd_policy.l_extent.end = (nb[nrbufs - 1].offset +
540                                        nb[nrbufs - 1].len - 1) | ~CFS_PAGE_MASK;
541
542         /* prolong locks for the current service time of the corresponding
543          * portal (= OST_IO_PORTAL) */
544         opd.opd_timeout = AT_OFF ? obd_timeout / 2 :
545                           max(at_est2timeout(at_get(&req->rq_rqbd->
546                               rqbd_service->srv_at_estimate)), ldlm_timeout);
547
548         CDEBUG(D_INFO,"refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
549                res_id.name[0], res_id.name[1], opd.opd_policy.l_extent.start,
550                opd.opd_policy.l_extent.end);
551
552         if (oa->o_valid & OBD_MD_FLHANDLE) {
553                 struct ldlm_lock *lock;
554
555                 lock = ldlm_handle2lock(&oa->o_handle);
556                 if (lock != NULL) {
557                         ost_prolong_locks_iter(lock, &opd);
558                         if (opd.opd_lock_match) {
559                                 LDLM_LOCK_PUT(lock);
560                                 RETURN(1);
561                         }
562
563                         /* Check if the lock covers the whole IO region,
564                          * otherwise iterate through the resource. */
565                         if (lock->l_policy_data.l_extent.end >=
566                             opd.opd_policy.l_extent.end &&
567                             lock->l_policy_data.l_extent.start <=
568                             opd.opd_policy.l_extent.start) {
569                                 LDLM_LOCK_PUT(lock);
570                                 RETURN(0);
571                         }
572                         LDLM_LOCK_PUT(lock);
573                 }
574         }
575
576         opd.opd_oa = oa;
577         ldlm_resource_iterate(req->rq_export->exp_obd->obd_namespace, &res_id,
578                               ost_prolong_locks_iter, &opd);
579         RETURN(opd.opd_lock_match);
580 }
581
582 static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
583 {
584         struct ptlrpc_bulk_desc *desc = NULL;
585         struct obd_export       *exp = req->rq_export;
586         struct niobuf_remote *remote_nb;
587         struct niobuf_local *local_nb;
588         struct obd_ioobj *ioo;
589         struct ost_body *body, *repbody;
590         struct l_wait_info lwi;
591         struct lustre_handle lockh = { 0 };
592         __u32  size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
593         int niocount, npages, nob = 0, rc, i;
594         int no_reply = 0;
595         ENTRY;
596
597         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
598                 GOTO(out, rc = -EIO);
599
600         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, (obd_timeout + 1) / 4);
601
602         /* Check if there is eviction in progress, and if so, wait for it to
603          * finish */
604         if (unlikely(atomic_read(&exp->exp_obd->obd_evict_inprogress))) {
605                 lwi = LWI_INTR(NULL, NULL); // We do not care how long it takes
606                 rc = l_wait_event(exp->exp_obd->obd_evict_inprogress_waitq,
607                         !atomic_read(&exp->exp_obd->obd_evict_inprogress),
608                         &lwi);
609         }
610         if (exp->exp_failed)
611                 GOTO(out, rc = -ENOTCONN);
612
613         /* ost_body, ioobj & noibuf_remote are verified and swabbed in
614          * ost_rw_hpreq_check(). */
615         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
616         LASSERT(body != NULL);
617
618         ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioo));
619         LASSERT(ioo != NULL);
620
621         niocount = ioo->ioo_bufcnt;
622         remote_nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
623                                    niocount * sizeof(*remote_nb));
624         LASSERT(remote_nb != NULL);
625
626         rc = lustre_pack_reply(req, 2, size, NULL);
627         if (rc)
628                 GOTO(out, rc);
629
630         /*
631          * Per-thread array of struct niobuf_{local,remote}'s was allocated by
632          * ost_thread_init().
633          */
634         local_nb = ost_tls(req)->local;
635
636         rc = ost_brw_lock_get(LCK_PR, exp, ioo, remote_nb, &lockh);
637         if (rc != 0)
638                 GOTO(out_bulk, rc);
639
640         /*
641          * If getting the lock took more time than
642          * client was willing to wait, drop it. b=11330
643          */
644         if (cfs_time_current_sec() > req->rq_deadline ||
645             OBD_FAIL_CHECK(OBD_FAIL_OST_DROP_REQ)) {
646                 no_reply = 1;
647                 CERROR("Dropping timed-out read from %s because locking"
648                        "object "LPX64" took %ld seconds (limit was %ld).\n",
649                        libcfs_id2str(req->rq_peer), ioo->ioo_id,
650                        cfs_time_current_sec() - req->rq_arrival_time.tv_sec,
651                        req->rq_deadline - req->rq_arrival_time.tv_sec);
652                 GOTO(out_lock, rc = -ETIMEDOUT);
653         }
654
655         npages = OST_THREAD_POOL_SIZE;
656         rc = obd_preprw(OBD_BRW_READ, exp, &body->oa, 1, ioo,
657                         remote_nb, &npages, local_nb, oti);
658         if (rc != 0)
659                 GOTO(out_lock, rc);
660
661         desc = ptlrpc_prep_bulk_exp(req, npages,
662                                      BULK_PUT_SOURCE, OST_BULK_PORTAL);
663         if (desc == NULL) /* XXX: check all cleanup stuff */
664                 GOTO(out, rc = -ENOMEM);
665
666         ost_rw_prolong_locks(req, ioo, remote_nb, &body->oa, LCK_PW | LCK_PR);
667
668         nob = 0;
669         for (i = 0; i < npages; i++) {
670                 int page_rc = local_nb[i].rc;
671
672                 if (page_rc < 0) {              /* error */
673                         rc = page_rc;
674                         break;
675                 }
676
677                 nob += page_rc;
678                 if (page_rc != 0) {             /* some data! */
679                         LASSERT (local_nb[i].page != NULL);
680                         ptlrpc_prep_bulk_page(desc, local_nb[i].page,
681                                               local_nb[i].offset & ~CFS_PAGE_MASK,
682                                               page_rc);
683                 }
684
685                 if (page_rc != local_nb[i].len) { /* short read */
686                         /* All subsequent pages should be 0 */
687                         while(++i < npages)
688                                 LASSERT(local_nb[i].rc == 0);
689                         break;
690                 }
691         }
692
693         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
694                 cksum_type_t cksum_type = OBD_CKSUM_CRC32;
695
696                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
697                         cksum_type = cksum_type_unpack(body->oa.o_flags);
698                 body->oa.o_flags = cksum_type_pack(cksum_type);
699                 body->oa.o_valid = OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
700                 body->oa.o_cksum = ost_checksum_bulk(desc, OST_READ, cksum_type);
701                 CDEBUG(D_PAGE,"checksum at read origin: %x\n",body->oa.o_cksum);
702         } else {
703                 body->oa.o_valid = 0;
704         }
705         /* We're finishing using body->oa as an input variable */
706
707         /* Check if client was evicted while we were doing i/o before touching
708            network */
709         if (rc == 0) {
710                 /* Check if there is eviction in progress, and if so, wait for
711                  * it to finish */
712                 if (unlikely(atomic_read(&exp->exp_obd->
713                                                 obd_evict_inprogress))) {
714                         lwi = LWI_INTR(NULL, NULL);
715                         rc = l_wait_event(exp->exp_obd->
716                                                 obd_evict_inprogress_waitq,
717                                           !atomic_read(&exp->exp_obd->
718                                                         obd_evict_inprogress),
719                                           &lwi);
720                 }
721                 if (exp->exp_failed)
722                         rc = -ENOTCONN;
723                 else
724                         rc = ptlrpc_start_bulk_transfer(desc);
725                 if (rc == 0) {
726                         time_t start = cfs_time_current_sec();
727                         do {
728                                 long timeoutl = req->rq_deadline -
729                                         cfs_time_current_sec();
730                                 cfs_duration_t timeout = (timeoutl <= 0 || rc) ?
731                                         CFS_TICK : cfs_time_seconds(timeoutl);
732                                 lwi = LWI_TIMEOUT_INTERVAL(timeout,
733                                                            cfs_time_seconds(1),
734                                                            ost_bulk_timeout,
735                                                            desc);
736                                 rc = l_wait_event(desc->bd_waitq,
737                                                   !ptlrpc_server_bulk_active(desc) ||
738                                                   exp->exp_failed, &lwi);
739                                 LASSERT(rc == 0 || rc == -ETIMEDOUT);
740                                 /* Wait again if we changed deadline */
741                         } while ((rc == -ETIMEDOUT) &&
742                                  (req->rq_deadline > cfs_time_current_sec()));
743
744                         if (rc == -ETIMEDOUT) {
745                                 DEBUG_REQ(D_ERROR, req,
746                                           "timeout on bulk PUT after %ld%+lds",
747                                           req->rq_deadline - start,
748                                           cfs_time_current_sec() -
749                                           req->rq_deadline);
750                                 ptlrpc_abort_bulk(desc);
751                         } else if (exp->exp_failed) {
752                                 DEBUG_REQ(D_ERROR, req, "Eviction on bulk PUT");
753                                 rc = -ENOTCONN;
754                                 ptlrpc_abort_bulk(desc);
755                         } else if (!desc->bd_success ||
756                                    desc->bd_nob_transferred != desc->bd_nob) {
757                                 DEBUG_REQ(D_ERROR, req, "%s bulk PUT %d(%d)",
758                                           desc->bd_success ?
759                                           "truncated" : "network error on",
760                                           desc->bd_nob_transferred,
761                                           desc->bd_nob);
762                                 /* XXX should this be a different errno? */
763                                 rc = -ETIMEDOUT;
764                         }
765                 } else {
766                         DEBUG_REQ(D_ERROR, req, "bulk PUT failed: rc %d", rc);
767                 }
768                 no_reply = rc != 0;
769         }
770
771         /* Must commit after prep above in all cases */
772         rc = obd_commitrw(OBD_BRW_READ, exp, &body->oa, 1, ioo,
773                           remote_nb, npages, local_nb, oti, rc);
774
775         if (rc == 0) {
776                 repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
777                                          sizeof(*repbody));
778                 memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
779         }
780
781  out_lock:
782         ost_brw_lock_put(LCK_PR, ioo, remote_nb, &lockh);
783  out_bulk:
784         if (desc)
785                 ptlrpc_free_bulk(desc);
786  out:
787         LASSERT(rc <= 0);
788         if (rc == 0) {
789                 req->rq_status = nob;
790                 ptlrpc_lprocfs_brw(req, nob);
791                 target_committed_to_req(req);
792                 ptlrpc_reply(req);
793         } else if (!no_reply) {
794                 /* Only reply if there was no comms problem with bulk */
795                 target_committed_to_req(req);
796                 req->rq_status = rc;
797                 ptlrpc_error(req);
798         } else {
799                 /* reply out callback would free */
800                 ptlrpc_req_drop_rs(req);
801                 CWARN("%s: ignoring bulk IO comm error with %s@%s id %s - "
802                       "client will retry\n",
803                       exp->exp_obd->obd_name,
804                       exp->exp_client_uuid.uuid,
805                       exp->exp_connection->c_remote_uuid.uuid,
806                       libcfs_id2str(req->rq_peer));
807         }
808
809         RETURN(rc);
810 }
811
812 static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
813 {
814         struct ptlrpc_bulk_desc *desc = NULL;
815         struct obd_export       *exp = req->rq_export;
816         struct niobuf_remote    *remote_nb;
817         struct niobuf_local     *local_nb;
818         struct obd_ioobj        *ioo;
819         struct ost_body         *body, *repbody;
820         struct l_wait_info       lwi;
821         struct lustre_handle     lockh = {0};
822         __u32                   *rcs;
823         __u32 size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
824         int objcount, niocount, npages;
825         int rc, i, j;
826         obd_count                client_cksum = 0, server_cksum = 0;
827         cksum_type_t             cksum_type = OBD_CKSUM_CRC32;
828         int                      no_reply = 0;
829         ENTRY;
830
831         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
832                 GOTO(out, rc = -EIO);
833         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK2))
834                 GOTO(out, rc = -EFAULT);
835
836         /* pause before transaction has been started */
837         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, (obd_timeout + 1) / 4);
838
839         /* Check if there is eviction in progress, and if so, wait for it to
840          * finish */
841         if (unlikely(atomic_read(&exp->exp_obd->obd_evict_inprogress))) {
842                 lwi = LWI_INTR(NULL, NULL); // We do not care how long it takes
843                 rc = l_wait_event(exp->exp_obd->obd_evict_inprogress_waitq,
844                         !atomic_read(&exp->exp_obd->obd_evict_inprogress),
845                         &lwi);
846         }
847         if (exp->exp_failed)
848                 GOTO(out, rc = -ENOTCONN);
849
850         /* ost_body, ioobj & noibuf_remote are verified and swabbed in
851          * ost_rw_hpreq_check(). */
852         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
853         LASSERT(body != NULL);
854
855         objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) /
856                    sizeof(*ioo);
857         ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1,
858                              objcount * sizeof(*ioo));
859         LASSERT(ioo != NULL);
860         for (niocount = i = 0; i < objcount; i++)
861                 niocount += ioo[i].ioo_bufcnt;
862
863         remote_nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
864                                    niocount * sizeof(*remote_nb));
865         LASSERT(remote_nb != NULL);
866
867         size[REPLY_REC_OFF + 1] = niocount * sizeof(*rcs);
868         rc = lustre_pack_reply(req, 3, size, NULL);
869         if (rc != 0)
870                 GOTO(out, rc);
871
872         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_PACK, obd_fail_val);
873         rcs = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1,
874                              niocount * sizeof(*rcs));
875
876         /*
877          * Per-thread array of struct niobuf_{local,remote}'s was allocated by
878          * ost_thread_init().
879          */
880         local_nb = ost_tls(req)->local;
881
882         rc = ost_brw_lock_get(LCK_PW, exp, ioo, remote_nb, &lockh);
883         if (rc != 0)
884                 GOTO(out_bulk, rc);
885
886         /*
887          * If getting the lock took more time than
888          * client was willing to wait, drop it. b=11330
889          */
890         if (cfs_time_current_sec() > req->rq_deadline ||
891             OBD_FAIL_CHECK(OBD_FAIL_OST_DROP_REQ)) {
892                 no_reply = 1;
893                 CERROR("Dropping timed-out write from %s because locking "
894                        "object "LPX64" took %ld seconds (limit was %ld).\n",
895                        libcfs_id2str(req->rq_peer), ioo->ioo_id,
896                        cfs_time_current_sec() - req->rq_arrival_time.tv_sec,
897                        req->rq_deadline - req->rq_arrival_time.tv_sec);
898                 GOTO(out_lock, rc = -ETIMEDOUT);
899         }
900
901         ost_rw_prolong_locks(req, ioo, remote_nb,&body->oa,  LCK_PW);
902
903         /* obd_preprw clobbers oa->valid, so save what we need */
904         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
905                 client_cksum = body->oa.o_cksum;
906                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
907                         cksum_type = cksum_type_unpack(body->oa.o_flags);
908         }
909
910         /* Because we already sync grant info with client when reconnect,
911          * grant info will be cleared for resent req, then fed_grant and
912          * total_grant will not be modified in following preprw_write*/
913         if (lustre_msg_get_flags(req->rq_reqmsg) & (MSG_RESENT | MSG_REPLAY)) {
914                 DEBUG_REQ(D_CACHE, req, "clear resent/replay req grant info");
915                 body->oa.o_valid &= ~OBD_MD_FLGRANT;
916         }
917
918         npages = OST_THREAD_POOL_SIZE;
919         rc = obd_preprw(OBD_BRW_WRITE, exp, &body->oa, objcount,
920                         ioo, remote_nb, &npages, local_nb, oti);
921         if (rc != 0)
922                 GOTO(out_lock, rc);
923
924         desc = ptlrpc_prep_bulk_exp(req, npages,
925                                      BULK_GET_SINK, OST_BULK_PORTAL);
926         if (desc == NULL)
927                 GOTO(out, rc = -ENOMEM);
928
929         /* NB Having prepped, we must commit... */
930
931         for (i = 0; i < npages; i++)
932                 ptlrpc_prep_bulk_page(desc, local_nb[i].page,
933                                       local_nb[i].offset & ~CFS_PAGE_MASK,
934                                       local_nb[i].len);
935
936         /* Check if client was evicted while we were doing i/o before touching
937            network */
938         if (desc->bd_export->exp_failed)
939                 rc = -ENOTCONN;
940         else
941                 rc = ptlrpc_start_bulk_transfer(desc);
942         if (rc == 0) {
943                 time_t start = cfs_time_current_sec();
944                 do {
945                         long timeoutl = req->rq_deadline -
946                                 cfs_time_current_sec();
947                         cfs_duration_t timeout = (timeoutl <= 0 || rc) ?
948                                 CFS_TICK : cfs_time_seconds(timeoutl);
949                         lwi = LWI_TIMEOUT_INTERVAL(timeout, cfs_time_seconds(1),
950                                                    ost_bulk_timeout, desc);
951                         rc = l_wait_event(desc->bd_waitq,
952                                           !ptlrpc_server_bulk_active(desc) ||
953                                           desc->bd_export->exp_failed, &lwi);
954                         LASSERT(rc == 0 || rc == -ETIMEDOUT);
955                         /* Wait again if we changed deadline */
956                 } while ((rc == -ETIMEDOUT) &&
957                          (req->rq_deadline > cfs_time_current_sec()));
958
959                 if (rc == -ETIMEDOUT) {
960                         DEBUG_REQ(D_ERROR, req,
961                                   "timeout on bulk GET after %ld%+lds",
962                                   req->rq_deadline - start,
963                                   cfs_time_current_sec() -
964                                   req->rq_deadline);
965                         ptlrpc_abort_bulk(desc);
966                 } else if (desc->bd_export->exp_failed) {
967                         DEBUG_REQ(D_ERROR, req, "Eviction on bulk GET");
968                         rc = -ENOTCONN;
969                         ptlrpc_abort_bulk(desc);
970                 } else if (!desc->bd_success ||
971                            desc->bd_nob_transferred != desc->bd_nob) {
972                         DEBUG_REQ(D_ERROR, req, "%s bulk GET %d(%d)",
973                                   desc->bd_success ?
974                                   "truncated" : "network error on",
975                                   desc->bd_nob_transferred, desc->bd_nob);
976                         /* XXX should this be a different errno? */
977                         rc = -ETIMEDOUT;
978                 }
979         } else {
980                 DEBUG_REQ(D_ERROR, req, "ptlrpc_bulk_get failed: rc %d", rc);
981         }
982         no_reply = rc != 0;
983
984         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
985                                  sizeof(*repbody));
986         memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
987
988         if (client_cksum != 0 && rc == 0) {
989                 static int cksum_counter;
990
991                 repbody->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
992                 repbody->oa.o_flags &= ~OBD_FL_CKSUM_ALL;
993                 repbody->oa.o_flags |= cksum_type_pack(cksum_type);
994                 server_cksum = ost_checksum_bulk(desc, OST_WRITE, cksum_type);
995                 repbody->oa.o_cksum = server_cksum;
996                 cksum_counter++;
997                 if (unlikely(client_cksum != server_cksum)) {
998                         CERROR("client csum %x, server csum %x\n",
999                                client_cksum, server_cksum);
1000                         cksum_counter = 0;
1001                 } else if ((cksum_counter & (-cksum_counter)) == cksum_counter){
1002                         CDEBUG(D_INFO, "Checksum %u from %s OK: %x\n",
1003                                cksum_counter, libcfs_id2str(req->rq_peer),
1004                                server_cksum);
1005                 }
1006         }
1007
1008         /* Check if there is eviction in progress, and if so, wait for
1009          * it to finish */
1010         if (unlikely(atomic_read(&exp->exp_obd->obd_evict_inprogress))) {
1011                 lwi = LWI_INTR(NULL, NULL);
1012                 rc = l_wait_event(exp->exp_obd->obd_evict_inprogress_waitq,
1013                         !atomic_read(&exp->exp_obd->obd_evict_inprogress),
1014                         &lwi);
1015         }
1016         if (rc == 0 && exp->exp_failed)
1017                 rc = -ENOTCONN;
1018
1019         /* Must commit after prep above in all cases */
1020         rc = obd_commitrw(OBD_BRW_WRITE, exp, &repbody->oa, objcount, ioo,
1021                           remote_nb, npages, local_nb, oti, rc);
1022
1023         if (unlikely(client_cksum != server_cksum && rc == 0)) {
1024                 int  new_cksum = ost_checksum_bulk(desc, OST_WRITE, cksum_type);
1025                 char *msg;
1026                 char *via;
1027                 char *router;
1028
1029                 if (new_cksum == server_cksum)
1030                         msg = "changed in transit before arrival at OST";
1031                 else if (new_cksum == client_cksum)
1032                         msg = "initial checksum before message complete";
1033                 else
1034                         msg = "changed in transit AND after initial checksum";
1035
1036                 if (req->rq_peer.nid == desc->bd_sender) {
1037                         via = router = "";
1038                 } else {
1039                         via = " via ";
1040                         router = libcfs_nid2str(desc->bd_sender);
1041                 }
1042
1043                 LCONSOLE_ERROR_MSG(0x168, "%s: BAD WRITE CHECKSUM: %s from %s"
1044                                    "%s%s inum "LPU64"/"LPU64" object "LPU64"/"
1045                                    LPU64" extent ["LPU64"-"LPU64"]\n",
1046                                    exp->exp_obd->obd_name, msg,
1047                                    libcfs_id2str(req->rq_peer),
1048                                    via, router,
1049                                    body->oa.o_valid & OBD_MD_FLFID ?
1050                                                 body->oa.o_fid : (__u64)0,
1051                                    body->oa.o_valid & OBD_MD_FLFID ?
1052                                                 body->oa.o_generation :(__u64)0,
1053                                    body->oa.o_id,
1054                                    body->oa.o_valid & OBD_MD_FLGROUP ?
1055                                                 body->oa.o_gr : (__u64)0,
1056                                    local_nb[0].offset,
1057                                    local_nb[npages-1].offset +
1058                                    local_nb[npages-1].len - 1 );
1059                 CERROR("client csum %x, original server csum %x, "
1060                        "server csum now %x\n",
1061                        client_cksum, server_cksum, new_cksum);
1062         }
1063
1064         if (rc == 0) {
1065                 int nob = 0;
1066
1067                 /* set per-requested niobuf return codes */
1068                 for (i = j = 0; i < niocount; i++) {
1069                         int len = remote_nb[i].len;
1070
1071                         nob += len;
1072                         rcs[i] = 0;
1073                         do {
1074                                 LASSERT(j < npages);
1075                                 if (local_nb[j].rc < 0)
1076                                         rcs[i] = local_nb[j].rc;
1077                                 len -= local_nb[j].len;
1078                                 j++;
1079                         } while (len > 0);
1080                         LASSERT(len == 0);
1081                 }
1082                 LASSERT(j == npages);
1083                 ptlrpc_lprocfs_brw(req, nob);
1084         }
1085
1086  out_lock:
1087         ost_brw_lock_put(LCK_PW, ioo, remote_nb, &lockh);
1088  out_bulk:
1089         if (desc)
1090                 ptlrpc_free_bulk(desc);
1091  out:
1092         if (rc == 0) {
1093                 oti_to_request(oti, req);
1094                 target_committed_to_req(req);
1095                 rc = ptlrpc_reply(req);
1096         } else if (!no_reply) {
1097                 /* Only reply if there was no comms problem with bulk */
1098                 target_committed_to_req(req);
1099                 req->rq_status = rc;
1100                 ptlrpc_error(req);
1101         } else {
1102                 /* reply out callback would free */
1103                 ptlrpc_req_drop_rs(req);
1104                 CWARN("%s: ignoring bulk IO comm error with %s@%s id %s - "
1105                       "client will retry\n",
1106                       exp->exp_obd->obd_name,
1107                       exp->exp_client_uuid.uuid,
1108                       exp->exp_connection->c_remote_uuid.uuid,
1109                       libcfs_id2str(req->rq_peer));
1110         }
1111         RETURN(rc);
1112 }
1113
1114 static int ost_set_info(struct obd_export *exp, struct ptlrpc_request *req)
1115 {
1116         struct ost_body *body = NULL, *repbody;
1117         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
1118         char *key, *val = NULL;
1119         int keylen, vallen, rc = 0;
1120         ENTRY;
1121
1122         key = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, 1);
1123         if (key == NULL) {
1124                 DEBUG_REQ(D_HA, req, "no set_info key");
1125                 RETURN(-EFAULT);
1126         }
1127         keylen = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF);
1128
1129         if (KEY_IS(KEY_GRANT_SHRINK)) {
1130                 rc = lustre_pack_reply(req, 2, size, NULL);
1131                 if (rc)
1132                         RETURN(rc);
1133         } else {
1134                 rc = lustre_pack_reply(req, 1, NULL, NULL);
1135                 if (rc)
1136                         RETURN(rc);
1137         }
1138
1139         vallen = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1);
1140         if (vallen) {
1141                 if (KEY_IS(KEY_GRANT_SHRINK)) {
1142                         body = lustre_swab_reqbuf(req, REQ_REC_OFF + 1,
1143                                                   sizeof(*body),
1144                                                   lustre_swab_ost_body);
1145                         if (!body)
1146                                 RETURN(-EFAULT);
1147
1148                         repbody = lustre_msg_buf(req->rq_repmsg,
1149                                                  REPLY_REC_OFF,
1150                                                  sizeof(*repbody));
1151                         memcpy(repbody, body, sizeof(*body));
1152                         val = (char*)repbody;
1153                 } else
1154                         val = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1,0);
1155         }
1156
1157         if (KEY_IS(KEY_EVICT_BY_NID)) {
1158                 if (val && vallen)
1159                         obd_export_evict_by_nid(exp->exp_obd, val);
1160
1161                 GOTO(out, rc = 0);
1162         }
1163
1164         rc = obd_set_info_async(exp, keylen, key, vallen, val, NULL);
1165 out:
1166         lustre_msg_set_status(req->rq_repmsg, 0);
1167         RETURN(rc);
1168 }
1169
1170 static int ost_get_info(struct obd_export *exp, struct ptlrpc_request *req)
1171 {
1172         void *key, *reply;
1173         int keylen, rc = 0;
1174         int size[2] = { sizeof(struct ptlrpc_body), 0 };
1175         ENTRY;
1176
1177         key = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, 1);
1178         if (key == NULL) {
1179                 DEBUG_REQ(D_HA, req, "no get_info key");
1180                 RETURN(-EFAULT);
1181         }
1182         keylen = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF);
1183
1184         /* call once to get the size to allocate the reply buffer */
1185         rc = obd_get_info(exp, keylen, key, &size[1], NULL, NULL);
1186         if (rc)
1187                 RETURN(rc);
1188
1189         rc = lustre_pack_reply(req, 2, size, NULL);
1190         if (rc)
1191                 RETURN(rc);
1192
1193         reply = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*reply));
1194         /* call again to fill in the reply buffer */
1195         rc = obd_get_info(exp, keylen, key, size, reply, NULL);
1196         lustre_msg_set_status(req->rq_repmsg, 0);
1197
1198         RETURN(rc);
1199 }
1200
1201 #ifdef HAVE_QUOTA_SUPPORT
1202 static int ost_handle_quotactl(struct ptlrpc_request *req)
1203 {
1204         struct obd_quotactl *oqctl, *repoqc;
1205         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*repoqc) };
1206         int rc;
1207         ENTRY;
1208
1209         oqctl = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*oqctl),
1210                                    lustre_swab_obd_quotactl);
1211         if (oqctl == NULL)
1212                 GOTO(out, rc = -EPROTO);
1213
1214         rc = lustre_pack_reply(req, 2, size, NULL);
1215         if (rc)
1216                 GOTO(out, rc);
1217
1218         repoqc = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*repoqc));
1219
1220         req->rq_status = obd_quotactl(req->rq_export, oqctl);
1221         *repoqc = *oqctl;
1222 out:
1223         RETURN(rc);
1224 }
1225
1226 static int ost_handle_quotacheck(struct ptlrpc_request *req)
1227 {
1228         struct obd_quotactl *oqctl;
1229         int rc;
1230         ENTRY;
1231
1232         oqctl = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*oqctl),
1233                                    lustre_swab_obd_quotactl);
1234         if (oqctl == NULL)
1235                 RETURN(-EPROTO);
1236
1237         rc = lustre_pack_reply(req, 1, NULL, NULL);
1238         if (rc)
1239                 RETURN(rc);
1240
1241         req->rq_status = obd_quotacheck(req->rq_export, oqctl);
1242         RETURN(0);
1243 }
1244
1245 static int ost_handle_quota_adjust_qunit(struct ptlrpc_request *req)
1246 {
1247         struct quota_adjust_qunit *oqaq, *repoqa;
1248         struct lustre_quota_ctxt *qctxt;
1249         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*repoqa) };
1250         int rc;
1251         ENTRY;
1252
1253         qctxt = &req->rq_export->exp_obd->u.obt.obt_qctxt;
1254         oqaq = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*oqaq),
1255                                   lustre_swab_quota_adjust_qunit);
1256
1257         if (oqaq == NULL)
1258                 GOTO(out, rc = -EPROTO);
1259         rc = lustre_pack_reply(req, 2, size, NULL);
1260         if (rc)
1261                 GOTO(out, rc);
1262         repoqa = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*repoqa));
1263         req->rq_status = obd_quota_adjust_qunit(req->rq_export, oqaq, qctxt);
1264         *repoqa = *oqaq;
1265  out:
1266         RETURN(rc);
1267 }
1268 #endif
1269
1270 /* Ensure that data and metadata are synced to the disk when lock is cancelled
1271  * (if requested) */
1272 int ost_blocking_ast(struct ldlm_lock *lock,
1273                              struct ldlm_lock_desc *desc,
1274                              void *data, int flag)
1275 {
1276         struct obd_device *obd = lock->l_export->exp_obd;
1277         if (flag == LDLM_CB_CANCELING &&
1278             (lock->l_granted_mode & (LCK_PW|LCK_GROUP)) &&
1279             (obd->u.ost.ost_sync_on_lock_cancel == ALWAYS_SYNC_ON_CANCEL ||
1280              (obd->u.ost.ost_sync_on_lock_cancel == BLOCKING_SYNC_ON_CANCEL &&
1281               lock->l_flags & LDLM_FL_CBPENDING))) {
1282                 struct obd_info *oinfo;
1283                 int rc;
1284
1285                 OBD_ALLOC_PTR(oinfo);
1286                 if (!oinfo)
1287                         RETURN(-ENOMEM);
1288
1289                 OBDO_ALLOC(oinfo->oi_oa);
1290                 if (!oinfo->oi_oa) {
1291                         OBD_FREE_PTR(oinfo);
1292                         RETURN(-ENOMEM);
1293                 }
1294
1295                 oinfo->oi_oa->o_id = lock->l_resource->lr_name.name[0];
1296                 oinfo->oi_oa->o_valid = OBD_MD_FLID;
1297
1298                 rc = obd_sync_rqset(lock->l_export, oinfo,
1299                                     lock->l_policy_data.l_extent.start,
1300                                     lock->l_policy_data.l_extent.end);
1301                 if (rc)
1302                         CERROR("Error %d syncing data on lock cancel\n", rc);
1303
1304                 OBDO_FREE(oinfo->oi_oa);
1305                 OBD_FREE_PTR(oinfo);
1306         }
1307
1308         return ldlm_server_blocking_ast(lock, desc, data, flag);
1309 }
1310
1311 static int ost_filter_recovery_request(struct ptlrpc_request *req,
1312                                        struct obd_device *obd, int *process)
1313 {
1314         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1315         case OST_CONNECT: /* This will never get here, but for completeness. */
1316         case OST_DISCONNECT:
1317                *process = 1;
1318                RETURN(0);
1319
1320         case OBD_PING:
1321         case OST_CREATE:
1322         case OST_DESTROY:
1323         case OST_PUNCH:
1324         case OST_SETATTR:
1325         case OST_SYNC:
1326         case OST_WRITE:
1327         case OBD_LOG_CANCEL:
1328         case LDLM_ENQUEUE:
1329                 *process = target_queue_recovery_request(req, obd);
1330                 RETURN(0);
1331
1332         default:
1333                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
1334                 *process = 0;
1335                 /* XXX what should we set rq_status to here? */
1336                 req->rq_status = -EAGAIN;
1337                 RETURN(ptlrpc_error(req));
1338         }
1339 }
1340
1341 int ost_msg_check_version(struct lustre_msg *msg)
1342 {
1343         int rc;
1344
1345         switch(lustre_msg_get_opc(msg)) {
1346         case OST_CONNECT:
1347         case OST_DISCONNECT:
1348         case OBD_PING:
1349                 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
1350                 if (rc)
1351                         CERROR("bad opc %u version %08x, expecting %08x\n",
1352                                lustre_msg_get_opc(msg),
1353                                lustre_msg_get_version(msg),
1354                                LUSTRE_OBD_VERSION);
1355                 break;
1356         case OST_CREATE:
1357         case OST_DESTROY:
1358         case OST_GETATTR:
1359         case OST_SETATTR:
1360         case OST_WRITE:
1361         case OST_READ:
1362         case OST_PUNCH:
1363         case OST_STATFS:
1364         case OST_SYNC:
1365         case OST_SET_INFO:
1366         case OST_GET_INFO:
1367 #ifdef HAVE_QUOTA_SUPPORT
1368         case OST_QUOTACHECK:
1369         case OST_QUOTACTL:
1370         case OST_QUOTA_ADJUST_QUNIT:
1371 #endif
1372                 rc = lustre_msg_check_version(msg, LUSTRE_OST_VERSION);
1373                 if (rc)
1374                         CERROR("bad opc %u version %08x, expecting %08x\n",
1375                                lustre_msg_get_opc(msg),
1376                                lustre_msg_get_version(msg),
1377                                LUSTRE_OST_VERSION);
1378                 break;
1379         case LDLM_ENQUEUE:
1380         case LDLM_CONVERT:
1381         case LDLM_CANCEL:
1382         case LDLM_BL_CALLBACK:
1383         case LDLM_CP_CALLBACK:
1384                 rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
1385                 if (rc)
1386                         CERROR("bad opc %u version %08x, expecting %08x\n",
1387                                lustre_msg_get_opc(msg),
1388                                lustre_msg_get_version(msg),
1389                                LUSTRE_DLM_VERSION);
1390                 break;
1391         case LLOG_ORIGIN_CONNECT:
1392         case OBD_LOG_CANCEL:
1393                 rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
1394                 if (rc)
1395                         CERROR("bad opc %u version %08x, expecting %08x\n",
1396                                lustre_msg_get_opc(msg),
1397                                lustre_msg_get_version(msg),
1398                                LUSTRE_LOG_VERSION);
1399                 break;
1400         default:
1401                 CERROR("Unexpected opcode %d\n", lustre_msg_get_opc(msg));
1402                 rc = -ENOTSUPP;
1403         }
1404         return rc;
1405 }
1406
1407 static int ost_rw_hpreq_lock_match(struct ptlrpc_request *req,
1408                                    struct ldlm_lock *lock)
1409 {
1410         struct niobuf_remote *nb;
1411         struct obd_ioobj *ioo;
1412         struct ost_body *body;
1413         int objcount, niocount;
1414         int mode, opc, i;
1415         __u64 start, end;
1416         ENTRY;
1417
1418         opc = lustre_msg_get_opc(req->rq_reqmsg);
1419         LASSERT(opc == OST_READ || opc == OST_WRITE);
1420
1421         /* As the request may be covered by several locks, do not look at
1422          * o_handle, look at the RPC IO region. */
1423         body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
1424                                   lustre_swab_obdo);
1425         objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) /
1426                    sizeof(*ioo);
1427         ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1,
1428                              objcount * sizeof(*ioo));
1429         LASSERT(ioo != NULL);
1430         for (niocount = i = 0; i < objcount; i++)
1431                 niocount += ioo[i].ioo_bufcnt;
1432
1433         nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1434                             niocount * sizeof(*nb));
1435         LASSERT(nb != NULL);
1436
1437         mode = LCK_PW;
1438         if (opc == OST_READ)
1439                 mode |= LCK_PR;
1440
1441         start = nb[0].offset & CFS_PAGE_MASK;
1442         end = (nb[ioo->ioo_bufcnt - 1].offset +
1443                nb[ioo->ioo_bufcnt - 1].len - 1) | ~CFS_PAGE_MASK;
1444
1445         LASSERT(lock->l_resource != NULL);
1446         if (lock->l_resource->lr_name.name[0] != ioo->ioo_id)
1447                 RETURN(0);
1448
1449         if (!(lock->l_granted_mode & mode))
1450                 RETURN(0);
1451
1452         if (lock->l_policy_data.l_extent.end < start ||
1453             lock->l_policy_data.l_extent.start > end)
1454                 RETURN(0);
1455
1456         RETURN(1);
1457 }
1458
1459 /**
1460  * Swab buffers needed to call ost_rw_prolong_locks() and call it.
1461  * Return the value from ost_rw_prolong_locks() which is non-zero if
1462  * there is a cancelled lock which is waiting for this IO request.
1463  */
1464 static int ost_rw_hpreq_check(struct ptlrpc_request *req)
1465 {
1466         struct niobuf_remote *nb;
1467         struct obd_ioobj *ioo;
1468         struct ost_body *body;
1469         int objcount, niocount;
1470         int mode, opc, i;
1471         ENTRY;
1472
1473         opc = lustre_msg_get_opc(req->rq_reqmsg);
1474         LASSERT(opc == OST_READ || opc == OST_WRITE);
1475
1476         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
1477         LASSERT(body != NULL);
1478
1479         objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) /
1480                    sizeof(*ioo);
1481         ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1,
1482                              objcount * sizeof(*ioo));
1483         LASSERT(ioo != NULL);
1484
1485         for (niocount = i = 0; i < objcount; i++)
1486                 niocount += ioo[i].ioo_bufcnt;
1487         nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1488                             niocount * sizeof(*nb));
1489         LASSERT(nb != NULL);
1490         LASSERT(niocount == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK));
1491
1492         mode = LCK_PW;
1493         if (opc == OST_READ)
1494                 mode |= LCK_PR;
1495         RETURN(ost_rw_prolong_locks(req, ioo, nb, &body->oa, mode));
1496 }
1497
1498 static int ost_punch_prolong_locks(struct ptlrpc_request *req, struct obdo *oa)
1499 {
1500         struct ldlm_res_id res_id = { .name = { oa->o_id } };
1501         struct ost_prolong_data opd = { 0 };
1502         __u64 start, end;
1503         ENTRY;
1504
1505         start = oa->o_size;
1506         end = start + oa->o_blocks;
1507
1508         opd.opd_mode = LCK_PW;
1509         opd.opd_exp = req->rq_export;
1510         opd.opd_policy.l_extent.start = start & CFS_PAGE_MASK;
1511         if (oa->o_blocks == OBD_OBJECT_EOF || end < start)
1512                 opd.opd_policy.l_extent.end = OBD_OBJECT_EOF;
1513         else
1514                 opd.opd_policy.l_extent.end = end | ~CFS_PAGE_MASK;
1515
1516         /* prolong locks for the current service time of the corresponding
1517          * portal (= OST_IO_PORTAL) */
1518         opd.opd_timeout = AT_OFF ? obd_timeout / 2 :
1519                           max(at_est2timeout(at_get(&req->rq_rqbd->
1520                               rqbd_service->srv_at_estimate)), ldlm_timeout);
1521
1522         CDEBUG(D_DLMTRACE,"refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
1523                res_id.name[0], res_id.name[1], opd.opd_policy.l_extent.start,
1524                opd.opd_policy.l_extent.end);
1525
1526         opd.opd_oa = oa;
1527
1528         ldlm_resource_iterate(req->rq_export->exp_obd->obd_namespace, &res_id,
1529                               ost_prolong_locks_iter, &opd);
1530         RETURN(opd.opd_lock_match);
1531 }
1532
1533 static int ost_punch_hpreq_lock_match(struct ptlrpc_request *req,
1534                                       struct ldlm_lock *lock)
1535 {
1536         struct ost_body *body;
1537         ENTRY;
1538
1539         body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
1540                                   lustre_swab_obdo);
1541         LASSERT(body != NULL);
1542
1543         if (body->oa.o_valid & OBD_MD_FLHANDLE &&
1544             body->oa.o_handle.cookie == lock->l_handle.h_cookie)
1545                 RETURN(1);
1546         RETURN(0);
1547 }
1548
1549 static int ost_punch_hpreq_check(struct ptlrpc_request *req)
1550 {
1551         struct ost_body *body = lustre_msg_buf(req->rq_reqmsg,
1552                                                REQ_REC_OFF, sizeof(*body));
1553         LASSERT(body != NULL);
1554         LASSERT(!(body->oa.o_valid & OBD_MD_FLFLAGS) ||
1555                 !(body->oa.o_flags & OBD_FL_TRUNCLOCK));
1556
1557         RETURN(ost_punch_prolong_locks(req, &body->oa));
1558 }
1559
1560 struct ptlrpc_hpreq_ops ost_hpreq_rw = {
1561         .hpreq_lock_match  = ost_rw_hpreq_lock_match,
1562         .hpreq_check       = ost_rw_hpreq_check,
1563 };
1564
1565 struct ptlrpc_hpreq_ops ost_hpreq_punch = {
1566         .hpreq_lock_match  = ost_punch_hpreq_lock_match,
1567         .hpreq_check       = ost_punch_hpreq_check,
1568 };
1569
1570 /** Assign high priority operations to the request if needed. */
1571 static int ost_hpreq_handler(struct ptlrpc_request *req)
1572 {
1573         ENTRY;
1574         if (req->rq_export) {
1575                 int opc = lustre_msg_get_opc(req->rq_reqmsg);
1576                 struct ost_body *body;
1577
1578                 if (opc == OST_READ || opc == OST_WRITE) {
1579                         struct niobuf_remote *nb;
1580                         struct obd_ioobj *ioo;
1581                         int objcount, niocount;
1582                         int swab, i;
1583
1584                         body = lustre_swab_reqbuf(req, REQ_REC_OFF,
1585                                                   sizeof(*body),
1586                                                   lustre_swab_obdo);
1587                         if (!body) {
1588                                 CERROR("Missing/short ost_body\n");
1589                                 RETURN(-EFAULT);
1590                         }
1591                         objcount = lustre_msg_buflen(req->rq_reqmsg,
1592                                                      REQ_REC_OFF + 1) /
1593                                 sizeof(*ioo);
1594                         if (objcount == 0) {
1595                                 CERROR("Missing/short ioobj\n");
1596                                 RETURN(-EFAULT);
1597                         }
1598                         if (objcount > 1) {
1599                                 CERROR("too many ioobjs (%d)\n", objcount);
1600                                 RETURN(-EFAULT);
1601                         }
1602
1603                         swab = !lustre_req_swabbed(req, REQ_REC_OFF + 1) &&
1604                                 lustre_req_need_swab(req);
1605                         ioo = lustre_swab_reqbuf(req, REQ_REC_OFF + 1,
1606                                                  objcount * sizeof(*ioo),
1607                                                  lustre_swab_obd_ioobj);
1608                         if (!ioo) {
1609                                 CERROR("Missing/short ioobj\n");
1610                                 RETURN(-EFAULT);
1611                         }
1612                         for (niocount = i = 0; i < objcount; i++) {
1613                                 if (i > 0 && swab)
1614                                         lustre_swab_obd_ioobj(&ioo[i]);
1615                                 if (ioo[i].ioo_bufcnt == 0) {
1616                                         CERROR("ioo[%d] has zero bufcnt\n", i);
1617                                         RETURN(-EFAULT);
1618                                 }
1619                                 niocount += ioo[i].ioo_bufcnt;
1620                         }
1621                         if (niocount > PTLRPC_MAX_BRW_PAGES) {
1622                                 DEBUG_REQ(D_ERROR, req, "bulk has too many "
1623                                           "pages (%d)", niocount);
1624                                 RETURN(-EFAULT);
1625                         }
1626
1627                         swab = !lustre_req_swabbed(req, REQ_REC_OFF + 2) &&
1628                                 lustre_req_need_swab(req);
1629                         nb = lustre_swab_reqbuf(req, REQ_REC_OFF + 2,
1630                                                 niocount * sizeof(*nb),
1631                                                 lustre_swab_niobuf_remote);
1632                         if (!nb) {
1633                                 CERROR("Missing/short niobuf\n");
1634                                 RETURN(-EFAULT);
1635                         }
1636
1637                         if (swab) {
1638                                 /* swab remaining niobufs */
1639                                 for (i = 1; i < niocount; i++)
1640                                         lustre_swab_niobuf_remote(&nb[i]);
1641                         }
1642
1643                         if (niocount == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
1644                                 req->rq_ops = &ost_hpreq_rw;
1645                 } else if (opc == OST_PUNCH) {
1646                         body = lustre_swab_reqbuf(req, REQ_REC_OFF,
1647                                                   sizeof(*body),
1648                                                   lustre_swab_obdo);
1649                         if (!body) {
1650                                 CERROR("Missing/short ost_body\n");
1651                                 RETURN(-EFAULT);
1652                         }
1653
1654                         if (!(body->oa.o_valid & OBD_MD_FLFLAGS) ||
1655                             !(body->oa.o_flags & OBD_FL_TRUNCLOCK))
1656                                 req->rq_ops = &ost_hpreq_punch;
1657                 }
1658         }
1659         RETURN(0);
1660 }
1661
1662 static int ost_handle(struct ptlrpc_request *req)
1663 {
1664         struct obd_trans_info trans_info = { 0, };
1665         struct obd_trans_info *oti = &trans_info;
1666         int should_process, fail = OBD_FAIL_OST_ALL_REPLY_NET, rc = 0;
1667         struct obd_device *obd = NULL;
1668         ENTRY;
1669
1670         LASSERT(current->journal_info == NULL);
1671         /* XXX identical to MDS */
1672         if (lustre_msg_get_opc(req->rq_reqmsg) != OST_CONNECT) {
1673                 int recovering;
1674
1675                 if (req->rq_export == NULL) {
1676                         CDEBUG(D_HA,"operation %d on unconnected OST from %s\n",
1677                                lustre_msg_get_opc(req->rq_reqmsg),
1678                                libcfs_id2str(req->rq_peer));
1679                         req->rq_status = -ENOTCONN;
1680                         GOTO(out, rc = -ENOTCONN);
1681                 }
1682
1683                 obd = req->rq_export->exp_obd;
1684
1685                 /* Check for aborted recovery. */
1686                 spin_lock_bh(&obd->obd_processing_task_lock);
1687                 recovering = obd->obd_recovering;
1688                 spin_unlock_bh(&obd->obd_processing_task_lock);
1689                 if (recovering &&
1690                     target_recovery_check_and_stop(obd) == 0) {
1691                         rc = ost_filter_recovery_request(req, obd,
1692                                                          &should_process);
1693                         if (rc || !should_process)
1694                                 RETURN(rc);
1695                 }
1696         }
1697
1698         oti_init(oti, req);
1699         rc = ost_msg_check_version(req->rq_reqmsg);
1700         if (rc)
1701                 RETURN(rc);
1702
1703         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1704         case OST_CONNECT: {
1705                 CDEBUG(D_INODE, "connect\n");
1706                 OBD_FAIL_RETURN(OBD_FAIL_OST_CONNECT_NET, 0);
1707                 rc = target_handle_connect(req, ost_handle);
1708                 OBD_FAIL_RETURN(OBD_FAIL_OST_CONNECT_NET2, 0);
1709                 if (!rc)
1710                         obd = req->rq_export->exp_obd;
1711                 break;
1712         }
1713         case OST_DISCONNECT:
1714                 CDEBUG(D_INODE, "disconnect\n");
1715                 OBD_FAIL_RETURN(OBD_FAIL_OST_DISCONNECT_NET, 0);
1716                 rc = target_handle_disconnect(req);
1717                 break;
1718         case OST_CREATE:
1719                 CDEBUG(D_INODE, "create\n");
1720                 OBD_FAIL_RETURN(OBD_FAIL_OST_CREATE_NET, 0);
1721                 OBD_FAIL_TIMEOUT_MS(OBD_FAIL_OST_PAUSE_CREATE, obd_fail_val);
1722                 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_ENOSPC))
1723                         GOTO(out, rc = -ENOSPC);
1724                 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_EROFS))
1725                         GOTO(out, rc = -EROFS);
1726                 rc = ost_create(req->rq_export, req, oti);
1727                 break;
1728         case OST_DESTROY:
1729                 CDEBUG(D_INODE, "destroy\n");
1730                 OBD_FAIL_RETURN(OBD_FAIL_OST_DESTROY_NET, 0);
1731                 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_EROFS))
1732                         GOTO(out, rc = -EROFS);
1733                 rc = ost_destroy(req->rq_export, req, oti);
1734                 break;
1735         case OST_GETATTR:
1736                 CDEBUG(D_INODE, "getattr\n");
1737                 OBD_FAIL_RETURN(OBD_FAIL_OST_GETATTR_NET, 0);
1738                 rc = ost_getattr(req->rq_export, req);
1739                 break;
1740         case OST_SETATTR:
1741                 CDEBUG(D_INODE, "setattr\n");
1742                 OBD_FAIL_RETURN(OBD_FAIL_OST_SETATTR_NET, 0);
1743                 rc = ost_setattr(req->rq_export, req, oti);
1744                 break;
1745         case OST_WRITE:
1746                 CDEBUG(D_INODE, "write\n");
1747                 /* req->rq_request_portal would be nice, if it was set */
1748                 if (req->rq_rqbd->rqbd_service->srv_req_portal !=OST_IO_PORTAL){
1749                         CERROR("%s: deny write request from %s to portal %u\n",
1750                                req->rq_export->exp_obd->obd_name,
1751                                obd_export_nid2str(req->rq_export),
1752                                req->rq_rqbd->rqbd_service->srv_req_portal);
1753                         GOTO(out, rc = -EPROTO);
1754                 }
1755                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
1756                 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_ENOSPC))
1757                         GOTO(out, rc = -ENOSPC);
1758                 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_EROFS))
1759                         GOTO(out, rc = -EROFS);
1760                 rc = ost_brw_write(req, oti);
1761                 LASSERT(current->journal_info == NULL);
1762                 /* ost_brw_write sends its own replies */
1763                 RETURN(rc);
1764         case OST_READ:
1765                 CDEBUG(D_INODE, "read\n");
1766                 /* req->rq_request_portal would be nice, if it was set */
1767                 if (req->rq_rqbd->rqbd_service->srv_req_portal !=OST_IO_PORTAL){
1768                         CERROR("%s: deny read request from %s to portal %u\n",
1769                                req->rq_export->exp_obd->obd_name,
1770                                obd_export_nid2str(req->rq_export),
1771                                req->rq_rqbd->rqbd_service->srv_req_portal);
1772                         GOTO(out, rc = -EPROTO);
1773                 }
1774                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
1775                 rc = ost_brw_read(req, oti);
1776                 LASSERT(current->journal_info == NULL);
1777                 /* ost_brw_read sends its own replies */
1778                 RETURN(rc);
1779         case OST_PUNCH:
1780                 CDEBUG(D_INODE, "punch\n");
1781                 OBD_FAIL_RETURN(OBD_FAIL_OST_PUNCH_NET, 0);
1782                 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_EROFS))
1783                         GOTO(out, rc = -EROFS);
1784                 rc = ost_punch(req->rq_export, req, oti);
1785                 break;
1786         case OST_STATFS:
1787                 CDEBUG(D_INODE, "statfs\n");
1788                 OBD_FAIL_RETURN(OBD_FAIL_OST_STATFS_NET, 0);
1789                 rc = ost_statfs(req);
1790                 break;
1791         case OST_SYNC:
1792                 CDEBUG(D_INODE, "sync\n");
1793                 OBD_FAIL_RETURN(OBD_FAIL_OST_SYNC_NET, 0);
1794                 rc = ost_sync(req->rq_export, req);
1795                 break;
1796         case OST_SET_INFO:
1797                 DEBUG_REQ(D_INODE, req, "set_info");
1798                 rc = ost_set_info(req->rq_export, req);
1799                 break;
1800         case OST_GET_INFO:
1801                 DEBUG_REQ(D_INODE, req, "get_info");
1802                 rc = ost_get_info(req->rq_export, req);
1803                 break;
1804 #ifdef HAVE_QUOTA_SUPPORT
1805         case OST_QUOTACHECK:
1806                 CDEBUG(D_INODE, "quotacheck\n");
1807                 OBD_FAIL_RETURN(OBD_FAIL_OST_QUOTACHECK_NET, 0);
1808                 rc = ost_handle_quotacheck(req);
1809                 break;
1810         case OST_QUOTACTL:
1811                 CDEBUG(D_INODE, "quotactl\n");
1812                 OBD_FAIL_RETURN(OBD_FAIL_OST_QUOTACTL_NET, 0);
1813                 rc = ost_handle_quotactl(req);
1814                 break;
1815         case OST_QUOTA_ADJUST_QUNIT:
1816                 CDEBUG(D_INODE, "quota_adjust_qunit\n");
1817                 rc = ost_handle_quota_adjust_qunit(req);
1818                 break;
1819 #endif
1820         case OBD_PING:
1821                 DEBUG_REQ(D_INODE, req, "ping");
1822                 rc = target_handle_ping(req);
1823                 break;
1824         /* FIXME - just reply status */
1825         case LLOG_ORIGIN_CONNECT:
1826                 DEBUG_REQ(D_INODE, req, "log connect");
1827                 rc = llog_handle_connect(req);
1828                 req->rq_status = rc;
1829                 rc = lustre_pack_reply(req, 1, NULL, NULL);
1830                 if (rc)
1831                         RETURN(rc);
1832                 RETURN(ptlrpc_reply(req));
1833         case OBD_LOG_CANCEL:
1834                 CDEBUG(D_INODE, "log cancel\n");
1835                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
1836                 rc = llog_origin_handle_cancel(req);
1837                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_REP, 0);
1838                 req->rq_status = rc;
1839                 rc = lustre_pack_reply(req, 1, NULL, NULL);
1840                 if (rc)
1841                         RETURN(rc);
1842                 RETURN(ptlrpc_reply(req));
1843         case LDLM_ENQUEUE:
1844                 CDEBUG(D_INODE, "enqueue\n");
1845                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
1846                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
1847                                          ost_blocking_ast,
1848                                          ldlm_server_glimpse_ast);
1849                 fail = OBD_FAIL_OST_LDLM_REPLY_NET;
1850                 break;
1851         case LDLM_CONVERT:
1852                 CDEBUG(D_INODE, "convert\n");
1853                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
1854                 rc = ldlm_handle_convert(req);
1855                 break;
1856         case LDLM_CANCEL:
1857                 CDEBUG(D_INODE, "cancel\n");
1858                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
1859                 rc = ldlm_handle_cancel(req);
1860                 break;
1861         case LDLM_BL_CALLBACK:
1862         case LDLM_CP_CALLBACK:
1863                 CDEBUG(D_INODE, "callback\n");
1864                 CERROR("callbacks should not happen on OST\n");
1865                 /* fall through */
1866         default:
1867                 CERROR("Unexpected opcode %d\n",
1868                        lustre_msg_get_opc(req->rq_reqmsg));
1869                 req->rq_status = -ENOTSUPP;
1870                 rc = ptlrpc_error(req);
1871                 RETURN(rc);
1872         }
1873
1874         LASSERT(current->journal_info == NULL);
1875
1876         EXIT;
1877         /* If we're DISCONNECTing, the export_data is already freed */
1878         if (!rc && lustre_msg_get_opc(req->rq_reqmsg) != OST_DISCONNECT)
1879                 target_committed_to_req(req);
1880
1881 out:
1882         if (!rc)
1883                 oti_to_request(oti, req);
1884         return target_handle_reply(req, rc, fail);
1885 }
1886
1887 /*
1888  * free per-thread pool created by ost_thread_init().
1889  */
1890 static void ost_thread_done(struct ptlrpc_thread *thread)
1891 {
1892         struct ost_thread_local_cache *tls; /* TLS stands for Thread-Local
1893                                              * Storage */
1894
1895         ENTRY;
1896
1897         LASSERT(thread != NULL);
1898
1899         /*
1900          * be prepared to handle partially-initialized pools (because this is
1901          * called from ost_thread_init() for cleanup.
1902          */
1903         tls = thread->t_data;
1904         if (tls != NULL) {
1905                 OBD_FREE_PTR(tls);
1906                 thread->t_data = NULL;
1907         }
1908         EXIT;
1909 }
1910
1911 /*
1912  * initialize per-thread page pool (bug 5137).
1913  */
1914 static int ost_thread_init(struct ptlrpc_thread *thread)
1915 {
1916         struct ost_thread_local_cache *tls;
1917
1918         ENTRY;
1919
1920         LASSERT(thread != NULL);
1921         LASSERT(thread->t_data == NULL);
1922         LASSERTF(thread->t_id <= OSS_THREADS_MAX, "%u\n", thread->t_id);
1923
1924         OBD_ALLOC_PTR(tls);
1925         if (tls == NULL)
1926                 RETURN(-ENOMEM);
1927         thread->t_data = tls;
1928         RETURN(0);
1929 }
1930
1931 /* Sigh - really, this is an OSS, the _server_, not the _target_ */
1932 static int ost_setup(struct obd_device *obd, obd_count len, void *buf)
1933 {
1934         struct ost_obd *ost = &obd->u.ost;
1935         struct lprocfs_static_vars lvars;
1936         int oss_min_threads;
1937         int oss_max_threads;
1938         int oss_min_create_threads;
1939         int oss_max_create_threads;
1940         int rc;
1941         ENTRY;
1942
1943         rc = cleanup_group_info();
1944         if (rc)
1945                 RETURN(rc);
1946         lprocfs_ost_init_vars(&lvars);
1947         lprocfs_obd_setup(obd, lvars.obd_vars);
1948
1949         sema_init(&ost->ost_health_sem, 1);
1950
1951         /* Always sync on lock cancel */
1952         ost->ost_sync_on_lock_cancel = ALWAYS_SYNC_ON_CANCEL;
1953
1954         if (oss_num_threads) {
1955                 /* If oss_num_threads is set, it is the min and the max. */
1956                 if (oss_num_threads > OSS_THREADS_MAX)
1957                         oss_num_threads = OSS_THREADS_MAX;
1958                 if (oss_num_threads < OSS_THREADS_MIN)
1959                         oss_num_threads = OSS_THREADS_MIN;
1960                 oss_max_threads = oss_min_threads = oss_num_threads;
1961         } else {
1962                 /* Base min threads on memory and cpus */
1963                 oss_min_threads = num_possible_cpus() * num_physpages >>
1964                         (27 - CFS_PAGE_SHIFT);
1965                 if (oss_min_threads < OSS_THREADS_MIN)
1966                         oss_min_threads = OSS_THREADS_MIN;
1967                 /* Insure a 4x range for dynamic threads */
1968                 if (oss_min_threads > OSS_THREADS_MAX / 4)
1969                         oss_min_threads = OSS_THREADS_MAX / 4;
1970                 oss_max_threads = min(OSS_THREADS_MAX, oss_min_threads * 4 + 1);
1971         }
1972
1973         ost->ost_service =
1974                 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
1975                                 OST_MAXREPSIZE, OST_REQUEST_PORTAL,
1976                                 OSC_REPLY_PORTAL, OSS_SERVICE_WATCHDOG_FACTOR,
1977                                 ost_handle, LUSTRE_OSS_NAME,
1978                                 obd->obd_proc_entry, target_print_req,
1979                                 oss_min_threads, oss_max_threads, "ll_ost",
1980                                 NULL);
1981         if (ost->ost_service == NULL) {
1982                 CERROR("failed to start OST service\n");
1983                 GOTO(out_lprocfs, rc = -ENOMEM);
1984         }
1985
1986         rc = ptlrpc_start_threads(obd, ost->ost_service);
1987         if (rc)
1988                 GOTO(out_service, rc = -EINVAL);
1989
1990         if (oss_num_create_threads) {
1991                 if (oss_num_create_threads > OSS_MAX_CREATE_THREADS)
1992                         oss_num_create_threads = OSS_MAX_CREATE_THREADS;
1993                 if (oss_num_create_threads < OSS_DEF_CREATE_THREADS)
1994                         oss_num_create_threads = OSS_DEF_CREATE_THREADS;
1995                 oss_min_create_threads = oss_max_create_threads =
1996                         oss_num_create_threads;
1997         } else {
1998                 oss_min_create_threads = OSS_DEF_CREATE_THREADS;
1999                 oss_max_create_threads = OSS_MAX_CREATE_THREADS;
2000         }
2001
2002         ost->ost_create_service =
2003                 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
2004                                 OST_MAXREPSIZE, OST_CREATE_PORTAL,
2005                                 OSC_REPLY_PORTAL, OSS_SERVICE_WATCHDOG_FACTOR,
2006                                 ost_handle, "ost_create",
2007                                 obd->obd_proc_entry, target_print_req,
2008                                 oss_min_create_threads,
2009                                 oss_max_create_threads,
2010                                 "ll_ost_creat", NULL);
2011         if (ost->ost_create_service == NULL) {
2012                 CERROR("failed to start OST create service\n");
2013                 GOTO(out_service, rc = -ENOMEM);
2014         }
2015
2016         rc = ptlrpc_start_threads(obd, ost->ost_create_service);
2017         if (rc)
2018                 GOTO(out_create, rc = -EINVAL);
2019
2020         ost->ost_io_service =
2021                 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
2022                                 OST_MAXREPSIZE, OST_IO_PORTAL,
2023                                 OSC_REPLY_PORTAL, OSS_SERVICE_WATCHDOG_FACTOR,
2024                                 ost_handle, "ost_io",
2025                                 obd->obd_proc_entry, target_print_req,
2026                                 oss_min_threads, oss_max_threads, "ll_ost_io",
2027                                 ost_hpreq_handler);
2028         if (ost->ost_io_service == NULL) {
2029                 CERROR("failed to start OST I/O service\n");
2030                 GOTO(out_create, rc = -ENOMEM);
2031         }
2032
2033         ost->ost_io_service->srv_init = ost_thread_init;
2034         ost->ost_io_service->srv_done = ost_thread_done;
2035         ost->ost_io_service->srv_cpu_affinity = 1;
2036         rc = ptlrpc_start_threads(obd, ost->ost_io_service);
2037         if (rc)
2038                 GOTO(out_io, rc = -EINVAL);
2039
2040         ping_evictor_start();
2041
2042         RETURN(0);
2043
2044 out_io:
2045         ptlrpc_unregister_service(ost->ost_io_service);
2046         ost->ost_io_service = NULL;
2047 out_create:
2048         ptlrpc_unregister_service(ost->ost_create_service);
2049         ost->ost_create_service = NULL;
2050 out_service:
2051         ptlrpc_unregister_service(ost->ost_service);
2052         ost->ost_service = NULL;
2053 out_lprocfs:
2054         lprocfs_obd_cleanup(obd);
2055         RETURN(rc);
2056 }
2057
2058 static int ost_cleanup(struct obd_device *obd)
2059 {
2060         struct ost_obd *ost = &obd->u.ost;
2061         int err = 0;
2062         ENTRY;
2063
2064         ping_evictor_stop();
2065
2066         spin_lock_bh(&obd->obd_processing_task_lock);
2067         if (obd->obd_recovering) {
2068                 target_cancel_recovery_timer(obd);
2069                 obd->obd_recovering = 0;
2070         }
2071         spin_unlock_bh(&obd->obd_processing_task_lock);
2072
2073         down(&ost->ost_health_sem);
2074         ptlrpc_unregister_service(ost->ost_service);
2075         ptlrpc_unregister_service(ost->ost_create_service);
2076         ptlrpc_unregister_service(ost->ost_io_service);
2077         ost->ost_service = NULL;
2078         ost->ost_create_service = NULL;
2079         up(&ost->ost_health_sem);
2080
2081         lprocfs_obd_cleanup(obd);
2082
2083         RETURN(err);
2084 }
2085
2086 static int ost_health_check(struct obd_device *obd)
2087 {
2088         struct ost_obd *ost = &obd->u.ost;
2089         int rc = 0;
2090
2091         down(&ost->ost_health_sem);
2092         rc |= ptlrpc_service_health_check(ost->ost_service);
2093         rc |= ptlrpc_service_health_check(ost->ost_create_service);
2094         rc |= ptlrpc_service_health_check(ost->ost_io_service);
2095         up(&ost->ost_health_sem);
2096
2097         /*
2098          * health_check to return 0 on healthy
2099          * and 1 on unhealthy.
2100          */
2101         if( rc != 0)
2102                 rc = 1;
2103
2104         return rc;
2105 }
2106
2107 struct ost_thread_local_cache *ost_tls(struct ptlrpc_request *r)
2108 {
2109         return (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
2110 }
2111
2112 /* use obd ops to offer management infrastructure */
2113 static struct obd_ops ost_obd_ops = {
2114         .o_owner        = THIS_MODULE,
2115         .o_setup        = ost_setup,
2116         .o_cleanup      = ost_cleanup,
2117         .o_health_check = ost_health_check,
2118 };
2119
2120
2121 static int __init ost_init(void)
2122 {
2123         struct lprocfs_static_vars lvars;
2124         int rc;
2125         ENTRY;
2126
2127         lprocfs_ost_init_vars(&lvars);
2128         rc = class_register_type(&ost_obd_ops, lvars.module_vars,
2129                                  LUSTRE_OSS_NAME);
2130
2131         if (ost_num_threads != 0 && oss_num_threads == 0) {
2132                 LCONSOLE_INFO("ost_num_threads module parameter is deprecated, "
2133                               "use oss_num_threads instead or unset both for "
2134                               "dynamic thread startup\n");
2135                 oss_num_threads = ost_num_threads;
2136         }
2137
2138         RETURN(rc);
2139 }
2140
2141 static void /*__exit*/ ost_exit(void)
2142 {
2143         class_unregister_type(LUSTRE_OSS_NAME);
2144 }
2145
2146 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2147 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
2148 MODULE_LICENSE("GPL");
2149
2150 module_init(ost_init);
2151 module_exit(ost_exit);