Whamcloud - gitweb
b=20201
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ost/ost_handler.c
37  *
38  * Author: Peter J. Braam <braam@clusterfs.com>
39  * Author: Phil Schwan <phil@clusterfs.com>
40  */
41
42 #ifndef EXPORT_SYMTAB
43 # define EXPORT_SYMTAB
44 #endif
45 #define DEBUG_SUBSYSTEM S_OST
46
47 #include <linux/module.h>
48 #include <obd_ost.h>
49 #include <lustre_net.h>
50 #include <lustre_dlm.h>
51 #include <lustre_export.h>
52 #include <lustre_debug.h>
53 #include <linux/init.h>
54 #include <lprocfs_status.h>
55 #include <libcfs/list.h>
56 #include <lustre_quota.h>
57 #include <lustre_log.h>
58 #include "ost_internal.h"
59
60 static int oss_num_threads;
61 CFS_MODULE_PARM(oss_num_threads, "i", int, 0444,
62                 "number of OSS service threads to start");
63
64 static int ost_num_threads;
65 CFS_MODULE_PARM(ost_num_threads, "i", int, 0444,
66                 "number of OST service threads to start (deprecated)");
67
68 static int oss_num_create_threads;
69 CFS_MODULE_PARM(oss_num_create_threads, "i", int, 0444,
70                 "number of OSS create threads to start");
71
72 void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
73 {
74         struct oti_req_ack_lock *ack_lock;
75         int i;
76
77         if (oti == NULL)
78                 return;
79
80         if (req->rq_repmsg) {
81                 __u64 versions[PTLRPC_NUM_VERSIONS] = { 0 };
82                 lustre_msg_set_transno(req->rq_repmsg, oti->oti_transno);
83                 versions[0] = oti->oti_pre_version;
84                 lustre_msg_set_versions(req->rq_repmsg, versions);
85         }
86         req->rq_transno = oti->oti_transno;
87
88         /* XXX 4 == entries in oti_ack_locks??? */
89         for (ack_lock = oti->oti_ack_locks, i = 0; i < 4; i++, ack_lock++) {
90                 if (!ack_lock->mode)
91                         break;
92                 /* XXX not even calling target_send_reply in some cases... */
93                 ptlrpc_save_lock (req, &ack_lock->lock, ack_lock->mode);
94         }
95 }
96
97 static int ost_destroy(struct obd_export *exp, struct ptlrpc_request *req,
98                        struct obd_trans_info *oti)
99 {
100         struct ost_body *body, *repbody;
101         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
102         int rc;
103         ENTRY;
104
105         body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
106                                   lustre_swab_ost_body);
107         if (body == NULL)
108                 RETURN(-EFAULT);
109
110         if (body->oa.o_id == 0)
111                 RETURN(-EPROTO);
112
113         if (lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1)) {
114                 struct ldlm_request *dlm;
115                 dlm = lustre_swab_reqbuf(req, REQ_REC_OFF + 1, sizeof(*dlm),
116                                          lustre_swab_ldlm_request);
117                 if (dlm == NULL)
118                         RETURN (-EFAULT);
119                 ldlm_request_cancel(req, dlm, 0);
120         }
121
122         rc = lustre_pack_reply(req, 2, size, NULL);
123         if (rc)
124                 RETURN(rc);
125
126         if (body->oa.o_valid & OBD_MD_FLCOOKIE)
127                 oti->oti_logcookies = &body->oa.o_lcookie;
128         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
129                                  sizeof(*repbody));
130         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
131         req->rq_status = obd_destroy(exp, &body->oa, NULL, oti, NULL);
132         RETURN(0);
133 }
134
135 static int ost_getattr(struct obd_export *exp, struct ptlrpc_request *req)
136 {
137         struct ost_body *body, *repbody;
138         struct obd_info *oinfo;
139         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
140         int rc;
141         ENTRY;
142
143         body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
144                                   lustre_swab_ost_body);
145         if (body == NULL)
146                 RETURN(-EFAULT);
147
148         rc = lustre_pack_reply(req, 2, size, NULL);
149         if (rc)
150                 RETURN(rc);
151
152         OBD_ALLOC_PTR(oinfo);
153         if (NULL == oinfo)
154                 RETURN(-ENOMEM);
155
156         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
157                                  sizeof(*repbody));
158         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
159
160         oinfo->oi_oa = &repbody->oa;
161         req->rq_status = obd_getattr(exp, oinfo);
162
163         OBD_FREE_PTR(oinfo);
164         RETURN(0);
165 }
166
167 static int ost_statfs(struct ptlrpc_request *req)
168 {
169         struct obd_statfs *osfs;
170         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
171         int rc;
172         ENTRY;
173
174         rc = lustre_pack_reply(req, 2, size, NULL);
175         if (rc)
176                 RETURN(rc);
177
178         osfs = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*osfs));
179
180         req->rq_status = obd_statfs(req->rq_export->exp_obd, osfs,
181                                     cfs_time_current_64() - HZ, 0);
182         if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_ENOSPC))
183                 osfs->os_bfree = osfs->os_bavail = 64;
184         if (req->rq_status != 0)
185                 CERROR("ost: statfs failed: rc %d\n", req->rq_status);
186
187         RETURN(0);
188 }
189
190 static int ost_create(struct obd_export *exp, struct ptlrpc_request *req,
191                       struct obd_trans_info *oti)
192 {
193         struct ost_body *body, *repbody;
194         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*repbody) };
195         int rc;
196         ENTRY;
197
198         body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
199                                   lustre_swab_ost_body);
200         if (body == NULL)
201                 RETURN(-EFAULT);
202
203         rc = lustre_pack_reply(req, 2, size, NULL);
204         if (rc)
205                 RETURN(rc);
206
207         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
208                                  sizeof(*repbody));
209         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
210         oti->oti_logcookies = &repbody->oa.o_lcookie;
211
212         req->rq_status = obd_create(exp, &repbody->oa, NULL, oti);
213         //obd_log_cancel(conn, NULL, 1, oti->oti_logcookies, 0);
214         RETURN(0);
215 }
216
217 /*
218  * Helper function for ost_punch(): if asked by client, acquire [size, EOF]
219  * lock on the file being truncated.
220  */
221 static int ost_punch_lock_get(struct obd_export *exp, struct obdo *oa,
222                               struct lustre_handle *lh)
223 {
224         int flags;
225         struct ldlm_res_id res_id = { .name = { oa->o_id } };
226         ldlm_policy_data_t policy;
227         __u64 start;
228         __u64 finis;
229
230         ENTRY;
231
232         LASSERT(!lustre_handle_is_used(lh));
233
234         if (!(oa->o_valid & OBD_MD_FLFLAGS) ||
235             !(oa->o_flags & OBD_FL_TRUNCLOCK))
236                 RETURN(0);
237
238         CDEBUG(D_INODE, "OST-side truncate lock.\n");
239
240         start = oa->o_size;
241         finis = start + oa->o_blocks;
242
243         /*
244          * standard truncate optimization: if file body is completely
245          * destroyed, don't send data back to the server.
246          */
247         flags = (start == 0) ? LDLM_AST_DISCARD_DATA : 0;
248
249         policy.l_extent.start = start & CFS_PAGE_MASK;
250
251         /*
252          * If ->o_blocks is EOF it means "lock till the end of the
253          * file". Otherwise, it's size of a hole being punched (in bytes)
254          */
255         if (oa->o_blocks == OBD_OBJECT_EOF || finis < start)
256                 policy.l_extent.end = OBD_OBJECT_EOF;
257         else
258                 policy.l_extent.end = finis | ~CFS_PAGE_MASK;
259
260         RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
261                                       LDLM_EXTENT, &policy, LCK_PW, &flags,
262                                       ldlm_blocking_ast, ldlm_completion_ast,
263                                       ldlm_glimpse_ast, NULL, 0, NULL, lh));
264 }
265
266 /*
267  * Helper function for ost_punch(): release lock acquired by
268  * ost_punch_lock_get(), if any.
269  */
270 static void ost_punch_lock_put(struct obd_export *exp, struct obdo *oa,
271                                struct lustre_handle *lh)
272 {
273         ENTRY;
274         if (lustre_handle_is_used(lh))
275                 ldlm_lock_decref(lh, LCK_PW);
276         EXIT;
277 }
278
279 static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req,
280                      struct obd_trans_info *oti)
281 {
282         struct obd_info *oinfo;
283         struct ost_body *body, *repbody;
284         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*repbody) };
285         int rc;
286         struct lustre_handle lh = {0,};
287         ENTRY;
288
289         /* check that we do support OBD_CONNECT_TRUNCLOCK. */
290         CLASSERT(OST_CONNECT_SUPPORTED & OBD_CONNECT_TRUNCLOCK);
291
292         /* ost_body is varified and swabbed in ost_hpreq_handler() */
293         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
294         LASSERT(body != NULL);
295
296         OBD_ALLOC_PTR(oinfo);
297         if (NULL == oinfo)
298                 RETURN(-ENOMEM);
299
300         oinfo->oi_oa = &body->oa;
301         oinfo->oi_policy.l_extent.start = oinfo->oi_oa->o_size;
302         oinfo->oi_policy.l_extent.end = oinfo->oi_oa->o_blocks;
303
304         if ((oinfo->oi_oa->o_valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) !=
305             (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
306                 GOTO(out, rc = -EINVAL);
307
308         rc = lustre_pack_reply(req, 2, size, NULL);
309         if (rc)
310                 GOTO(out, rc);
311
312         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
313                                  sizeof(*repbody));
314         rc = ost_punch_lock_get(exp, oinfo->oi_oa, &lh);
315         if (rc == 0) {
316                 if (oinfo->oi_oa->o_valid & OBD_MD_FLFLAGS &&
317                     oinfo->oi_oa->o_flags == OBD_FL_TRUNCLOCK)
318                         /*
319                          * If OBD_FL_TRUNCLOCK is the only bit set in
320                          * ->o_flags, clear OBD_MD_FLFLAGS to avoid falling
321                          * through filter_setattr() to filter_iocontrol().
322                          */
323                         oinfo->oi_oa->o_valid &= ~OBD_MD_FLFLAGS;
324
325                 req->rq_status = obd_punch(exp, oinfo, oti, NULL);
326                 ost_punch_lock_put(exp, oinfo->oi_oa, &lh);
327         }
328
329         repbody->oa = *oinfo->oi_oa;
330 out:
331         OBD_FREE_PTR(oinfo);
332         RETURN(rc);
333 }
334
335 static int ost_sync(struct obd_export *exp, struct ptlrpc_request *req)
336 {
337         struct obd_info *oinfo;
338         struct ost_body *body, *repbody;
339         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*repbody) };
340         int rc;
341         ENTRY;
342
343         body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
344                                   lustre_swab_ost_body);
345         if (body == NULL)
346                 RETURN(-EFAULT);
347
348         rc = lustre_pack_reply(req, 2, size, NULL);
349         if (rc)
350                 RETURN(rc);
351
352         OBD_ALLOC_PTR(oinfo);
353         if (NULL == oinfo)
354                 RETURN(-ENOMEM);
355
356         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
357                                  sizeof(*repbody));
358
359         oinfo->oi_oa = &body->oa;
360         req->rq_status = obd_sync(exp, oinfo, repbody->oa.o_size,
361                                   repbody->oa.o_blocks, NULL);
362         repbody->oa = *oinfo->oi_oa;
363
364         OBD_FREE_PTR(oinfo);
365         RETURN(0);
366 }
367
368 static int ost_setattr(struct obd_export *exp, struct ptlrpc_request *req,
369                        struct obd_trans_info *oti)
370 {
371         struct ost_body *body, *repbody;
372         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*repbody) };
373         int rc;
374         struct obd_info *oinfo = NULL;
375         ENTRY;
376
377         body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
378                                   lustre_swab_ost_body);
379         if (body == NULL)
380                 RETURN(-EFAULT);
381
382         rc = lustre_pack_reply(req, 2, size, NULL);
383         if (rc)
384                 RETURN(rc);
385
386         OBD_ALLOC_PTR(oinfo);
387         if (NULL == oinfo)
388                 RETURN(-ENOMEM);
389
390         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
391                                  sizeof(*repbody));
392         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
393
394         oinfo->oi_oa = &repbody->oa;
395         req->rq_status = obd_setattr(exp, oinfo, oti);
396
397         OBD_FREE_PTR(oinfo);
398         RETURN(0);
399 }
400
401 static int ost_bulk_timeout(void *data)
402 {
403         ENTRY;
404         /* We don't fail the connection here, because having the export
405          * killed makes the (vital) call to commitrw very sad.
406          */
407         RETURN(1);
408 }
409
410 static __u32 ost_checksum_bulk(struct ptlrpc_bulk_desc *desc, int opc,
411                                cksum_type_t cksum_type)
412 {
413         __u32 cksum;
414         int i;
415
416         cksum = init_checksum(cksum_type);
417         for (i = 0; i < desc->bd_iov_count; i++) {
418                 struct page *page = desc->bd_iov[i].kiov_page;
419                 int off = desc->bd_iov[i].kiov_offset & ~CFS_PAGE_MASK;
420                 char *ptr = kmap(page) + off;
421                 int len = desc->bd_iov[i].kiov_len;
422
423                 /* corrupt the data before we compute the checksum, to
424                  * simulate a client->OST data error */
425                 if (i == 0 && opc == OST_WRITE &&
426                     OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_CHECKSUM_RECEIVE))
427                         memcpy(ptr, "bad3", min(4, len));
428                 cksum = compute_checksum(cksum, ptr, len, cksum_type);
429                 /* corrupt the data after we compute the checksum, to
430                  * simulate an OST->client data error */
431                 if (i == 0 && opc == OST_READ &&
432                     OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_CHECKSUM_SEND)) {
433                         memcpy(ptr, "bad4", min(4, len));
434                         /* nobody should use corrupted page again */
435                         ClearPageUptodate(page);
436                 }
437                 kunmap(page);
438         }
439
440         return cksum;
441 }
442
443 static int ost_brw_lock_get(int mode, struct obd_export *exp,
444                             struct obd_ioobj *obj, struct niobuf_remote *nb,
445                             struct lustre_handle *lh)
446 {
447         int flags                 = 0;
448         int nrbufs                = obj->ioo_bufcnt;
449         struct ldlm_res_id res_id = { .name = { obj->ioo_id } };
450         ldlm_policy_data_t policy;
451         int i;
452
453         ENTRY;
454
455         LASSERT(mode == LCK_PR || mode == LCK_PW);
456         LASSERT(!lustre_handle_is_used(lh));
457
458         if (nrbufs == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
459                 RETURN(0);
460
461         /* EXPENSIVE ASSERTION */
462         for (i = 1; i < nrbufs; i ++)
463                 LASSERT((nb[0].flags & OBD_BRW_SRVLOCK) ==
464                         (nb[i].flags & OBD_BRW_SRVLOCK));
465
466         policy.l_extent.start = nb[0].offset & CFS_PAGE_MASK;
467         policy.l_extent.end   = (nb[nrbufs - 1].offset +
468                                  nb[nrbufs - 1].len - 1) | ~CFS_PAGE_MASK;
469
470         RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
471                                       LDLM_EXTENT, &policy, mode, &flags,
472                                       ldlm_blocking_ast, ldlm_completion_ast,
473                                       ldlm_glimpse_ast, NULL, 0, NULL, lh));
474 }
475
476 static void ost_brw_lock_put(int mode,
477                              struct obd_ioobj *obj, struct niobuf_remote *niob,
478                              struct lustre_handle *lh)
479 {
480         ENTRY;
481         LASSERT(mode == LCK_PR || mode == LCK_PW);
482         LASSERT((obj->ioo_bufcnt > 0 && (niob[0].flags & OBD_BRW_SRVLOCK)) ==
483                 lustre_handle_is_used(lh));
484         if (lustre_handle_is_used(lh)) {
485                 struct ldlm_lock *lock = ldlm_handle2lock(lh);
486                 ldlm_res_lvbo_update(lock->l_resource, NULL, 0, 1);
487                 LDLM_LOCK_PUT(lock);
488                 ldlm_lock_decref(lh, mode);
489         }
490         EXIT;
491 }
492
493 struct ost_prolong_data {
494         struct obd_export *opd_exp;
495         ldlm_policy_data_t opd_policy;
496         struct obdo *opd_oa;
497         ldlm_mode_t opd_mode;
498         int opd_lock_match;
499         int opd_timeout;
500 };
501
502 static int ost_prolong_locks_iter(struct ldlm_lock *lock, void *data)
503 {
504         struct ost_prolong_data *opd = data;
505
506         LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
507
508         if (lock->l_req_mode != lock->l_granted_mode) {
509                 /* scan granted locks only */
510                 return LDLM_ITER_STOP;
511         }
512
513         if (lock->l_export != opd->opd_exp) {
514                 /* prolong locks only for given client */
515                 return LDLM_ITER_CONTINUE;
516         }
517
518         if (!(lock->l_granted_mode & opd->opd_mode)) {
519                 /* we aren't interesting in all type of locks */
520                 return LDLM_ITER_CONTINUE;
521         }
522
523         if (lock->l_policy_data.l_extent.end < opd->opd_policy.l_extent.start ||
524             lock->l_policy_data.l_extent.start > opd->opd_policy.l_extent.end) {
525                 /* the request doesn't cross the lock, skip it */
526                 return LDLM_ITER_CONTINUE;
527         }
528
529         /* Fill the obdo with the matched lock handle.
530          * XXX: it is possible in some cases the IO RPC is covered by several
531          * locks, even for the write case, so it may need to be a lock list. */
532         if (opd->opd_oa && !(opd->opd_oa->o_valid & OBD_MD_FLHANDLE)) {
533                 opd->opd_oa->o_handle.cookie = lock->l_handle.h_cookie;
534                 opd->opd_oa->o_valid |= OBD_MD_FLHANDLE;
535         }
536
537         if (!(lock->l_flags & LDLM_FL_AST_SENT)) {
538                 /* ignore locks not being cancelled */
539                 return LDLM_ITER_CONTINUE;
540         }
541
542         CDEBUG(D_DLMTRACE,"refresh lock: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
543                lock->l_resource->lr_name.name[0],
544                lock->l_resource->lr_name.name[1],
545                opd->opd_policy.l_extent.start, opd->opd_policy.l_extent.end);
546         /* OK. this is a possible lock the user holds doing I/O
547          * let's refresh eviction timer for it */
548         ldlm_refresh_waiting_lock(lock, opd->opd_timeout);
549         opd->opd_lock_match = 1;
550
551         return LDLM_ITER_CONTINUE;
552 }
553
554 static int ost_rw_prolong_locks(struct ptlrpc_request *req, struct obd_ioobj *obj,
555                                 struct niobuf_remote *nb, struct obdo *oa,
556                                 ldlm_mode_t mode)
557
558
559 {
560         struct ldlm_res_id res_id = { .name = { obj->ioo_id } };
561         struct ost_prolong_data opd = { 0 };
562         int nrbufs = obj->ioo_bufcnt;
563
564         ENTRY;
565
566         opd.opd_mode = mode;
567         opd.opd_exp = req->rq_export;
568         opd.opd_policy.l_extent.start = nb[0].offset & CFS_PAGE_MASK;
569         opd.opd_policy.l_extent.end = (nb[nrbufs - 1].offset +
570                                        nb[nrbufs - 1].len - 1) | ~CFS_PAGE_MASK;
571
572         /* prolong locks for the current service time of the corresponding
573          * portal (= OST_IO_PORTAL) */
574         opd.opd_timeout = AT_OFF ? obd_timeout / 2 :
575                           max(at_est2timeout(at_get(&req->rq_rqbd->
576                               rqbd_service->srv_at_estimate)), ldlm_timeout);
577
578         CDEBUG(D_INFO,"refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
579                res_id.name[0], res_id.name[1], opd.opd_policy.l_extent.start,
580                opd.opd_policy.l_extent.end);
581
582         if (oa->o_valid & OBD_MD_FLHANDLE) {
583                 struct ldlm_lock *lock;
584
585                 lock = ldlm_handle2lock(&oa->o_handle);
586                 if (lock != NULL) {
587                         ost_prolong_locks_iter(lock, &opd);
588                         if (opd.opd_lock_match) {
589                                 LDLM_LOCK_PUT(lock);
590                                 RETURN(1);
591                         }
592
593                         /* Check if the lock covers the whole IO region,
594                          * otherwise iterate through the resource. */
595                         if (lock->l_policy_data.l_extent.end >=
596                             opd.opd_policy.l_extent.end &&
597                             lock->l_policy_data.l_extent.start <=
598                             opd.opd_policy.l_extent.start) {
599                                 LDLM_LOCK_PUT(lock);
600                                 RETURN(0);
601                         }
602                         LDLM_LOCK_PUT(lock);
603                 }
604         }
605
606         opd.opd_oa = oa;
607         ldlm_resource_iterate(req->rq_export->exp_obd->obd_namespace, &res_id,
608                               ost_prolong_locks_iter, &opd);
609         RETURN(opd.opd_lock_match);
610 }
611
612 /* Allocate thread local buffers if needed */
613 static struct ost_thread_local_cache *ost_tls_get(struct ptlrpc_request *r)
614 {
615         struct ost_thread_local_cache *tls =
616                 (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
617
618         /* In normal mode of operation an I/O request is serviced only
619          * by ll_ost_io threads each of them has own tls buffers allocated by
620          * ost_thread_init().
621          * During recovery, an I/O request may be queued until any of the ost
622          * service threads process it. Not necessary it should be one of
623          * ll_ost_io threads. In that case we dynamically allocating tls
624          * buffers for the request service time. */
625         if (unlikely(tls == NULL)) {
626                 LASSERT(r->rq_export->exp_in_recovery);
627                 OBD_ALLOC_PTR(tls);
628                 if (tls != NULL) {
629                         tls->temporary = 1;
630                         r->rq_svc_thread->t_data = tls;
631                 }
632         }
633         return  tls;
634 }
635
636 /* Free thread local buffers if they were allocated only for servicing
637  * this one request */
638 static void ost_tls_put(struct ptlrpc_request *r)
639 {
640         struct ost_thread_local_cache *tls =
641                 (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
642
643         if (unlikely(tls->temporary)) {
644                 OBD_FREE_PTR(tls);
645                 r->rq_svc_thread->t_data = NULL;
646         }
647 }
648
649 static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
650 {
651         struct ptlrpc_bulk_desc *desc = NULL;
652         struct obd_export       *exp = req->rq_export;
653         struct niobuf_remote *remote_nb;
654         struct niobuf_local *local_nb;
655         struct obd_ioobj *ioo;
656         struct ost_body *body, *repbody;
657         struct l_wait_info lwi;
658         struct lustre_handle lockh = { 0 };
659         __u32  size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
660         int niocount, npages, nob = 0, rc, i;
661         int no_reply = 0;
662         struct ost_thread_local_cache *tls;
663         ENTRY;
664
665         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
666                 GOTO(out, rc = -EIO);
667
668         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, (obd_timeout + 1) / 4);
669
670         /* Check if there is eviction in progress, and if so, wait for it to
671          * finish */
672         if (unlikely(atomic_read(&exp->exp_obd->obd_evict_inprogress))) {
673                 lwi = LWI_INTR(NULL, NULL); // We do not care how long it takes
674                 rc = l_wait_event(exp->exp_obd->obd_evict_inprogress_waitq,
675                         !atomic_read(&exp->exp_obd->obd_evict_inprogress),
676                         &lwi);
677         }
678         if (exp->exp_failed)
679                 GOTO(out, rc = -ENOTCONN);
680
681         /* ost_body, ioobj & noibuf_remote are verified and swabbed in
682          * ost_rw_hpreq_check(). */
683         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
684         LASSERT(body != NULL);
685
686         ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioo));
687         LASSERT(ioo != NULL);
688
689         niocount = ioo->ioo_bufcnt;
690         remote_nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
691                                    niocount * sizeof(*remote_nb));
692         LASSERT(remote_nb != NULL);
693
694         rc = lustre_pack_reply(req, 2, size, NULL);
695         if (rc)
696                 GOTO(out, rc);
697
698         tls = ost_tls_get(req);
699         if (tls == NULL)
700                 GOTO(out_bulk, rc = -ENOMEM);
701         local_nb = tls->local;
702
703         rc = ost_brw_lock_get(LCK_PR, exp, ioo, remote_nb, &lockh);
704         if (rc != 0)
705                 GOTO(out_tls, rc);
706
707         /*
708          * If getting the lock took more time than
709          * client was willing to wait, drop it. b=11330
710          */
711         if (cfs_time_current_sec() > req->rq_deadline ||
712             OBD_FAIL_CHECK(OBD_FAIL_OST_DROP_REQ)) {
713                 no_reply = 1;
714                 CERROR("Dropping timed-out read from %s because locking"
715                        "object "LPX64" took %ld seconds (limit was %ld).\n",
716                        libcfs_id2str(req->rq_peer), ioo->ioo_id,
717                        cfs_time_current_sec() - req->rq_arrival_time.tv_sec,
718                        req->rq_deadline - req->rq_arrival_time.tv_sec);
719                 GOTO(out_lock, rc = -ETIMEDOUT);
720         }
721
722         npages = OST_THREAD_POOL_SIZE;
723         rc = obd_preprw(OBD_BRW_READ, exp, &body->oa, 1, ioo,
724                         remote_nb, &npages, local_nb, oti);
725         if (rc != 0)
726                 GOTO(out_lock, rc);
727
728         desc = ptlrpc_prep_bulk_exp(req, npages,
729                                      BULK_PUT_SOURCE, OST_BULK_PORTAL);
730         if (desc == NULL)
731                 GOTO(out_lock, rc = -ENOMEM);
732
733         if (!lustre_handle_is_used(&lockh))
734                 /* no needs to try to prolong lock if server is asked
735                  * to handle locking (= OBD_BRW_SRVLOCK) */
736                 ost_rw_prolong_locks(req, ioo, remote_nb, &body->oa,
737                                      LCK_PW | LCK_PR);
738
739         nob = 0;
740         for (i = 0; i < npages; i++) {
741                 int page_rc = local_nb[i].rc;
742
743                 if (page_rc < 0) {              /* error */
744                         rc = page_rc;
745                         break;
746                 }
747
748                 nob += page_rc;
749                 if (page_rc != 0) {             /* some data! */
750                         LASSERT (local_nb[i].page != NULL);
751                         ptlrpc_prep_bulk_page(desc, local_nb[i].page,
752                                               local_nb[i].offset & ~CFS_PAGE_MASK,
753                                               page_rc);
754                 }
755
756                 if (page_rc != local_nb[i].len) { /* short read */
757                         /* All subsequent pages should be 0 */
758                         while(++i < npages)
759                                 LASSERT(local_nb[i].rc == 0);
760                         break;
761                 }
762         }
763
764         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
765                 cksum_type_t cksum_type = OBD_CKSUM_CRC32;
766
767                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
768                         cksum_type = cksum_type_unpack(body->oa.o_flags);
769                 body->oa.o_flags = cksum_type_pack(cksum_type);
770                 body->oa.o_valid = OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
771                 body->oa.o_cksum = ost_checksum_bulk(desc, OST_READ, cksum_type);
772                 CDEBUG(D_PAGE,"checksum at read origin: %x\n",body->oa.o_cksum);
773         } else {
774                 body->oa.o_valid = 0;
775         }
776         /* We're finishing using body->oa as an input variable */
777
778         /* Check if client was evicted while we were doing i/o before touching
779            network */
780         if (rc == 0) {
781                 /* Check if there is eviction in progress, and if so, wait for
782                  * it to finish */
783                 if (unlikely(atomic_read(&exp->exp_obd->
784                                                 obd_evict_inprogress))) {
785                         lwi = LWI_INTR(NULL, NULL);
786                         rc = l_wait_event(exp->exp_obd->
787                                                 obd_evict_inprogress_waitq,
788                                           !atomic_read(&exp->exp_obd->
789                                                         obd_evict_inprogress),
790                                           &lwi);
791                 }
792                 /* Check if client was evicted or tried to reconnect already */
793                 if (exp->exp_failed || exp->exp_abort_active_req)
794                         rc = -ENOTCONN;
795                 else
796                         rc = ptlrpc_start_bulk_transfer(desc);
797                 if (rc == 0) {
798                         time_t start = cfs_time_current_sec();
799                         do {
800                                 long timeoutl = req->rq_deadline -
801                                         cfs_time_current_sec();
802                                 cfs_duration_t timeout = (timeoutl <= 0 || rc) ?
803                                         CFS_TICK : cfs_time_seconds(timeoutl);
804                                 lwi = LWI_TIMEOUT_INTERVAL(timeout,
805                                                            cfs_time_seconds(1),
806                                                            ost_bulk_timeout,
807                                                            desc);
808                                 rc = l_wait_event(desc->bd_waitq,
809                                                   !ptlrpc_server_bulk_active(desc) ||
810                                                   exp->exp_failed ||
811                                                   exp->exp_abort_active_req,
812                                                   &lwi);
813                                 LASSERT(rc == 0 || rc == -ETIMEDOUT);
814                                 /* Wait again if we changed deadline */
815                         } while ((rc == -ETIMEDOUT) &&
816                                  (req->rq_deadline > cfs_time_current_sec()));
817
818                         if (rc == -ETIMEDOUT) {
819                                 DEBUG_REQ(D_ERROR, req,
820                                           "timeout on bulk PUT after %ld%+lds",
821                                           req->rq_deadline - start,
822                                           cfs_time_current_sec() -
823                                           req->rq_deadline);
824                                 ptlrpc_abort_bulk(desc);
825                         } else if (exp->exp_failed) {
826                                 DEBUG_REQ(D_ERROR, req, "Eviction on bulk PUT");
827                                 rc = -ENOTCONN;
828                                 ptlrpc_abort_bulk(desc);
829                         } else if (exp->exp_abort_active_req) {
830                                 DEBUG_REQ(D_ERROR, req, "Reconnect on bulk PUT");
831                                 /* we don't reply anyway */
832                                 rc = -ETIMEDOUT;
833                                 ptlrpc_abort_bulk(desc);
834                         } else if (!desc->bd_success ||
835                                    desc->bd_nob_transferred != desc->bd_nob) {
836                                 DEBUG_REQ(D_ERROR, req, "%s bulk PUT %d(%d)",
837                                           desc->bd_success ?
838                                           "truncated" : "network error on",
839                                           desc->bd_nob_transferred,
840                                           desc->bd_nob);
841                                 /* XXX should this be a different errno? */
842                                 rc = -ETIMEDOUT;
843                         }
844                 } else {
845                         DEBUG_REQ(D_ERROR, req, "bulk PUT failed: rc %d", rc);
846                 }
847                 no_reply = rc != 0;
848         }
849
850         /* Must commit after prep above in all cases */
851         rc = obd_commitrw(OBD_BRW_READ, exp, &body->oa, 1, ioo,
852                           remote_nb, npages, local_nb, oti, rc);
853
854         if (rc == 0) {
855                 repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
856                                          sizeof(*repbody));
857                 memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
858         }
859
860  out_lock:
861         ost_brw_lock_put(LCK_PR, ioo, remote_nb, &lockh);
862  out_tls:
863         ost_tls_put(req);
864  out_bulk:
865         if (desc)
866                 ptlrpc_free_bulk(desc);
867  out:
868         LASSERT(rc <= 0);
869         if (rc == 0) {
870                 req->rq_status = nob;
871                 ptlrpc_lprocfs_brw(req, nob);
872                 target_committed_to_req(req);
873                 ptlrpc_reply(req);
874         } else if (!no_reply) {
875                 /* Only reply if there was no comms problem with bulk */
876                 target_committed_to_req(req);
877                 req->rq_status = rc;
878                 ptlrpc_error(req);
879         } else {
880                 /* reply out callback would free */
881                 ptlrpc_req_drop_rs(req);
882                 CWARN("%s: ignoring bulk IO comm error with %s@%s id %s - "
883                       "client will retry\n",
884                       exp->exp_obd->obd_name,
885                       exp->exp_client_uuid.uuid,
886                       exp->exp_connection->c_remote_uuid.uuid,
887                       libcfs_id2str(req->rq_peer));
888         }
889
890         RETURN(rc);
891 }
892
/*
 * Service a bulk write request: pull the client's pages and commit them.
 *
 * Visible sequence: run the fail-injection checks, wait out an eviction in
 * progress, locate the ost_body / obd_ioobj / niobuf_remote buffers in the
 * request, pack a three-buffer reply (ptlrpc_body, ost_body, one __u32 return
 * code per remote niobuf), take the per-thread cache and the LCK_PW lock,
 * call obd_preprw(), fetch the data through a BULK_GET_SINK descriptor,
 * checksum it when the client supplied a checksum, call obd_commitrw() with
 * the bulk result, fill in the per-niobuf return codes, release everything
 * and finally reply, send an error, or drop the reply state.
 *
 * req: request being serviced; its export, request message and reply message
 *      are used throughout.
 * oti: transaction info passed to obd_preprw()/obd_commitrw() and, on
 *      success, to oti_to_request().
 *
 * Returns the result of ptlrpc_reply() when rc reached the out: label as 0,
 * otherwise the negative error that ended processing.  When no_reply is set
 * and rc is non-zero, no error is sent to the client; the reply state is
 * dropped instead.
 */
893 static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
894 {
895         struct ptlrpc_bulk_desc *desc = NULL;
896         struct obd_export       *exp = req->rq_export;
897         struct niobuf_remote    *remote_nb;
898         struct niobuf_local     *local_nb;
899         struct obd_ioobj        *ioo;
900         struct ost_body         *body, *repbody;
901         struct l_wait_info       lwi;
902         struct lustre_handle     lockh = {0};
903         __u32                   *rcs;
904         __u32 size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
905         int objcount, niocount, npages;
906         int rc, i, j;
907         obd_count                client_cksum = 0, server_cksum = 0;
908         cksum_type_t             cksum_type = OBD_CKSUM_CRC32;
909         int                      no_reply = 0;
910         struct ost_thread_local_cache *tls;
911         ENTRY;
912
            /* Fail-injection hooks: bail out before any state is set up. */
913         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
914                 GOTO(out, rc = -EIO);
915         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK2))
916                 GOTO(out, rc = -EFAULT);
917
918         /* pause before transaction has been started */
919         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, (obd_timeout + 1) / 4);
920
921         /* Check if there is eviction in progress, and if so, wait for it to
922          * finish */
923         if (unlikely(atomic_read(&exp->exp_obd->obd_evict_inprogress))) {
924                 lwi = LWI_INTR(NULL, NULL); // We do not care how long it takes
925                 rc = l_wait_event(exp->exp_obd->obd_evict_inprogress_waitq,
926                         !atomic_read(&exp->exp_obd->obd_evict_inprogress),
927                         &lwi);
928         }
            /* NOTE(review): the rc assigned by the wait above is never
             * examined; it is overwritten by the next assignment to rc. */
929         if (exp->exp_failed)
930                 GOTO(out, rc = -ENOTCONN);
931
932         /* ost_body, ioobj & noibuf_remote are verified and swabbed in
933          * ost_rw_hpreq_check(). */
934         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
935         LASSERT(body != NULL);
936
937         if ((body->oa.o_flags & OBD_BRW_MEMALLOC) &&
938             (exp->exp_connection->c_peer.nid == exp->exp_connection->c_self))
939                 libcfs_memory_pressure_set();
940
            /* Number of ioobj records is derived from the buffer length. */
941         objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) /
942                    sizeof(*ioo);
943         ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1,
944                              objcount * sizeof(*ioo));
945         LASSERT(ioo != NULL);
            /* Total remote niobufs is the sum of ioo_bufcnt over all objects. */
946         for (niocount = i = 0; i < objcount; i++)
947                 niocount += ioo[i].ioo_bufcnt;
948
949         remote_nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
950                                    niocount * sizeof(*remote_nb));
951         LASSERT(remote_nb != NULL);
952
            /* Third reply buffer: one return code per remote niobuf. */
953         size[REPLY_REC_OFF + 1] = niocount * sizeof(*rcs);
954         rc = lustre_pack_reply(req, 3, size, NULL);
955         if (rc != 0)
956                 GOTO(out, rc);
957
958         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_PACK, obd_fail_val);
959         rcs = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1,
960                              niocount * sizeof(*rcs));
961
962         tls = ost_tls_get(req);
963         if (tls == NULL)
964                 GOTO(out_bulk, rc = -ENOMEM);
965         local_nb = tls->local;
966
967         rc = ost_brw_lock_get(LCK_PW, exp, ioo, remote_nb, &lockh);
968         if (rc != 0)
969                 GOTO(out_tls, rc);
970
971         /*
972          * If getting the lock took more time than
973          * client was willing to wait, drop it. b=11330
974          */
975         if (cfs_time_current_sec() > req->rq_deadline ||
976             OBD_FAIL_CHECK(OBD_FAIL_OST_DROP_REQ)) {
977                 no_reply = 1;
978                 CERROR("Dropping timed-out write from %s because locking "
979                        "object "LPX64" took %ld seconds (limit was %ld).\n",
980                        libcfs_id2str(req->rq_peer), ioo->ioo_id,
981                        cfs_time_current_sec() - req->rq_arrival_time.tv_sec,
982                        req->rq_deadline - req->rq_arrival_time.tv_sec);
983                 GOTO(out_lock, rc = -ETIMEDOUT);
984         }
985
986         if (!lustre_handle_is_used(&lockh))
987                 /* no needs to try to prolong lock if server is asked
988                  * to handle locking (= OBD_BRW_SRVLOCK) */
989                 ost_rw_prolong_locks(req, ioo, remote_nb,&body->oa,  LCK_PW);
990
991         /* obd_preprw clobbers oa->valid, so save what we need */
992         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
993                 client_cksum = body->oa.o_cksum;
994                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
995                         cksum_type = cksum_type_unpack(body->oa.o_flags);
996         }
997
998         /* Because we already sync grant info with client when reconnect,
999          * grant info will be cleared for resent req, then fed_grant and
1000          * total_grant will not be modified in following preprw_write*/
1001         if (lustre_msg_get_flags(req->rq_reqmsg) & (MSG_RESENT | MSG_REPLAY)) {
1002                 DEBUG_REQ(D_CACHE, req, "clear resent/replay req grant info");
1003                 body->oa.o_valid &= ~OBD_MD_FLGRANT;
1004         }
1005
             /* obd_preprw() receives &npages; afterwards npages is used as the
              * number of local_nb entries.  Presumably OST_THREAD_POOL_SIZE is
              * the capacity of tls->local -- TODO confirm. */
1006         npages = OST_THREAD_POOL_SIZE;
1007         rc = obd_preprw(OBD_BRW_WRITE, exp, &body->oa, objcount,
1008                         ioo, remote_nb, &npages, local_nb, oti);
1009         if (rc != 0)
1010                 GOTO(out_lock, rc);
1011
1012         desc = ptlrpc_prep_bulk_exp(req, npages,
1013                                      BULK_GET_SINK, OST_BULK_PORTAL);
             /* NOTE(review): this failure path jumps to out_lock after
              * obd_preprw() succeeded but without calling obd_commitrw();
              * confirm that is acceptable given the "must commit" rule. */
1014         if (desc == NULL)
1015                 GOTO(out_lock, rc = -ENOMEM);
1016
1017         /* NB Having prepped, we must commit... */
1018
1019         for (i = 0; i < npages; i++)
1020                 ptlrpc_prep_bulk_page(desc, local_nb[i].page,
1021                                       local_nb[i].offset & ~CFS_PAGE_MASK,
1022                                       local_nb[i].len);
1023
1024         /* Check if client was evicted or tried to reconnect while we
1025          * were doing i/o before touching network */
1026         if (desc->bd_export->exp_failed ||
1027             desc->bd_export->exp_abort_active_req)
1028                 rc = -ENOTCONN;
1029         else
1030                 rc = ptlrpc_start_bulk_transfer(desc);
1031         if (rc == 0) {
1032                 time_t start = cfs_time_current_sec();
                     /* Wait for the bulk to finish, the export to fail, or a
                      * reconnect; re-arm the wait while the request deadline
                      * is still in the future after a timeout. */
1033                 do {
1034                         long timeoutl = req->rq_deadline -
1035                                 cfs_time_current_sec();
1036                         cfs_duration_t timeout = (timeoutl <= 0 || rc) ?
1037                                 CFS_TICK : cfs_time_seconds(timeoutl);
1038                         lwi = LWI_TIMEOUT_INTERVAL(timeout, cfs_time_seconds(1),
1039                                                    ost_bulk_timeout, desc);
1040                         rc = l_wait_event(desc->bd_waitq,
1041                                           !ptlrpc_server_bulk_active(desc) ||
1042                                           desc->bd_export->exp_failed ||
1043                                           desc->bd_export->exp_abort_active_req,
1044                                           &lwi);
1045                         LASSERT(rc == 0 || rc == -ETIMEDOUT);
1046                         /* Wait again if we changed deadline */
1047                 } while ((rc == -ETIMEDOUT) &&
1048                          (req->rq_deadline > cfs_time_current_sec()));
1049
1050                 if (rc == -ETIMEDOUT) {
1051                         DEBUG_REQ(D_ERROR, req,
1052                                   "timeout on bulk GET after %ld%+lds",
1053                                   req->rq_deadline - start,
1054                                   cfs_time_current_sec() -
1055                                   req->rq_deadline);
1056                         ptlrpc_abort_bulk(desc);
1057                 } else if (desc->bd_export->exp_failed) {
1058                         DEBUG_REQ(D_ERROR, req, "Eviction on bulk GET");
1059                         rc = -ENOTCONN;
1060                         ptlrpc_abort_bulk(desc);
1061                 } else if (desc->bd_export->exp_abort_active_req) {
1062                         DEBUG_REQ(D_ERROR, req, "Reconnect on bulk GET");
1063                         /* we don't reply anyway */
1064                         rc = -ETIMEDOUT;
1065                         ptlrpc_abort_bulk(desc);
1066                 } else if (!desc->bd_success ||
1067                            desc->bd_nob_transferred != desc->bd_nob) {
1068                         DEBUG_REQ(D_ERROR, req, "%s bulk GET %d(%d)",
1069                                   desc->bd_success ?
1070                                   "truncated" : "network error on",
1071                                   desc->bd_nob_transferred, desc->bd_nob);
1072                         /* XXX should this be a different errno? */
1073                         rc = -ETIMEDOUT;
1074                 }
1075         } else {
1076                 DEBUG_REQ(D_ERROR, req, "ptlrpc_bulk_get failed: rc %d", rc);
1077         }
             /* Remember whether the bulk phase failed; the out: path consults
              * no_reply only when rc is non-zero there. */
1078         no_reply = rc != 0;
1079
1080         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
1081                                  sizeof(*repbody));
1082         memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
1083
             /* client_cksum doubles as the "client sent a checksum" flag, so a
              * client checksum value of 0 skips this block. */
1084         if (client_cksum != 0 && rc == 0) {
1085                 static int cksum_counter;
1086
1087                 repbody->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1088                 repbody->oa.o_flags &= ~OBD_FL_CKSUM_ALL;
1089                 repbody->oa.o_flags |= cksum_type_pack(cksum_type);
1090                 server_cksum = ost_checksum_bulk(desc, OST_WRITE, cksum_type);
1091                 repbody->oa.o_cksum = server_cksum;
1092                 cksum_counter++;
1093                 if (unlikely(client_cksum != server_cksum)) {
1094                         CERROR("client csum %x, server csum %x\n",
1095                                client_cksum, server_cksum);
1096                         cksum_counter = 0;
                     /* x & -x == x holds when x has a single bit set, so the
                      * OK message is logged at counts 1, 2, 4, 8, ... */
1097                 } else if ((cksum_counter & (-cksum_counter)) == cksum_counter){
1098                         CDEBUG(D_INFO, "Checksum %u from %s OK: %x\n",
1099                                cksum_counter, libcfs_id2str(req->rq_peer),
1100                                server_cksum);
1101                 }
1102         }
1103
1104         /* Check if there is eviction in progress, and if so, wait for
1105          * it to finish */
             /* NOTE(review): when this branch is taken, rc from the bulk phase
              * is replaced by the return value of l_wait_event() before it is
              * handed to obd_commitrw(); a bulk error could be lost here --
              * confirm whether that is intended. */
1106         if (unlikely(atomic_read(&exp->exp_obd->obd_evict_inprogress))) {
1107                 lwi = LWI_INTR(NULL, NULL);
1108                 rc = l_wait_event(exp->exp_obd->obd_evict_inprogress_waitq,
1109                         !atomic_read(&exp->exp_obd->obd_evict_inprogress),
1110                         &lwi);
1111         }
1112         if (rc == 0 && exp->exp_failed)
1113                 rc = -ENOTCONN;
1114
1115         /* Must commit after prep above in all cases */
1116         rc = obd_commitrw(OBD_BRW_WRITE, exp, &repbody->oa, objcount, ioo,
1117                           remote_nb, npages, local_nb, oti, rc);
1118
1119         if (rc == -ENOTCONN)
1120                 /* quota acquire process has been given up because
1121                  * either the client has been evicted or the client
1122                  * has timed out the request already */
1123                 no_reply = 1;
1124
             /* On a committed write with mismatching checksums, recompute the
              * checksum to classify where the data changed, then report it. */
1125         if (unlikely(client_cksum != server_cksum && rc == 0)) {
1126                 int  new_cksum = ost_checksum_bulk(desc, OST_WRITE, cksum_type);
1127                 char *msg;
1128                 char *via;
1129                 char *router;
1130
1131                 if (new_cksum == server_cksum)
1132                         msg = "changed in transit before arrival at OST";
1133                 else if (new_cksum == client_cksum)
1134                         msg = "initial checksum before message complete";
1135                 else
1136                         msg = "changed in transit AND after initial checksum";
1137
1138                 if (req->rq_peer.nid == desc->bd_sender) {
1139                         via = router = "";
1140                 } else {
1141                         via = " via ";
1142                         router = libcfs_nid2str(desc->bd_sender);
1143                 }
1144
1145                 LCONSOLE_ERROR_MSG(0x168, "%s: BAD WRITE CHECKSUM: %s from %s"
1146                                    "%s%s inum "LPU64"/"LPU64" object "LPU64"/"
1147                                    LPU64" extent ["LPU64"-"LPU64"]\n",
1148                                    exp->exp_obd->obd_name, msg,
1149                                    libcfs_id2str(req->rq_peer),
1150                                    via, router,
1151                                    body->oa.o_valid & OBD_MD_FLFID ?
1152                                                 body->oa.o_fid : (__u64)0,
1153                                    body->oa.o_valid & OBD_MD_FLFID ?
1154                                                 body->oa.o_generation :(__u64)0,
1155                                    body->oa.o_id,
1156                                    body->oa.o_valid & OBD_MD_FLGROUP ?
1157                                                 body->oa.o_gr : (__u64)0,
1158                                    local_nb[0].offset,
1159                                    local_nb[npages-1].offset +
1160                                    local_nb[npages-1].len - 1 );
1161                 CERROR("client csum %x, original server csum %x, "
1162                        "server csum now %x\n",
1163                        client_cksum, server_cksum, new_cksum);
1164         }
1165
1166         if (rc == 0) {
1167                 int nob = 0;
1168
1169                 /* set per-requested niobuf return codes */
                     /* Each remote niobuf consumes consecutive local_nb pages
                      * until its length is used up; the last negative page rc
                      * seen becomes that niobuf's return code. */
1170                 for (i = j = 0; i < niocount; i++) {
1171                         int len = remote_nb[i].len;
1172
1173                         nob += len;
1174                         rcs[i] = 0;
1175                         do {
1176                                 LASSERT(j < npages);
1177                                 if (local_nb[j].rc < 0)
1178                                         rcs[i] = local_nb[j].rc;
1179                                 len -= local_nb[j].len;
1180                                 j++;
1181                         } while (len > 0);
1182                         LASSERT(len == 0);
1183                 }
1184                 LASSERT(j == npages);
1185                 ptlrpc_lprocfs_brw(req, nob);
1186         }
1187
             /* Cleanup labels run top to bottom, so a jump to an earlier label
              * also executes every later one. */
1188  out_lock:
1189         ost_brw_lock_put(LCK_PW, ioo, remote_nb, &lockh);
1190  out_tls:
1191         ost_tls_put(req);
1192  out_bulk:
1193         if (desc)
1194                 ptlrpc_free_bulk(desc);
1195  out:
1196         if (rc == 0) {
1197                 oti_to_request(oti, req);
1198                 target_committed_to_req(req);
1199                 rc = ptlrpc_reply(req);
1200         } else if (!no_reply) {
1201                 /* Only reply if there was no comms problem with bulk */
1202                 target_committed_to_req(req);
1203                 req->rq_status = rc;
1204                 ptlrpc_error(req);
1205         } else {
1206                 /* reply out callback would free */
1207                 ptlrpc_req_drop_rs(req);
1208                 CWARN("%s: ignoring bulk IO comm error with %s@%s id %s - "
1209                       "client will retry\n",
1210                       exp->exp_obd->obd_name,
1211                       exp->exp_client_uuid.uuid,
1212                       exp->exp_connection->c_remote_uuid.uuid,
1213                       libcfs_id2str(req->rq_peer));
1214         }
             /* Cleared on every exit path, whether or not it was set above. */
1215         libcfs_memory_pressure_clr();
1216         RETURN(rc);
1217 }
1218
1219 static int ost_set_info(struct obd_export *exp, struct ptlrpc_request *req)
1220 {
1221         struct ost_body *body = NULL, *repbody;
1222         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
1223         char *key, *val = NULL;
1224         int keylen, vallen, rc = 0;
1225         ENTRY;
1226
1227         key = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, 1);
1228         if (key == NULL) {
1229                 DEBUG_REQ(D_HA, req, "no set_info key");
1230                 RETURN(-EFAULT);
1231         }
1232         keylen = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF);
1233
1234         if (KEY_IS(KEY_GRANT_SHRINK)) {
1235                 rc = lustre_pack_reply(req, 2, size, NULL);
1236                 if (rc)
1237                         RETURN(rc);
1238         } else {
1239                 rc = lustre_pack_reply(req, 1, NULL, NULL);
1240                 if (rc)
1241                         RETURN(rc);
1242         }
1243
1244         vallen = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1);
1245         if (vallen) {
1246                 if (KEY_IS(KEY_GRANT_SHRINK)) {
1247                         body = lustre_swab_reqbuf(req, REQ_REC_OFF + 1,
1248                                                   sizeof(*body),
1249                                                   lustre_swab_ost_body);
1250                         if (!body)
1251                                 RETURN(-EFAULT);
1252
1253                         repbody = lustre_msg_buf(req->rq_repmsg,
1254                                                  REPLY_REC_OFF,
1255                                                  sizeof(*repbody));
1256                         memcpy(repbody, body, sizeof(*body));
1257                         val = (char*)repbody;
1258                 } else
1259                         val = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1,0);
1260         }
1261
1262         if (KEY_IS(KEY_EVICT_BY_NID)) {
1263                 if (val && vallen)
1264                         obd_export_evict_by_nid(exp->exp_obd, val);
1265
1266                 GOTO(out, rc = 0);
1267         }
1268
1269         rc = obd_set_info_async(exp, keylen, key, vallen, val, NULL);
1270 out:
1271         lustre_msg_set_status(req->rq_repmsg, 0);
1272         RETURN(rc);
1273 }
1274
1275 static int ost_get_info(struct obd_export *exp, struct ptlrpc_request *req)
1276 {
1277         void *key, *reply;
1278         int keylen, rc = 0;
1279         int size[2] = { sizeof(struct ptlrpc_body), 0 };
1280         ENTRY;
1281
1282         key = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, 1);
1283         if (key == NULL) {
1284                 DEBUG_REQ(D_HA, req, "no get_info key");
1285                 RETURN(-EFAULT);
1286         }
1287         keylen = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF);
1288
1289         /* call once to get the size to allocate the reply buffer */
1290         rc = obd_get_info(exp, keylen, key, &size[1], NULL, NULL);
1291         if (rc)
1292                 RETURN(rc);
1293
1294         rc = lustre_pack_reply(req, 2, size, NULL);
1295         if (rc)
1296                 RETURN(rc);
1297
1298         reply = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*reply));
1299         /* call again to fill in the reply buffer */
1300         rc = obd_get_info(exp, keylen, key, size, reply, NULL);
1301         lustre_msg_set_status(req->rq_repmsg, 0);
1302
1303         RETURN(rc);
1304 }
1305
1306 #ifdef HAVE_QUOTA_SUPPORT
1307 static int ost_handle_quotactl(struct ptlrpc_request *req)
1308 {
1309         struct obd_quotactl *oqctl, *repoqc;
1310         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*repoqc) };
1311         int rc;
1312         ENTRY;
1313
1314         oqctl = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*oqctl),
1315                                    lustre_swab_obd_quotactl);
1316         if (oqctl == NULL)
1317                 GOTO(out, rc = -EPROTO);
1318
1319         rc = lustre_pack_reply(req, 2, size, NULL);
1320         if (rc)
1321                 GOTO(out, rc);
1322
1323         repoqc = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*repoqc));
1324
1325         req->rq_status = obd_quotactl(req->rq_export, oqctl);
1326         *repoqc = *oqctl;
1327 out:
1328         RETURN(rc);
1329 }
1330
1331 static int ost_handle_quotacheck(struct ptlrpc_request *req)
1332 {
1333         struct obd_quotactl *oqctl;
1334         int rc;
1335         ENTRY;
1336
1337         oqctl = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*oqctl),
1338                                    lustre_swab_obd_quotactl);
1339         if (oqctl == NULL)
1340                 RETURN(-EPROTO);
1341
1342         rc = lustre_pack_reply(req, 1, NULL, NULL);
1343         if (rc)
1344                 RETURN(rc);
1345
1346         req->rq_status = obd_quotacheck(req->rq_export, oqctl);
1347         RETURN(0);
1348 }
1349
1350 static int ost_handle_quota_adjust_qunit(struct ptlrpc_request *req)
1351 {
1352         struct quota_adjust_qunit *oqaq, *repoqa;
1353         struct lustre_quota_ctxt *qctxt;
1354         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*repoqa) };
1355         int rc;
1356         ENTRY;
1357
1358         qctxt = &req->rq_export->exp_obd->u.obt.obt_qctxt;
1359         oqaq = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*oqaq),
1360                                   lustre_swab_quota_adjust_qunit);
1361
1362         if (oqaq == NULL)
1363                 GOTO(out, rc = -EPROTO);
1364         rc = lustre_pack_reply(req, 2, size, NULL);
1365         if (rc)
1366                 GOTO(out, rc);
1367         repoqa = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*repoqa));
1368         req->rq_status = obd_quota_adjust_qunit(req->rq_export, oqaq, qctxt);
1369         *repoqa = *oqaq;
1370  out:
1371         RETURN(rc);
1372 }
1373 #endif
1374
1375 /* Ensure that data and metadata are synced to the disk when lock is cancelled
1376  * (if requested) */
1377 int ost_blocking_ast(struct ldlm_lock *lock,
1378                              struct ldlm_lock_desc *desc,
1379                              void *data, int flag)
1380 {
1381         struct obd_device *obd = lock->l_export->exp_obd;
1382         if (flag == LDLM_CB_CANCELING &&
1383             (lock->l_granted_mode & (LCK_PW|LCK_GROUP)) &&
1384             (obd->u.ost.ost_sync_on_lock_cancel == ALWAYS_SYNC_ON_CANCEL ||
1385              (obd->u.ost.ost_sync_on_lock_cancel == BLOCKING_SYNC_ON_CANCEL &&
1386               lock->l_flags & LDLM_FL_CBPENDING))) {
1387                 struct obd_info *oinfo;
1388                 int rc;
1389
1390                 OBD_ALLOC_PTR(oinfo);
1391                 if (!oinfo)
1392                         RETURN(-ENOMEM);
1393
1394                 OBDO_ALLOC(oinfo->oi_oa);
1395                 if (!oinfo->oi_oa) {
1396                         OBD_FREE_PTR(oinfo);
1397                         RETURN(-ENOMEM);
1398                 }
1399
1400                 oinfo->oi_oa->o_id = lock->l_resource->lr_name.name[0];
1401                 oinfo->oi_oa->o_valid = OBD_MD_FLID;
1402
1403                 rc = obd_sync_rqset(lock->l_export, oinfo,
1404                                     lock->l_policy_data.l_extent.start,
1405                                     lock->l_policy_data.l_extent.end);
1406                 if (rc)
1407                         CERROR("Error %d syncing data on lock cancel\n", rc);
1408
1409                 OBDO_FREE(oinfo->oi_oa);
1410                 OBD_FREE_PTR(oinfo);
1411         }
1412
1413         return ldlm_server_blocking_ast(lock, desc, data, flag);
1414 }
1415
1416 static int ost_filter_recovery_request(struct ptlrpc_request *req,
1417                                        struct obd_device *obd, int *process)
1418 {
1419         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1420         case OST_CONNECT: /* This will never get here, but for completeness. */
1421         case OST_DISCONNECT:
1422                *process = 1;
1423                RETURN(0);
1424
1425         case OBD_PING:
1426         case OST_CREATE:
1427         case OST_DESTROY:
1428         case OST_PUNCH:
1429         case OST_SETATTR:
1430         case OST_SYNC:
1431         case OST_WRITE:
1432         case OBD_LOG_CANCEL:
1433         case LDLM_ENQUEUE:
1434                 *process = target_queue_recovery_request(req, obd);
1435                 RETURN(0);
1436
1437         default:
1438                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
1439                 *process = 0;
1440                 /* XXX what should we set rq_status to here? */
1441                 req->rq_status = -EAGAIN;
1442                 RETURN(ptlrpc_error(req));
1443         }
1444 }
1445
1446 int ost_msg_check_version(struct lustre_msg *msg)
1447 {
1448         int rc;
1449
1450         switch(lustre_msg_get_opc(msg)) {
1451         case OST_CONNECT:
1452         case OST_DISCONNECT:
1453         case OBD_PING:
1454                 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
1455                 if (rc)
1456                         CERROR("bad opc %u version %08x, expecting %08x\n",
1457                                lustre_msg_get_opc(msg),
1458                                lustre_msg_get_version(msg),
1459                                LUSTRE_OBD_VERSION);
1460                 break;
1461         case OST_CREATE:
1462         case OST_DESTROY:
1463         case OST_GETATTR:
1464         case OST_SETATTR:
1465         case OST_WRITE:
1466         case OST_READ:
1467         case OST_PUNCH:
1468         case OST_STATFS:
1469         case OST_SYNC:
1470         case OST_SET_INFO:
1471         case OST_GET_INFO:
1472 #ifdef HAVE_QUOTA_SUPPORT
1473         case OST_QUOTACHECK:
1474         case OST_QUOTACTL:
1475         case OST_QUOTA_ADJUST_QUNIT:
1476 #endif
1477                 rc = lustre_msg_check_version(msg, LUSTRE_OST_VERSION);
1478                 if (rc)
1479                         CERROR("bad opc %u version %08x, expecting %08x\n",
1480                                lustre_msg_get_opc(msg),
1481                                lustre_msg_get_version(msg),
1482                                LUSTRE_OST_VERSION);
1483                 break;
1484         case LDLM_ENQUEUE:
1485         case LDLM_CONVERT:
1486         case LDLM_CANCEL:
1487         case LDLM_BL_CALLBACK:
1488         case LDLM_CP_CALLBACK:
1489                 rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
1490                 if (rc)
1491                         CERROR("bad opc %u version %08x, expecting %08x\n",
1492                                lustre_msg_get_opc(msg),
1493                                lustre_msg_get_version(msg),
1494                                LUSTRE_DLM_VERSION);
1495                 break;
1496         case LLOG_ORIGIN_CONNECT:
1497         case OBD_LOG_CANCEL:
1498                 rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
1499                 if (rc)
1500                         CERROR("bad opc %u version %08x, expecting %08x\n",
1501                                lustre_msg_get_opc(msg),
1502                                lustre_msg_get_version(msg),
1503                                LUSTRE_LOG_VERSION);
1504                 break;
1505         default:
1506                 CERROR("Unexpected opcode %d\n", lustre_msg_get_opc(msg));
1507                 rc = -ENOTSUPP;
1508         }
1509         return rc;
1510 }
1511
1512 static int ost_rw_hpreq_lock_match(struct ptlrpc_request *req,
1513                                    struct ldlm_lock *lock)
1514 {
1515         struct niobuf_remote *nb;
1516         struct obd_ioobj *ioo;
1517         struct ost_body *body;
1518         int objcount, niocount;
1519         int mode, opc, i;
1520         __u64 start, end;
1521         ENTRY;
1522
1523         opc = lustre_msg_get_opc(req->rq_reqmsg);
1524         LASSERT(opc == OST_READ || opc == OST_WRITE);
1525
1526         /* As the request may be covered by several locks, do not look at
1527          * o_handle, look at the RPC IO region. */
1528         body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
1529                                   lustre_swab_obdo);
1530         objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) /
1531                    sizeof(*ioo);
1532         ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1,
1533                              objcount * sizeof(*ioo));
1534         LASSERT(ioo != NULL);
1535         for (niocount = i = 0; i < objcount; i++)
1536                 niocount += ioo[i].ioo_bufcnt;
1537
1538         nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1539                             niocount * sizeof(*nb));
1540         LASSERT(nb != NULL);
1541
1542         mode = LCK_PW;
1543         if (opc == OST_READ)
1544                 mode |= LCK_PR;
1545
1546         start = nb[0].offset & CFS_PAGE_MASK;
1547         end = (nb[ioo->ioo_bufcnt - 1].offset +
1548                nb[ioo->ioo_bufcnt - 1].len - 1) | ~CFS_PAGE_MASK;
1549
1550         LASSERT(lock->l_resource != NULL);
1551         if (lock->l_resource->lr_name.name[0] != ioo->ioo_id)
1552                 RETURN(0);
1553
1554         if (!(lock->l_granted_mode & mode))
1555                 RETURN(0);
1556
1557         if (lock->l_policy_data.l_extent.end < start ||
1558             lock->l_policy_data.l_extent.start > end)
1559                 RETURN(0);
1560
1561         RETURN(1);
1562 }
1563
1564 /**
1565  * Swab buffers needed to call ost_rw_prolong_locks() and call it.
1566  * Return the value from ost_rw_prolong_locks() which is non-zero if
1567  * there is a cancelled lock which is waiting for this IO request.
1568  */
1569 static int ost_rw_hpreq_check(struct ptlrpc_request *req)
1570 {
1571         struct niobuf_remote *nb;
1572         struct obd_ioobj *ioo;
1573         struct ost_body *body;
1574         int objcount, niocount;
1575         int mode, opc, i;
1576         ENTRY;
1577
1578         opc = lustre_msg_get_opc(req->rq_reqmsg);
1579         LASSERT(opc == OST_READ || opc == OST_WRITE);
1580
1581         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
1582         LASSERT(body != NULL);
1583
1584         objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) /
1585                    sizeof(*ioo);
1586         ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1,
1587                              objcount * sizeof(*ioo));
1588         LASSERT(ioo != NULL);
1589
1590         for (niocount = i = 0; i < objcount; i++)
1591                 niocount += ioo[i].ioo_bufcnt;
1592         nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1593                             niocount * sizeof(*nb));
1594         LASSERT(nb != NULL);
1595         LASSERT(niocount == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK));
1596
1597         mode = LCK_PW;
1598         if (opc == OST_READ)
1599                 mode |= LCK_PR;
1600         RETURN(ost_rw_prolong_locks(req, ioo, nb, &body->oa, mode));
1601 }
1602
1603 static int ost_punch_prolong_locks(struct ptlrpc_request *req, struct obdo *oa)
1604 {
1605         struct ldlm_res_id res_id = { .name = { oa->o_id } };
1606         struct ost_prolong_data opd = { 0 };
1607         __u64 start, end;
1608         ENTRY;
1609
1610         start = oa->o_size;
1611         end = start + oa->o_blocks;
1612
1613         opd.opd_mode = LCK_PW;
1614         opd.opd_exp = req->rq_export;
1615         opd.opd_policy.l_extent.start = start & CFS_PAGE_MASK;
1616         if (oa->o_blocks == OBD_OBJECT_EOF || end < start)
1617                 opd.opd_policy.l_extent.end = OBD_OBJECT_EOF;
1618         else
1619                 opd.opd_policy.l_extent.end = end | ~CFS_PAGE_MASK;
1620
1621         /* prolong locks for the current service time of the corresponding
1622          * portal (= OST_IO_PORTAL) */
1623         opd.opd_timeout = AT_OFF ? obd_timeout / 2 :
1624                           max(at_est2timeout(at_get(&req->rq_rqbd->
1625                               rqbd_service->srv_at_estimate)), ldlm_timeout);
1626
1627         CDEBUG(D_DLMTRACE,"refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
1628                res_id.name[0], res_id.name[1], opd.opd_policy.l_extent.start,
1629                opd.opd_policy.l_extent.end);
1630
1631         opd.opd_oa = oa;
1632
1633         ldlm_resource_iterate(req->rq_export->exp_obd->obd_namespace, &res_id,
1634                               ost_prolong_locks_iter, &opd);
1635         RETURN(opd.opd_lock_match);
1636 }
1637
1638 static int ost_punch_hpreq_lock_match(struct ptlrpc_request *req,
1639                                       struct ldlm_lock *lock)
1640 {
1641         struct ost_body *body;
1642         ENTRY;
1643
1644         body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
1645                                   lustre_swab_obdo);
1646         LASSERT(body != NULL);
1647
1648         if (body->oa.o_valid & OBD_MD_FLHANDLE &&
1649             body->oa.o_handle.cookie == lock->l_handle.h_cookie)
1650                 RETURN(1);
1651         RETURN(0);
1652 }
1653
1654 static int ost_punch_hpreq_check(struct ptlrpc_request *req)
1655 {
1656         struct ost_body *body = lustre_msg_buf(req->rq_reqmsg,
1657                                                REQ_REC_OFF, sizeof(*body));
1658         LASSERT(body != NULL);
1659         LASSERT(!(body->oa.o_valid & OBD_MD_FLFLAGS) ||
1660                 !(body->oa.o_flags & OBD_FL_TRUNCLOCK));
1661
1662         RETURN(ost_punch_prolong_locks(req, &body->oa));
1663 }
1664
1665 struct ptlrpc_hpreq_ops ost_hpreq_rw = {
1666         .hpreq_lock_match  = ost_rw_hpreq_lock_match,
1667         .hpreq_check       = ost_rw_hpreq_check,
1668 };
1669
1670 struct ptlrpc_hpreq_ops ost_hpreq_punch = {
1671         .hpreq_lock_match  = ost_punch_hpreq_lock_match,
1672         .hpreq_check       = ost_punch_hpreq_check,
1673 };
1674
1675 /** Assign high priority operations to the request if needed. */
1676 static int ost_hpreq_handler(struct ptlrpc_request *req)
1677 {
1678         ENTRY;
1679         if (req->rq_export) {
1680                 int opc = lustre_msg_get_opc(req->rq_reqmsg);
1681                 struct ost_body *body;
1682
1683                 if (opc == OST_READ || opc == OST_WRITE) {
1684                         struct niobuf_remote *nb;
1685                         struct obd_ioobj *ioo;
1686                         int objcount, niocount;
1687                         int swab, i;
1688
1689                         body = lustre_swab_reqbuf(req, REQ_REC_OFF,
1690                                                   sizeof(*body),
1691                                                   lustre_swab_obdo);
1692                         if (!body) {
1693                                 CERROR("Missing/short ost_body\n");
1694                                 RETURN(-EFAULT);
1695                         }
1696                         objcount = lustre_msg_buflen(req->rq_reqmsg,
1697                                                      REQ_REC_OFF + 1) /
1698                                 sizeof(*ioo);
1699                         if (objcount == 0) {
1700                                 CERROR("Missing/short ioobj\n");
1701                                 RETURN(-EFAULT);
1702                         }
1703                         if (objcount > 1) {
1704                                 CERROR("too many ioobjs (%d)\n", objcount);
1705                                 RETURN(-EFAULT);
1706                         }
1707
1708                         swab = !lustre_req_swabbed(req, REQ_REC_OFF + 1) &&
1709                                 lustre_req_need_swab(req);
1710                         ioo = lustre_swab_reqbuf(req, REQ_REC_OFF + 1,
1711                                                  objcount * sizeof(*ioo),
1712                                                  lustre_swab_obd_ioobj);
1713                         if (!ioo) {
1714                                 CERROR("Missing/short ioobj\n");
1715                                 RETURN(-EFAULT);
1716                         }
1717                         for (niocount = i = 0; i < objcount; i++) {
1718                                 if (i > 0 && swab)
1719                                         lustre_swab_obd_ioobj(&ioo[i]);
1720                                 if (ioo[i].ioo_bufcnt == 0) {
1721                                         CERROR("ioo[%d] has zero bufcnt\n", i);
1722                                         RETURN(-EFAULT);
1723                                 }
1724                                 niocount += ioo[i].ioo_bufcnt;
1725                         }
1726                         if (niocount > PTLRPC_MAX_BRW_PAGES) {
1727                                 DEBUG_REQ(D_ERROR, req, "bulk has too many "
1728                                           "pages (%d)", niocount);
1729                                 RETURN(-EFAULT);
1730                         }
1731
1732                         swab = !lustre_req_swabbed(req, REQ_REC_OFF + 2) &&
1733                                 lustre_req_need_swab(req);
1734                         nb = lustre_swab_reqbuf(req, REQ_REC_OFF + 2,
1735                                                 niocount * sizeof(*nb),
1736                                                 lustre_swab_niobuf_remote);
1737                         if (!nb) {
1738                                 CERROR("Missing/short niobuf\n");
1739                                 RETURN(-EFAULT);
1740                         }
1741
1742                         if (swab) {
1743                                 /* swab remaining niobufs */
1744                                 for (i = 1; i < niocount; i++)
1745                                         lustre_swab_niobuf_remote(&nb[i]);
1746                         }
1747
1748                         if (niocount == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
1749                                 req->rq_ops = &ost_hpreq_rw;
1750                 } else if (opc == OST_PUNCH) {
1751                         body = lustre_swab_reqbuf(req, REQ_REC_OFF,
1752                                                   sizeof(*body),
1753                                                   lustre_swab_obdo);
1754                         if (!body) {
1755                                 CERROR("Missing/short ost_body\n");
1756                                 RETURN(-EFAULT);
1757                         }
1758
1759                         if (!(body->oa.o_valid & OBD_MD_FLFLAGS) ||
1760                             !(body->oa.o_flags & OBD_FL_TRUNCLOCK))
1761                                 req->rq_ops = &ost_hpreq_punch;
1762                 }
1763         }
1764         RETURN(0);
1765 }
1766
/*
 * Request handler passed to all three ptlrpc services created in
 * ost_setup().
 *
 * Every opcode except OST_CONNECT requires req->rq_export; without one the
 * request is finished with -ENOTCONN.  While the device is recovering, the
 * request first goes through ost_filter_recovery_request(), which may report
 * that it should not be processed now.  After the message version check the
 * request is dispatched on its opcode.
 *
 * Most cases leave the switch and reply through target_handle_reply() at the
 * bottom.  OST_WRITE / OST_READ, the llog opcodes and unsupported opcodes
 * return from inside the switch.  NOTE(review): the OBD_FAIL_RETURN()
 * invocations presumably also return early when their fail point is active -
 * confirm against the macro definition.
 */
1767 static int ost_handle(struct ptlrpc_request *req)
1768 {
1769         struct obd_trans_info trans_info = { 0, };
1770         struct obd_trans_info *oti = &trans_info;
1771         int should_process, fail = OBD_FAIL_OST_ALL_REPLY_NET, rc = 0;
1772         struct obd_device *obd = NULL;
1773         ENTRY;
1774
1775         LASSERT(current->journal_info == NULL);
1776         /* XXX identical to MDS */
1777         if (lustre_msg_get_opc(req->rq_reqmsg) != OST_CONNECT) {
1778                 int recovering;
1779
1780                 if (req->rq_export == NULL) {
1781                         CDEBUG(D_HA,"operation %d on unconnected OST from %s\n",
1782                                lustre_msg_get_opc(req->rq_reqmsg),
1783                                libcfs_id2str(req->rq_peer));
1784                         req->rq_status = -ENOTCONN;
1785                         GOTO(out, rc = -ENOTCONN);
1786                 }
1787
1788                 obd = req->rq_export->exp_obd;
1789
1790                 /* Check for aborted recovery. */
1791                 spin_lock_bh(&obd->obd_processing_task_lock);
1792                 recovering = obd->obd_recovering;
1793                 spin_unlock_bh(&obd->obd_processing_task_lock);
1794                 if (recovering &&
1795                     target_recovery_check_and_stop(obd) == 0) {
1796                         rc = ost_filter_recovery_request(req, obd,
1797                                                          &should_process);
1798                         if (rc || !should_process)
1799                                 RETURN(rc);
1800                 }
1801         }
1802
1803         oti_init(oti, req);
1804         rc = ost_msg_check_version(req->rq_reqmsg);
1805         if (rc)
1806                 RETURN(rc);
1807
1808         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1809         case OST_CONNECT: {
1810                 CDEBUG(D_INODE, "connect\n");
1811                 OBD_FAIL_RETURN(OBD_FAIL_OST_CONNECT_NET, 0);
1812                 rc = target_handle_connect(req, ost_handle);
1813                 OBD_FAIL_RETURN(OBD_FAIL_OST_CONNECT_NET2, 0);
1814                 if (!rc)
1815                         obd = req->rq_export->exp_obd;
1816                 break;
1817         }
1818         case OST_DISCONNECT:
1819                 CDEBUG(D_INODE, "disconnect\n");
1820                 OBD_FAIL_RETURN(OBD_FAIL_OST_DISCONNECT_NET, 0);
1821                 rc = target_handle_disconnect(req);
1822                 break;
1823         case OST_CREATE:
1824                 CDEBUG(D_INODE, "create\n");
1825                 OBD_FAIL_RETURN(OBD_FAIL_OST_CREATE_NET, 0);
1826                 OBD_FAIL_TIMEOUT_MS(OBD_FAIL_OST_PAUSE_CREATE, obd_fail_val);
1827                 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_ENOSPC))
1828                         GOTO(out, rc = -ENOSPC);
1829                 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_EROFS))
1830                         GOTO(out, rc = -EROFS);
1831                 rc = ost_create(req->rq_export, req, oti);
1832                 break;
1833         case OST_DESTROY:
1834                 CDEBUG(D_INODE, "destroy\n");
1835                 OBD_FAIL_RETURN(OBD_FAIL_OST_DESTROY_NET, 0);
1836                 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_EROFS))
1837                         GOTO(out, rc = -EROFS);
1838                 rc = ost_destroy(req->rq_export, req, oti);
1839                 break;
1840         case OST_GETATTR:
1841                 CDEBUG(D_INODE, "getattr\n");
1842                 OBD_FAIL_RETURN(OBD_FAIL_OST_GETATTR_NET, 0);
1843                 rc = ost_getattr(req->rq_export, req);
1844                 break;
1845         case OST_SETATTR:
1846                 CDEBUG(D_INODE, "setattr\n");
1847                 OBD_FAIL_RETURN(OBD_FAIL_OST_SETATTR_NET, 0);
1848                 rc = ost_setattr(req->rq_export, req, oti);
1849                 break;
1850         case OST_WRITE:
1851                 CDEBUG(D_INODE, "write\n");
1852                 /* req->rq_request_portal would be nice, if it was set */
                     /* bulk writes are only accepted by the service that
                      * listens on OST_IO_PORTAL */
1853                 if (req->rq_rqbd->rqbd_service->srv_req_portal !=OST_IO_PORTAL){
1854                         CERROR("%s: deny write request from %s to portal %u\n",
1855                                req->rq_export->exp_obd->obd_name,
1856                                obd_export_nid2str(req->rq_export),
1857                                req->rq_rqbd->rqbd_service->srv_req_portal);
1858                         GOTO(out, rc = -EPROTO);
1859                 }
1860                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
1861                 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_ENOSPC))
1862                         GOTO(out, rc = -ENOSPC);
1863                 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_EROFS))
1864                         GOTO(out, rc = -EROFS);
1865                 rc = ost_brw_write(req, oti);
1866                 LASSERT(current->journal_info == NULL);
1867                 /* ost_brw_write sends its own replies */
1868                 RETURN(rc);
1869         case OST_READ:
1870                 CDEBUG(D_INODE, "read\n");
1871                 /* req->rq_request_portal would be nice, if it was set */
                     /* bulk reads are only accepted by the service that
                      * listens on OST_IO_PORTAL */
1872                 if (req->rq_rqbd->rqbd_service->srv_req_portal !=OST_IO_PORTAL){
1873                         CERROR("%s: deny read request from %s to portal %u\n",
1874                                req->rq_export->exp_obd->obd_name,
1875                                obd_export_nid2str(req->rq_export),
1876                                req->rq_rqbd->rqbd_service->srv_req_portal);
1877                         GOTO(out, rc = -EPROTO);
1878                 }
1879                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
1880                 rc = ost_brw_read(req, oti);
1881                 LASSERT(current->journal_info == NULL);
1882                 /* ost_brw_read sends its own replies */
1883                 RETURN(rc);
1884         case OST_PUNCH:
1885                 CDEBUG(D_INODE, "punch\n");
1886                 OBD_FAIL_RETURN(OBD_FAIL_OST_PUNCH_NET, 0);
1887                 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_EROFS))
1888                         GOTO(out, rc = -EROFS);
1889                 rc = ost_punch(req->rq_export, req, oti);
1890                 break;
1891         case OST_STATFS:
1892                 CDEBUG(D_INODE, "statfs\n");
1893                 OBD_FAIL_RETURN(OBD_FAIL_OST_STATFS_NET, 0);
1894                 rc = ost_statfs(req);
1895                 break;
1896         case OST_SYNC:
1897                 CDEBUG(D_INODE, "sync\n");
1898                 OBD_FAIL_RETURN(OBD_FAIL_OST_SYNC_NET, 0);
1899                 rc = ost_sync(req->rq_export, req);
1900                 break;
1901         case OST_SET_INFO:
1902                 DEBUG_REQ(D_INODE, req, "set_info");
1903                 rc = ost_set_info(req->rq_export, req);
1904                 break;
1905         case OST_GET_INFO:
1906                 DEBUG_REQ(D_INODE, req, "get_info");
1907                 rc = ost_get_info(req->rq_export, req);
1908                 break;
1909 #ifdef HAVE_QUOTA_SUPPORT
1910         case OST_QUOTACHECK:
1911                 CDEBUG(D_INODE, "quotacheck\n");
1912                 OBD_FAIL_RETURN(OBD_FAIL_OST_QUOTACHECK_NET, 0);
1913                 rc = ost_handle_quotacheck(req);
1914                 break;
1915         case OST_QUOTACTL:
1916                 CDEBUG(D_INODE, "quotactl\n");
1917                 OBD_FAIL_RETURN(OBD_FAIL_OST_QUOTACTL_NET, 0);
1918                 rc = ost_handle_quotactl(req);
1919                 break;
1920         case OST_QUOTA_ADJUST_QUNIT:
1921                 CDEBUG(D_INODE, "quota_adjust_qunit\n");
1922                 rc = ost_handle_quota_adjust_qunit(req);
1923                 break;
1924 #endif
1925         case OBD_PING:
1926                 DEBUG_REQ(D_INODE, req, "ping");
1927                 rc = target_handle_ping(req);
1928                 break;
1929         /* FIXME - just reply status */
1930         case LLOG_ORIGIN_CONNECT:
1931                 DEBUG_REQ(D_INODE, req, "log connect");
1932                 rc = llog_handle_connect(req);
1933                 req->rq_status = rc;
1934                 rc = lustre_pack_reply(req, 1, NULL, NULL);
1935                 if (rc)
1936                         RETURN(rc);
1937                 RETURN(ptlrpc_reply(req));
1938         case OBD_LOG_CANCEL:
1939                 CDEBUG(D_INODE, "log cancel\n");
1940                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
1941                 rc = llog_origin_handle_cancel(req);
1942                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_REP, 0);
1943                 req->rq_status = rc;
1944                 rc = lustre_pack_reply(req, 1, NULL, NULL);
1945                 if (rc)
1946                         RETURN(rc);
1947                 RETURN(ptlrpc_reply(req));
1948         case LDLM_ENQUEUE:
1949                 CDEBUG(D_INODE, "enqueue\n");
1950                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
1951                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
1952                                          ost_blocking_ast,
1953                                          ldlm_server_glimpse_ast);
                     /* enqueue is the only case that overrides the fail id
                      * handed to target_handle_reply() */
1954                 fail = OBD_FAIL_OST_LDLM_REPLY_NET;
1955                 break;
1956         case LDLM_CONVERT:
1957                 CDEBUG(D_INODE, "convert\n");
1958                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
1959                 rc = ldlm_handle_convert(req);
1960                 break;
1961         case LDLM_CANCEL:
1962                 CDEBUG(D_INODE, "cancel\n");
1963                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
1964                 rc = ldlm_handle_cancel(req);
1965                 break;
1966         case LDLM_BL_CALLBACK:
1967         case LDLM_CP_CALLBACK:
1968                 CDEBUG(D_INODE, "callback\n");
1969                 CERROR("callbacks should not happen on OST\n");
1970                 /* fall through */
1971         default:
1972                 CERROR("Unexpected opcode %d\n",
1973                        lustre_msg_get_opc(req->rq_reqmsg));
1974                 req->rq_status = -ENOTSUPP;
1975                 rc = ptlrpc_error(req);
1976                 RETURN(rc);
1977         }
1978
1979         LASSERT(current->journal_info == NULL);
1980
1981         EXIT;
1982         /* If we're DISCONNECTing, the export_data is already freed */
1983         if (!rc && lustre_msg_get_opc(req->rq_reqmsg) != OST_DISCONNECT)
1984                 target_committed_to_req(req);
1985
     /* GOTO(out, ...) paths arrive here with rc != 0, so they skip both
      * target_committed_to_req() above and oti_to_request() below */
1986 out:
1987         if (!rc)
1988                 oti_to_request(oti, req);
1989         return target_handle_reply(req, rc, fail);
1990 }
1991
1992 /*
1993  * free per-thread pool created by ost_thread_init().
1994  */
1995 static void ost_thread_done(struct ptlrpc_thread *thread)
1996 {
1997         struct ost_thread_local_cache *tls; /* TLS stands for Thread-Local
1998                                              * Storage */
1999
2000         ENTRY;
2001
2002         LASSERT(thread != NULL);
2003
2004         /*
2005          * be prepared to handle partially-initialized pools (because this is
2006          * called from ost_thread_init() for cleanup.
2007          */
2008         tls = thread->t_data;
2009         if (tls != NULL) {
2010                 OBD_FREE_PTR(tls);
2011                 thread->t_data = NULL;
2012         }
2013         EXIT;
2014 }
2015
2016 /*
2017  * initialize per-thread page pool (bug 5137).
2018  */
2019 static int ost_thread_init(struct ptlrpc_thread *thread)
2020 {
2021         struct ost_thread_local_cache *tls;
2022
2023         ENTRY;
2024
2025         LASSERT(thread != NULL);
2026         LASSERT(thread->t_data == NULL);
2027         LASSERTF(thread->t_id <= OSS_THREADS_MAX, "%u\n", thread->t_id);
2028
2029         OBD_ALLOC_PTR(tls);
2030         if (tls == NULL)
2031                 RETURN(-ENOMEM);
2032         thread->t_data = tls;
2033         RETURN(0);
2034 }
2035
2036 /* Sigh - really, this is an OSS, the _server_, not the _target_ */
2037 static int ost_setup(struct obd_device *obd, obd_count len, void *buf)
2038 {
2039         struct ost_obd *ost = &obd->u.ost;
2040         struct lprocfs_static_vars lvars;
2041         int oss_min_threads;
2042         int oss_max_threads;
2043         int oss_min_create_threads;
2044         int oss_max_create_threads;
2045         int rc;
2046         ENTRY;
2047
2048         rc = cleanup_group_info();
2049         if (rc)
2050                 RETURN(rc);
2051         lprocfs_ost_init_vars(&lvars);
2052         lprocfs_obd_setup(obd, lvars.obd_vars);
2053
2054         sema_init(&ost->ost_health_sem, 1);
2055
2056         /* Always sync on lock cancel */
2057         ost->ost_sync_on_lock_cancel = ALWAYS_SYNC_ON_CANCEL;
2058
2059         if (oss_num_threads) {
2060                 /* If oss_num_threads is set, it is the min and the max. */
2061                 if (oss_num_threads > OSS_THREADS_MAX)
2062                         oss_num_threads = OSS_THREADS_MAX;
2063                 if (oss_num_threads < OSS_THREADS_MIN)
2064                         oss_num_threads = OSS_THREADS_MIN;
2065                 oss_max_threads = oss_min_threads = oss_num_threads;
2066         } else {
2067                 /* Base min threads on memory and cpus */
2068                 oss_min_threads = num_possible_cpus() * CFS_NUM_CACHEPAGES >>
2069                         (27 - CFS_PAGE_SHIFT);
2070                 if (oss_min_threads < OSS_THREADS_MIN)
2071                         oss_min_threads = OSS_THREADS_MIN;
2072                 /* Insure a 4x range for dynamic threads */
2073                 if (oss_min_threads > OSS_THREADS_MAX / 4)
2074                         oss_min_threads = OSS_THREADS_MAX / 4;
2075                 oss_max_threads = min(OSS_THREADS_MAX, oss_min_threads * 4 + 1);
2076         }
2077
2078         ost->ost_service =
2079                 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
2080                                 OST_MAXREPSIZE, OST_REQUEST_PORTAL,
2081                                 OSC_REPLY_PORTAL, OSS_SERVICE_WATCHDOG_FACTOR,
2082                                 ost_handle, LUSTRE_OSS_NAME,
2083                                 obd->obd_proc_entry, target_print_req,
2084                                 oss_min_threads, oss_max_threads, "ll_ost",
2085                                 NULL);
2086         if (ost->ost_service == NULL) {
2087                 CERROR("failed to start OST service\n");
2088                 GOTO(out_lprocfs, rc = -ENOMEM);
2089         }
2090
2091         rc = ptlrpc_start_threads(obd, ost->ost_service);
2092         if (rc)
2093                 GOTO(out_service, rc = -EINVAL);
2094
2095         if (oss_num_create_threads) {
2096                 if (oss_num_create_threads > OSS_MAX_CREATE_THREADS)
2097                         oss_num_create_threads = OSS_MAX_CREATE_THREADS;
2098                 if (oss_num_create_threads < OSS_DEF_CREATE_THREADS)
2099                         oss_num_create_threads = OSS_DEF_CREATE_THREADS;
2100                 oss_min_create_threads = oss_max_create_threads =
2101                         oss_num_create_threads;
2102         } else {
2103                 oss_min_create_threads = OSS_DEF_CREATE_THREADS;
2104                 oss_max_create_threads = OSS_MAX_CREATE_THREADS;
2105         }
2106
2107         ost->ost_create_service =
2108                 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
2109                                 OST_MAXREPSIZE, OST_CREATE_PORTAL,
2110                                 OSC_REPLY_PORTAL, OSS_SERVICE_WATCHDOG_FACTOR,
2111                                 ost_handle, "ost_create",
2112                                 obd->obd_proc_entry, target_print_req,
2113                                 oss_min_create_threads,
2114                                 oss_max_create_threads,
2115                                 "ll_ost_creat", NULL);
2116         if (ost->ost_create_service == NULL) {
2117                 CERROR("failed to start OST create service\n");
2118                 GOTO(out_service, rc = -ENOMEM);
2119         }
2120
2121         rc = ptlrpc_start_threads(obd, ost->ost_create_service);
2122         if (rc)
2123                 GOTO(out_create, rc = -EINVAL);
2124
2125         ost->ost_io_service =
2126                 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
2127                                 OST_MAXREPSIZE, OST_IO_PORTAL,
2128                                 OSC_REPLY_PORTAL, OSS_SERVICE_WATCHDOG_FACTOR,
2129                                 ost_handle, "ost_io",
2130                                 obd->obd_proc_entry, target_print_req,
2131                                 oss_min_threads, oss_max_threads, "ll_ost_io",
2132                                 ost_hpreq_handler);
2133         if (ost->ost_io_service == NULL) {
2134                 CERROR("failed to start OST I/O service\n");
2135                 GOTO(out_create, rc = -ENOMEM);
2136         }
2137
2138         ost->ost_io_service->srv_init = ost_thread_init;
2139         ost->ost_io_service->srv_done = ost_thread_done;
2140         ost->ost_io_service->srv_cpu_affinity = 1;
2141         rc = ptlrpc_start_threads(obd, ost->ost_io_service);
2142         if (rc)
2143                 GOTO(out_io, rc = -EINVAL);
2144
2145         ping_evictor_start();
2146
2147         RETURN(0);
2148
2149 out_io:
2150         ptlrpc_unregister_service(ost->ost_io_service);
2151         ost->ost_io_service = NULL;
2152 out_create:
2153         ptlrpc_unregister_service(ost->ost_create_service);
2154         ost->ost_create_service = NULL;
2155 out_service:
2156         ptlrpc_unregister_service(ost->ost_service);
2157         ost->ost_service = NULL;
2158 out_lprocfs:
2159         lprocfs_obd_cleanup(obd);
2160         RETURN(rc);
2161 }
2162
2163 static int ost_cleanup(struct obd_device *obd)
2164 {
2165         struct ost_obd *ost = &obd->u.ost;
2166         int err = 0;
2167         ENTRY;
2168
2169         ping_evictor_stop();
2170
2171         spin_lock_bh(&obd->obd_processing_task_lock);
2172         if (obd->obd_recovering) {
2173                 target_cancel_recovery_timer(obd);
2174                 obd->obd_recovering = 0;
2175         }
2176         spin_unlock_bh(&obd->obd_processing_task_lock);
2177
2178         down(&ost->ost_health_sem);
2179         ptlrpc_unregister_service(ost->ost_service);
2180         ptlrpc_unregister_service(ost->ost_create_service);
2181         ptlrpc_unregister_service(ost->ost_io_service);
2182         ost->ost_service = NULL;
2183         ost->ost_create_service = NULL;
2184         up(&ost->ost_health_sem);
2185
2186         lprocfs_obd_cleanup(obd);
2187
2188         RETURN(err);
2189 }
2190
2191 static int ost_health_check(struct obd_device *obd)
2192 {
2193         struct ost_obd *ost = &obd->u.ost;
2194         int rc = 0;
2195
2196         down(&ost->ost_health_sem);
2197         rc |= ptlrpc_service_health_check(ost->ost_service);
2198         rc |= ptlrpc_service_health_check(ost->ost_create_service);
2199         rc |= ptlrpc_service_health_check(ost->ost_io_service);
2200         up(&ost->ost_health_sem);
2201
2202         /*
2203          * health_check to return 0 on healthy
2204          * and 1 on unhealthy.
2205          */
2206         if( rc != 0)
2207                 rc = 1;
2208
2209         return rc;
2210 }
2211
2212 /* use obd ops to offer management infrastructure */
2213 static struct obd_ops ost_obd_ops = {
2214         .o_owner        = THIS_MODULE,
2215         .o_setup        = ost_setup,
2216         .o_cleanup      = ost_cleanup,
2217         .o_health_check = ost_health_check,
2218 };
2219
2220
2221 static int __init ost_init(void)
2222 {
2223         struct lprocfs_static_vars lvars;
2224         int rc;
2225         ENTRY;
2226
2227         lprocfs_ost_init_vars(&lvars);
2228         rc = class_register_type(&ost_obd_ops, lvars.module_vars,
2229                                  LUSTRE_OSS_NAME);
2230
2231         if (ost_num_threads != 0 && oss_num_threads == 0) {
2232                 LCONSOLE_INFO("ost_num_threads module parameter is deprecated, "
2233                               "use oss_num_threads instead or unset both for "
2234                               "dynamic thread startup\n");
2235                 oss_num_threads = ost_num_threads;
2236         }
2237
2238         RETURN(rc);
2239 }
2240
2241 static void /*__exit*/ ost_exit(void)
2242 {
2243         class_unregister_type(LUSTRE_OSS_NAME);
2244 }
2245
/* Module metadata, followed by the registration of ost_init() and
 * ost_exit() as the module's init and exit functions. */
2246 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2247 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
2248 MODULE_LICENSE("GPL");
2249
2250 module_init(ost_init);
2251 module_exit(ost_exit);