Whamcloud - gitweb
b=23793 MOUNTOPT "-o" cleanup
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2001, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ost/ost_handler.c
37  *
38  * Author: Peter J. Braam <braam@clusterfs.com>
39  * Author: Phil Schwan <phil@clusterfs.com>
40  */
41
42 #ifndef EXPORT_SYMTAB
43 # define EXPORT_SYMTAB
44 #endif
45 #define DEBUG_SUBSYSTEM S_OST
46
47 #include <linux/module.h>
48 #include <obd_ost.h>
49 #include <lustre_net.h>
50 #include <lustre_dlm.h>
51 #include <lustre_export.h>
52 #include <lustre_debug.h>
53 #include <linux/init.h>
54 #include <lprocfs_status.h>
55 #include <libcfs/list.h>
56 #include <lustre_quota.h>
57 #include <lustre_log.h>
58 #include "ost_internal.h"
59
60 static int oss_num_threads;
61 CFS_MODULE_PARM(oss_num_threads, "i", int, 0444,
62                 "number of OSS service threads to start");
63
64 static int ost_num_threads;
65 CFS_MODULE_PARM(ost_num_threads, "i", int, 0444,
66                 "number of OST service threads to start (deprecated)");
67
68 static int oss_num_create_threads;
69 CFS_MODULE_PARM(oss_num_create_threads, "i", int, 0444,
70                 "number of OSS create threads to start");
71
72 void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
73 {
74         struct oti_req_ack_lock *ack_lock;
75         int i;
76
77         if (oti == NULL)
78                 return;
79
80         if (req->rq_repmsg) {
81                 __u64 versions[PTLRPC_NUM_VERSIONS] = { 0 };
82                 lustre_msg_set_transno(req->rq_repmsg, oti->oti_transno);
83                 versions[0] = oti->oti_pre_version;
84                 lustre_msg_set_versions(req->rq_repmsg, versions);
85         }
86         req->rq_transno = oti->oti_transno;
87
88         /* XXX 4 == entries in oti_ack_locks??? */
89         for (ack_lock = oti->oti_ack_locks, i = 0; i < 4; i++, ack_lock++) {
90                 if (!ack_lock->mode)
91                         break;
92                 /* XXX not even calling target_send_reply in some cases... */
93                 ptlrpc_save_lock (req, &ack_lock->lock, ack_lock->mode);
94         }
95 }
96
97 static int ost_destroy(struct obd_export *exp, struct ptlrpc_request *req,
98                        struct obd_trans_info *oti)
99 {
100         struct ost_body *body, *repbody;
101         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
102         int rc;
103         ENTRY;
104
105         body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
106                                   lustre_swab_ost_body);
107         if (body == NULL)
108                 RETURN(-EFAULT);
109
110         if (body->oa.o_id == 0)
111                 RETURN(-EPROTO);
112
113         if (lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1)) {
114                 struct ldlm_request *dlm;
115                 dlm = lustre_swab_reqbuf(req, REQ_REC_OFF + 1, sizeof(*dlm),
116                                          lustre_swab_ldlm_request);
117                 if (dlm == NULL)
118                         RETURN (-EFAULT);
119                 ldlm_request_cancel(req, dlm, 0);
120         }
121
122         rc = lustre_pack_reply(req, 2, size, NULL);
123         if (rc)
124                 RETURN(rc);
125
126         if (body->oa.o_valid & OBD_MD_FLCOOKIE)
127                 oti->oti_logcookies = &body->oa.o_lcookie;
128         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
129                                  sizeof(*repbody));
130         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
131         req->rq_status = obd_destroy(exp, &body->oa, NULL, oti, NULL);
132         RETURN(0);
133 }
134
135 static int ost_getattr(struct obd_export *exp, struct ptlrpc_request *req)
136 {
137         struct ost_body *body, *repbody;
138         struct obd_info *oinfo;
139         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
140         int rc;
141         ENTRY;
142
143         body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
144                                   lustre_swab_ost_body);
145         if (body == NULL)
146                 RETURN(-EFAULT);
147
148         rc = lustre_pack_reply(req, 2, size, NULL);
149         if (rc)
150                 RETURN(rc);
151
152         OBD_ALLOC_PTR(oinfo);
153         if (NULL == oinfo)
154                 RETURN(-ENOMEM);
155
156         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
157                                  sizeof(*repbody));
158         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
159
160         oinfo->oi_oa = &repbody->oa;
161         req->rq_status = obd_getattr(exp, oinfo);
162
163         OBD_FREE_PTR(oinfo);
164         RETURN(0);
165 }
166
167 static int ost_statfs(struct ptlrpc_request *req)
168 {
169         struct obd_statfs *osfs;
170         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
171         int rc;
172         ENTRY;
173
174         rc = lustre_pack_reply(req, 2, size, NULL);
175         if (rc)
176                 RETURN(rc);
177
178         osfs = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*osfs));
179
180         req->rq_status = obd_statfs(req->rq_export->exp_obd, osfs,
181                                     cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS),
182                                     0);
183         if (req->rq_status != 0)
184                 CERROR("ost: statfs failed: rc %d\n", req->rq_status);
185
186         RETURN(0);
187 }
188
189 static int ost_create(struct obd_export *exp, struct ptlrpc_request *req,
190                       struct obd_trans_info *oti)
191 {
192         struct ost_body *body, *repbody;
193         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*repbody) };
194         int rc;
195         ENTRY;
196
197         body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
198                                   lustre_swab_ost_body);
199         if (body == NULL)
200                 RETURN(-EFAULT);
201
202         rc = lustre_pack_reply(req, 2, size, NULL);
203         if (rc)
204                 RETURN(rc);
205
206         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
207                                  sizeof(*repbody));
208         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
209         oti->oti_logcookies = &repbody->oa.o_lcookie;
210
211         req->rq_status = obd_create(exp, &repbody->oa, NULL, oti);
212         //obd_log_cancel(conn, NULL, 1, oti->oti_logcookies, 0);
213         RETURN(0);
214 }
215
216 /*
217  * Helper function for ost_punch(): if asked by client, acquire [size, EOF]
218  * lock on the file being truncated.
219  */
220 static int ost_punch_lock_get(struct obd_export *exp, struct obdo *oa,
221                               struct lustre_handle *lh)
222 {
223         int flags;
224         struct ldlm_res_id res_id = { .name = { oa->o_id } };
225         ldlm_policy_data_t policy;
226         __u64 start;
227         __u64 finis;
228
229         ENTRY;
230
231         LASSERT(!lustre_handle_is_used(lh));
232
233         if (!(oa->o_valid & OBD_MD_FLFLAGS) ||
234             !(oa->o_flags & OBD_FL_TRUNCLOCK))
235                 RETURN(0);
236
237         CDEBUG(D_INODE, "OST-side truncate lock.\n");
238
239         start = oa->o_size;
240         finis = start + oa->o_blocks;
241
242         /*
243          * standard truncate optimization: if file body is completely
244          * destroyed, don't send data back to the server.
245          */
246         flags = (start == 0) ? LDLM_AST_DISCARD_DATA : 0;
247
248         policy.l_extent.start = start & CFS_PAGE_MASK;
249
250         /*
251          * If ->o_blocks is EOF it means "lock till the end of the
252          * file". Otherwise, it's size of a hole being punched (in bytes)
253          */
254         if (oa->o_blocks == OBD_OBJECT_EOF || finis < start)
255                 policy.l_extent.end = OBD_OBJECT_EOF;
256         else
257                 policy.l_extent.end = finis | ~CFS_PAGE_MASK;
258
259         RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
260                                       LDLM_EXTENT, &policy, LCK_PW, &flags,
261                                       ldlm_blocking_ast, ldlm_completion_ast,
262                                       ldlm_glimpse_ast, NULL, 0, NULL, lh));
263 }
264
265 /*
266  * Helper function for ost_punch(): release lock acquired by
267  * ost_punch_lock_get(), if any.
268  */
269 static void ost_punch_lock_put(struct obd_export *exp, struct obdo *oa,
270                                struct lustre_handle *lh)
271 {
272         ENTRY;
273         if (lustre_handle_is_used(lh))
274                 ldlm_lock_decref(lh, LCK_PW);
275         EXIT;
276 }
277
278 static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req,
279                      struct obd_trans_info *oti)
280 {
281         struct obd_info *oinfo;
282         struct ost_body *body, *repbody;
283         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*repbody) };
284         int rc;
285         struct lustre_handle lh = {0,};
286         ENTRY;
287
288         /* check that we do support OBD_CONNECT_TRUNCLOCK. */
289         CLASSERT(OST_CONNECT_SUPPORTED & OBD_CONNECT_TRUNCLOCK);
290
291         /* ost_body is varified and swabbed in ost_hpreq_handler() */
292         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
293         LASSERT(body != NULL);
294
295         OBD_ALLOC_PTR(oinfo);
296         if (NULL == oinfo)
297                 RETURN(-ENOMEM);
298
299         oinfo->oi_oa = &body->oa;
300         oinfo->oi_policy.l_extent.start = oinfo->oi_oa->o_size;
301         oinfo->oi_policy.l_extent.end = oinfo->oi_oa->o_blocks;
302
303         if ((oinfo->oi_oa->o_valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) !=
304             (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
305                 GOTO(out, rc = -EINVAL);
306
307         rc = lustre_pack_reply(req, 2, size, NULL);
308         if (rc)
309                 GOTO(out, rc);
310
311         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
312                                  sizeof(*repbody));
313         rc = ost_punch_lock_get(exp, oinfo->oi_oa, &lh);
314         if (rc == 0) {
315                 if (oinfo->oi_oa->o_valid & OBD_MD_FLFLAGS &&
316                     oinfo->oi_oa->o_flags == OBD_FL_TRUNCLOCK)
317                         /*
318                          * If OBD_FL_TRUNCLOCK is the only bit set in
319                          * ->o_flags, clear OBD_MD_FLFLAGS to avoid falling
320                          * through filter_setattr() to filter_iocontrol().
321                          */
322                         oinfo->oi_oa->o_valid &= ~OBD_MD_FLFLAGS;
323
324                 req->rq_status = obd_punch(exp, oinfo, oti, NULL);
325                 ost_punch_lock_put(exp, oinfo->oi_oa, &lh);
326         }
327
328         repbody->oa = *oinfo->oi_oa;
329 out:
330         OBD_FREE_PTR(oinfo);
331         RETURN(rc);
332 }
333
334 static int ost_sync(struct obd_export *exp, struct ptlrpc_request *req)
335 {
336         struct obd_info *oinfo;
337         struct ost_body *body, *repbody;
338         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*repbody) };
339         int rc;
340         ENTRY;
341
342         body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
343                                   lustre_swab_ost_body);
344         if (body == NULL)
345                 RETURN(-EFAULT);
346
347         rc = lustre_pack_reply(req, 2, size, NULL);
348         if (rc)
349                 RETURN(rc);
350
351         OBD_ALLOC_PTR(oinfo);
352         if (NULL == oinfo)
353                 RETURN(-ENOMEM);
354
355         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
356                                  sizeof(*repbody));
357
358         oinfo->oi_oa = &body->oa;
359         req->rq_status = obd_sync(exp, oinfo, repbody->oa.o_size,
360                                   repbody->oa.o_blocks, NULL);
361         repbody->oa = *oinfo->oi_oa;
362
363         OBD_FREE_PTR(oinfo);
364         RETURN(0);
365 }
366
367 static int ost_setattr(struct obd_export *exp, struct ptlrpc_request *req,
368                        struct obd_trans_info *oti)
369 {
370         struct ost_body *body, *repbody;
371         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*repbody) };
372         int rc;
373         struct obd_info *oinfo = NULL;
374         ENTRY;
375
376         body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
377                                   lustre_swab_ost_body);
378         if (body == NULL)
379                 RETURN(-EFAULT);
380
381         rc = lustre_pack_reply(req, 2, size, NULL);
382         if (rc)
383                 RETURN(rc);
384
385         OBD_ALLOC_PTR(oinfo);
386         if (NULL == oinfo)
387                 RETURN(-ENOMEM);
388
389         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
390                                  sizeof(*repbody));
391         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
392
393         oinfo->oi_oa = &repbody->oa;
394         req->rq_status = obd_setattr(exp, oinfo, oti);
395
396         OBD_FREE_PTR(oinfo);
397         RETURN(0);
398 }
399
400 static int ost_bulk_timeout(void *data)
401 {
402         ENTRY;
403         /* We don't fail the connection here, because having the export
404          * killed makes the (vital) call to commitrw very sad.
405          */
406         RETURN(1);
407 }
408
409 static __u32 ost_checksum_bulk(struct ptlrpc_bulk_desc *desc, int opc,
410                                cksum_type_t cksum_type)
411 {
412         __u32 cksum;
413         int i;
414
415         cksum = init_checksum(cksum_type);
416         for (i = 0; i < desc->bd_iov_count; i++) {
417                 struct page *page = desc->bd_iov[i].kiov_page;
418                 int off = desc->bd_iov[i].kiov_offset & ~CFS_PAGE_MASK;
419                 char *ptr = kmap(page) + off;
420                 int len = desc->bd_iov[i].kiov_len;
421
422                 /* corrupt the data before we compute the checksum, to
423                  * simulate a client->OST data error */
424                 if (i == 0 && opc == OST_WRITE &&
425                     OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_CHECKSUM_RECEIVE))
426                         memcpy(ptr, "bad3", min(4, len));
427                 cksum = compute_checksum(cksum, ptr, len, cksum_type);
428                 /* corrupt the data after we compute the checksum, to
429                  * simulate an OST->client data error */
430                 if (i == 0 && opc == OST_READ &&
431                     OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_CHECKSUM_SEND)) {
432                         memcpy(ptr, "bad4", min(4, len));
433                         /* nobody should use corrupted page again */
434                         ClearPageUptodate(page);
435                 }
436                 kunmap(page);
437         }
438
439         return cksum;
440 }
441
442 static int ost_brw_lock_get(int mode, struct obd_export *exp,
443                             struct obd_ioobj *obj, struct niobuf_remote *nb,
444                             struct lustre_handle *lh)
445 {
446         int flags                 = 0;
447         int nrbufs                = obj->ioo_bufcnt;
448         struct ldlm_res_id res_id = { .name = { obj->ioo_id } };
449         ldlm_policy_data_t policy;
450         int i;
451
452         ENTRY;
453
454         LASSERT(mode == LCK_PR || mode == LCK_PW);
455         LASSERT(!lustre_handle_is_used(lh));
456
457         if (nrbufs == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
458                 RETURN(0);
459
460         /* EXPENSIVE ASSERTION */
461         for (i = 1; i < nrbufs; i ++)
462                 LASSERT((nb[0].flags & OBD_BRW_SRVLOCK) ==
463                         (nb[i].flags & OBD_BRW_SRVLOCK));
464
465         policy.l_extent.start = nb[0].offset & CFS_PAGE_MASK;
466         policy.l_extent.end   = (nb[nrbufs - 1].offset +
467                                  nb[nrbufs - 1].len - 1) | ~CFS_PAGE_MASK;
468
469         RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
470                                       LDLM_EXTENT, &policy, mode, &flags,
471                                       ldlm_blocking_ast, ldlm_completion_ast,
472                                       ldlm_glimpse_ast, NULL, 0, NULL, lh));
473 }
474
475 static void ost_brw_lock_put(int mode,
476                              struct obd_ioobj *obj, struct niobuf_remote *niob,
477                              struct lustre_handle *lh)
478 {
479         ENTRY;
480         LASSERT(mode == LCK_PR || mode == LCK_PW);
481         LASSERT((obj->ioo_bufcnt > 0 && (niob[0].flags & OBD_BRW_SRVLOCK)) ==
482                 lustre_handle_is_used(lh));
483         if (lustre_handle_is_used(lh)) {
484                 struct ldlm_lock *lock = ldlm_handle2lock(lh);
485                 ldlm_res_lvbo_update(lock->l_resource, NULL, 0, 1);
486                 LDLM_LOCK_PUT(lock);
487                 ldlm_lock_decref(lh, mode);
488         }
489         EXIT;
490 }
491
492 struct ost_prolong_data {
493         struct obd_export *opd_exp;
494         ldlm_policy_data_t opd_policy;
495         struct obdo *opd_oa;
496         ldlm_mode_t opd_mode;
497         int opd_lock_match;
498         int opd_timeout;
499 };
500
501 static int ost_prolong_locks_iter(struct ldlm_lock *lock, void *data)
502 {
503         struct ost_prolong_data *opd = data;
504
505         LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
506
507         if (lock->l_req_mode != lock->l_granted_mode) {
508                 /* scan granted locks only */
509                 return LDLM_ITER_STOP;
510         }
511
512         if (lock->l_export != opd->opd_exp) {
513                 /* prolong locks only for given client */
514                 return LDLM_ITER_CONTINUE;
515         }
516
517         if (!(lock->l_granted_mode & opd->opd_mode)) {
518                 /* we aren't interesting in all type of locks */
519                 return LDLM_ITER_CONTINUE;
520         }
521
522         if (lock->l_policy_data.l_extent.end < opd->opd_policy.l_extent.start ||
523             lock->l_policy_data.l_extent.start > opd->opd_policy.l_extent.end) {
524                 /* the request doesn't cross the lock, skip it */
525                 return LDLM_ITER_CONTINUE;
526         }
527
528         /* Fill the obdo with the matched lock handle.
529          * XXX: it is possible in some cases the IO RPC is covered by several
530          * locks, even for the write case, so it may need to be a lock list. */
531         if (opd->opd_oa && !(opd->opd_oa->o_valid & OBD_MD_FLHANDLE)) {
532                 opd->opd_oa->o_handle.cookie = lock->l_handle.h_cookie;
533                 opd->opd_oa->o_valid |= OBD_MD_FLHANDLE;
534         }
535
536         if (!(lock->l_flags & LDLM_FL_AST_SENT)) {
537                 /* ignore locks not being cancelled */
538                 return LDLM_ITER_CONTINUE;
539         }
540
541         CDEBUG(D_DLMTRACE,"refresh lock: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
542                lock->l_resource->lr_name.name[0],
543                lock->l_resource->lr_name.name[1],
544                opd->opd_policy.l_extent.start, opd->opd_policy.l_extent.end);
545         /* OK. this is a possible lock the user holds doing I/O
546          * let's refresh eviction timer for it */
547         ldlm_refresh_waiting_lock(lock, opd->opd_timeout);
548         opd->opd_lock_match = 1;
549
550         return LDLM_ITER_CONTINUE;
551 }
552
553 static int ost_rw_prolong_locks(struct ptlrpc_request *req, struct obd_ioobj *obj,
554                                 struct niobuf_remote *nb, struct obdo *oa,
555                                 ldlm_mode_t mode)
556
557
558 {
559         struct ldlm_res_id res_id = { .name = { obj->ioo_id } };
560         struct ost_prolong_data opd = { 0 };
561         int nrbufs = obj->ioo_bufcnt;
562
563         ENTRY;
564
565         opd.opd_mode = mode;
566         opd.opd_exp = req->rq_export;
567         opd.opd_policy.l_extent.start = nb[0].offset & CFS_PAGE_MASK;
568         opd.opd_policy.l_extent.end = (nb[nrbufs - 1].offset +
569                                        nb[nrbufs - 1].len - 1) | ~CFS_PAGE_MASK;
570
571         /* prolong locks for the current service time of the corresponding
572          * portal (= OST_IO_PORTAL) */
573         opd.opd_timeout = AT_OFF ? obd_timeout / 2 :
574                           max(at_est2timeout(at_get(&req->rq_rqbd->
575                               rqbd_service->srv_at_estimate)), ldlm_timeout);
576
577         CDEBUG(D_INFO,"refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
578                res_id.name[0], res_id.name[1], opd.opd_policy.l_extent.start,
579                opd.opd_policy.l_extent.end);
580
581         if (oa->o_valid & OBD_MD_FLHANDLE) {
582                 struct ldlm_lock *lock;
583
584                 lock = ldlm_handle2lock(&oa->o_handle);
585                 if (lock != NULL) {
586                         ost_prolong_locks_iter(lock, &opd);
587                         if (opd.opd_lock_match) {
588                                 LDLM_LOCK_PUT(lock);
589                                 RETURN(1);
590                         }
591
592                         /* Check if the lock covers the whole IO region,
593                          * otherwise iterate through the resource. */
594                         if (lock->l_policy_data.l_extent.end >=
595                             opd.opd_policy.l_extent.end &&
596                             lock->l_policy_data.l_extent.start <=
597                             opd.opd_policy.l_extent.start) {
598                                 LDLM_LOCK_PUT(lock);
599                                 RETURN(0);
600                         }
601                         LDLM_LOCK_PUT(lock);
602                 }
603         }
604
605         opd.opd_oa = oa;
606         ldlm_resource_iterate(req->rq_export->exp_obd->obd_namespace, &res_id,
607                               ost_prolong_locks_iter, &opd);
608         RETURN(opd.opd_lock_match);
609 }
610
611 /* Allocate thread local buffers if needed */
612 static struct ost_thread_local_cache *ost_tls_get(struct ptlrpc_request *r)
613 {
614         struct ost_thread_local_cache *tls =
615                 (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
616
617         /* In normal mode of operation an I/O request is serviced only
618          * by ll_ost_io threads each of them has own tls buffers allocated by
619          * ost_thread_init().
620          * During recovery, an I/O request may be queued until any of the ost
621          * service threads process it. Not necessary it should be one of
622          * ll_ost_io threads. In that case we dynamically allocating tls
623          * buffers for the request service time. */
624         if (unlikely(tls == NULL)) {
625                 LASSERT(r->rq_export->exp_in_recovery);
626                 OBD_ALLOC_PTR(tls);
627                 if (tls != NULL) {
628                         tls->temporary = 1;
629                         r->rq_svc_thread->t_data = tls;
630                 }
631         }
632         return  tls;
633 }
634
635 /* Free thread local buffers if they were allocated only for servicing
636  * this one request */
637 static void ost_tls_put(struct ptlrpc_request *r)
638 {
639         struct ost_thread_local_cache *tls =
640                 (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
641
642         if (unlikely(tls->temporary)) {
643                 OBD_FREE_PTR(tls);
644                 r->rq_svc_thread->t_data = NULL;
645         }
646 }
647
648 static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
649 {
650         struct ptlrpc_bulk_desc *desc = NULL;
651         struct obd_export       *exp = req->rq_export;
652         struct niobuf_remote *remote_nb;
653         struct niobuf_local *local_nb;
654         struct obd_ioobj *ioo;
655         struct ost_body *body, *repbody;
656         struct l_wait_info lwi;
657         struct lustre_handle lockh = { 0 };
658         __u32  size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
659         int niocount, npages, nob = 0, rc, i;
660         int no_reply = 0;
661         struct ost_thread_local_cache *tls;
662         ENTRY;
663
664         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
665                 GOTO(out, rc = -EIO);
666
667         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, (obd_timeout + 1) / 4);
668
669         /* Check if there is eviction in progress, and if so, wait for it to
670          * finish */
671         if (unlikely(atomic_read(&exp->exp_obd->obd_evict_inprogress))) {
672                 lwi = LWI_INTR(NULL, NULL); // We do not care how long it takes
673                 rc = l_wait_event(exp->exp_obd->obd_evict_inprogress_waitq,
674                         !atomic_read(&exp->exp_obd->obd_evict_inprogress),
675                         &lwi);
676         }
677         if (exp->exp_failed)
678                 GOTO(out, rc = -ENOTCONN);
679
680         /* ost_body, ioobj & noibuf_remote are verified and swabbed in
681          * ost_rw_hpreq_check(). */
682         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
683         LASSERT(body != NULL);
684
685         ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioo));
686         LASSERT(ioo != NULL);
687
688         niocount = ioo->ioo_bufcnt;
689         remote_nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
690                                    niocount * sizeof(*remote_nb));
691         LASSERT(remote_nb != NULL);
692
693         rc = lustre_pack_reply(req, 2, size, NULL);
694         if (rc)
695                 GOTO(out, rc);
696
697         tls = ost_tls_get(req);
698         if (tls == NULL)
699                 GOTO(out_bulk, rc = -ENOMEM);
700         local_nb = tls->local;
701
702         rc = ost_brw_lock_get(LCK_PR, exp, ioo, remote_nb, &lockh);
703         if (rc != 0)
704                 GOTO(out_tls, rc);
705
706         /*
707          * If getting the lock took more time than
708          * client was willing to wait, drop it. b=11330
709          */
710         if (cfs_time_current_sec() > req->rq_deadline ||
711             OBD_FAIL_CHECK(OBD_FAIL_OST_DROP_REQ)) {
712                 no_reply = 1;
713                 CERROR("Dropping timed-out read from %s because locking"
714                        "object "LPX64" took %ld seconds (limit was %ld).\n",
715                        libcfs_id2str(req->rq_peer), ioo->ioo_id,
716                        cfs_time_current_sec() - req->rq_arrival_time.tv_sec,
717                        req->rq_deadline - req->rq_arrival_time.tv_sec);
718                 GOTO(out_lock, rc = -ETIMEDOUT);
719         }
720
721         npages = OST_THREAD_POOL_SIZE;
722         rc = obd_preprw(OBD_BRW_READ, exp, &body->oa, 1, ioo,
723                         remote_nb, &npages, local_nb, oti);
724         if (rc != 0)
725                 GOTO(out_lock, rc);
726
727         desc = ptlrpc_prep_bulk_exp(req, npages,
728                                      BULK_PUT_SOURCE, OST_BULK_PORTAL);
729         if (desc == NULL)
730                 GOTO(out_commitrw, rc = -ENOMEM);
731
732         if (!lustre_handle_is_used(&lockh))
733                 /* no needs to try to prolong lock if server is asked
734                  * to handle locking (= OBD_BRW_SRVLOCK) */
735                 ost_rw_prolong_locks(req, ioo, remote_nb, &body->oa,
736                                      LCK_PW | LCK_PR);
737
738         nob = 0;
739         for (i = 0; i < npages; i++) {
740                 int page_rc = local_nb[i].rc;
741
742                 if (page_rc < 0) {              /* error */
743                         rc = page_rc;
744                         break;
745                 }
746
747                 nob += page_rc;
748                 if (page_rc != 0) {             /* some data! */
749                         LASSERT (local_nb[i].page != NULL);
750                         ptlrpc_prep_bulk_page(desc, local_nb[i].page,
751                                               local_nb[i].offset & ~CFS_PAGE_MASK,
752                                               page_rc);
753                 }
754
755                 if (page_rc != local_nb[i].len) { /* short read */
756                         /* All subsequent pages should be 0 */
757                         while(++i < npages)
758                                 LASSERT(local_nb[i].rc == 0);
759                         break;
760                 }
761         }
762
763         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
764                 cksum_type_t cksum_type = OBD_CKSUM_CRC32;
765
766                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
767                         cksum_type = cksum_type_unpack(body->oa.o_flags);
768                 body->oa.o_flags = cksum_type_pack(cksum_type);
769                 body->oa.o_valid = OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
770                 body->oa.o_cksum = ost_checksum_bulk(desc, OST_READ, cksum_type);
771                 CDEBUG(D_PAGE,"checksum at read origin: %x\n",body->oa.o_cksum);
772         } else {
773                 body->oa.o_valid = 0;
774         }
775         /* We're finishing using body->oa as an input variable */
776
777         /* Check if client was evicted while we were doing i/o before touching
778            network */
779         if (rc == 0) {
780                 /* Check if there is eviction in progress, and if so, wait for
781                  * it to finish */
782                 if (unlikely(atomic_read(&exp->exp_obd->
783                                                 obd_evict_inprogress))) {
784                         lwi = LWI_INTR(NULL, NULL);
785                         rc = l_wait_event(exp->exp_obd->
786                                                 obd_evict_inprogress_waitq,
787                                           !atomic_read(&exp->exp_obd->
788                                                         obd_evict_inprogress),
789                                           &lwi);
790                 }
791                 /* Check if client was evicted or tried to reconnect already */
792                 if (exp->exp_failed || exp->exp_abort_active_req)
793                         rc = -ENOTCONN;
794                 else
795                         rc = ptlrpc_start_bulk_transfer(desc);
796                 if (rc == 0) {
797                         time_t start = cfs_time_current_sec();
798                         do {
799                                 long timeoutl = req->rq_deadline -
800                                         cfs_time_current_sec();
801                                 cfs_duration_t timeout = timeoutl <= 0 ?
802                                         CFS_TICK : cfs_time_seconds(timeoutl);
803                                 lwi = LWI_TIMEOUT_INTERVAL(timeout,
804                                                            cfs_time_seconds(1),
805                                                            ost_bulk_timeout,
806                                                            desc);
807                                 rc = l_wait_event(desc->bd_waitq,
808                                                   !ptlrpc_server_bulk_active(desc) ||
809                                                   exp->exp_failed ||
810                                                   exp->exp_abort_active_req,
811                                                   &lwi);
812                                 LASSERT(rc == 0 || rc == -ETIMEDOUT);
813                                 /* Wait again if we changed deadline */
814                         } while ((rc == -ETIMEDOUT) &&
815                                  (req->rq_deadline > cfs_time_current_sec()));
816
817                         if (rc == -ETIMEDOUT) {
818                                 DEBUG_REQ(D_ERROR, req,
819                                           "timeout on bulk PUT after %ld%+lds",
820                                           req->rq_deadline - start,
821                                           cfs_time_current_sec() -
822                                           req->rq_deadline);
823                                 ptlrpc_abort_bulk(desc);
824                         } else if (exp->exp_failed) {
825                                 DEBUG_REQ(D_ERROR, req, "Eviction on bulk PUT");
826                                 rc = -ENOTCONN;
827                                 ptlrpc_abort_bulk(desc);
828                         } else if (exp->exp_abort_active_req) {
829                                 DEBUG_REQ(D_ERROR, req, "Reconnect on bulk PUT");
830                                 /* we don't reply anyway */
831                                 rc = -ETIMEDOUT;
832                                 ptlrpc_abort_bulk(desc);
833                         } else if (!desc->bd_success ||
834                                    desc->bd_nob_transferred != desc->bd_nob) {
835                                 DEBUG_REQ(D_ERROR, req, "%s bulk PUT %d(%d)",
836                                           desc->bd_success ?
837                                           "truncated" : "network error on",
838                                           desc->bd_nob_transferred,
839                                           desc->bd_nob);
840                                 /* XXX should this be a different errno? */
841                                 rc = -ETIMEDOUT;
842                         }
843                 } else {
844                         DEBUG_REQ(D_ERROR, req, "bulk PUT failed: rc %d", rc);
845                 }
846                 no_reply = rc != 0;
847         }
848
849  out_commitrw:
850         /* Must commit after prep above in all cases */
851         rc = obd_commitrw(OBD_BRW_READ, exp, &body->oa, 1, ioo,
852                           remote_nb, npages, local_nb, oti, rc);
853
854         if (rc == 0) {
855                 repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
856                                          sizeof(*repbody));
857                 memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
858         }
859
860  out_lock:
861         ost_brw_lock_put(LCK_PR, ioo, remote_nb, &lockh);
862  out_tls:
863         ost_tls_put(req);
864  out_bulk:
865         if (desc)
866                 ptlrpc_free_bulk(desc);
867  out:
868         LASSERT(rc <= 0);
869         if (rc == 0) {
870                 req->rq_status = nob;
871                 ptlrpc_lprocfs_brw(req, nob);
872                 target_committed_to_req(req);
873                 ptlrpc_reply(req);
874         } else if (!no_reply) {
875                 /* Only reply if there was no comms problem with bulk */
876                 target_committed_to_req(req);
877                 req->rq_status = rc;
878                 ptlrpc_error(req);
879         } else {
880                 /* reply out callback would free */
881                 ptlrpc_req_drop_rs(req);
882                 CWARN("%s: ignoring bulk IO comm error with %s@%s id %s - "
883                       "client will retry\n",
884                       exp->exp_obd->obd_name,
885                       exp->exp_client_uuid.uuid,
886                       exp->exp_connection->c_remote_uuid.uuid,
887                       libcfs_id2str(req->rq_peer));
888         }
889
890         RETURN(rc);
891 }
892
893 static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
894 {
895         struct ptlrpc_bulk_desc *desc = NULL;
896         struct obd_export       *exp = req->rq_export;
897         struct niobuf_remote    *remote_nb;
898         struct niobuf_local     *local_nb;
899         struct obd_ioobj        *ioo;
900         struct ost_body         *body, *repbody;
901         struct l_wait_info       lwi;
902         struct lustre_handle     lockh = {0};
903         __u32                   *rcs;
904         __u32 size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
905         int objcount, niocount, npages;
906         int rc, i, j;
907         obd_count                client_cksum = 0, server_cksum = 0;
908         cksum_type_t             cksum_type = OBD_CKSUM_CRC32;
909         int                      no_reply = 0, mmap = 0;
910         struct ost_thread_local_cache *tls;
911         ENTRY;
912
913         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
914                 GOTO(out, rc = -EIO);
915         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK2))
916                 GOTO(out, rc = -EFAULT);
917
918         /* pause before transaction has been started */
919         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, (obd_timeout + 1) / 4);
920
921         /* Check if there is eviction in progress, and if so, wait for it to
922          * finish */
923         if (unlikely(atomic_read(&exp->exp_obd->obd_evict_inprogress))) {
924                 lwi = LWI_INTR(NULL, NULL); // We do not care how long it takes
925                 rc = l_wait_event(exp->exp_obd->obd_evict_inprogress_waitq,
926                         !atomic_read(&exp->exp_obd->obd_evict_inprogress),
927                         &lwi);
928         }
929         if (exp->exp_failed)
930                 GOTO(out, rc = -ENOTCONN);
931
932         /* ost_body, ioobj & noibuf_remote are verified and swabbed in
933          * ost_rw_hpreq_check(). */
934         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
935         LASSERT(body != NULL);
936
937         objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) /
938                    sizeof(*ioo);
939         ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1,
940                              objcount * sizeof(*ioo));
941         LASSERT(ioo != NULL);
942         for (niocount = i = 0; i < objcount; i++)
943                 niocount += ioo[i].ioo_bufcnt;
944
945         remote_nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
946                                    niocount * sizeof(*remote_nb));
947         LASSERT(remote_nb != NULL);
948
949         if ((remote_nb[0].flags & OBD_BRW_MEMALLOC) &&
950             (exp->exp_connection->c_peer.nid == exp->exp_connection->c_self))
951                 libcfs_memory_pressure_set();
952
953         size[REPLY_REC_OFF + 1] = niocount * sizeof(*rcs);
954         rc = lustre_pack_reply(req, 3, size, NULL);
955         if (rc != 0)
956                 GOTO(out, rc);
957
958         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_PACK, obd_fail_val);
959         rcs = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1,
960                              niocount * sizeof(*rcs));
961
962         tls = ost_tls_get(req);
963         if (tls == NULL)
964                 GOTO(out_bulk, rc = -ENOMEM);
965         local_nb = tls->local;
966
967         rc = ost_brw_lock_get(LCK_PW, exp, ioo, remote_nb, &lockh);
968         if (rc != 0)
969                 GOTO(out_tls, rc);
970
971         /*
972          * If getting the lock took more time than
973          * client was willing to wait, drop it. b=11330
974          */
975         if (cfs_time_current_sec() > req->rq_deadline ||
976             OBD_FAIL_CHECK(OBD_FAIL_OST_DROP_REQ)) {
977                 no_reply = 1;
978                 CERROR("Dropping timed-out write from %s because locking "
979                        "object "LPX64" took %ld seconds (limit was %ld).\n",
980                        libcfs_id2str(req->rq_peer), ioo->ioo_id,
981                        cfs_time_current_sec() - req->rq_arrival_time.tv_sec,
982                        req->rq_deadline - req->rq_arrival_time.tv_sec);
983                 GOTO(out_lock, rc = -ETIMEDOUT);
984         }
985
986         if (!lustre_handle_is_used(&lockh))
987                 /* no needs to try to prolong lock if server is asked
988                  * to handle locking (= OBD_BRW_SRVLOCK) */
989                 ost_rw_prolong_locks(req, ioo, remote_nb,&body->oa,  LCK_PW);
990
991         /* obd_preprw clobbers oa->valid, so save what we need */
992         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
993                 client_cksum = body->oa.o_cksum;
994                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
995                         cksum_type = cksum_type_unpack(body->oa.o_flags);
996         }
997         if (body->oa.o_valid & OBD_MD_FLFLAGS && body->oa.o_flags & OBD_FL_MMAP)
998                 mmap = 1;
999
1000         /* Because we already sync grant info with client when reconnect,
1001          * grant info will be cleared for resent req, then fed_grant and
1002          * total_grant will not be modified in following preprw_write*/
1003         if (lustre_msg_get_flags(req->rq_reqmsg) & (MSG_RESENT | MSG_REPLAY)) {
1004                 DEBUG_REQ(D_CACHE, req, "clear resent/replay req grant info");
1005                 body->oa.o_valid &= ~OBD_MD_FLGRANT;
1006         }
1007
1008         npages = OST_THREAD_POOL_SIZE;
1009         rc = obd_preprw(OBD_BRW_WRITE, exp, &body->oa, objcount,
1010                         ioo, remote_nb, &npages, local_nb, oti);
1011         if (rc != 0)
1012                 GOTO(out_lock, rc);
1013
1014         desc = ptlrpc_prep_bulk_exp(req, npages,
1015                                      BULK_GET_SINK, OST_BULK_PORTAL);
1016         if (desc == NULL)
1017                 GOTO(skip_transfer, rc = -ENOMEM);
1018
1019         /* NB Having prepped, we must commit... */
1020
1021         for (i = 0; i < npages; i++)
1022                 ptlrpc_prep_bulk_page(desc, local_nb[i].page,
1023                                       local_nb[i].offset & ~CFS_PAGE_MASK,
1024                                       local_nb[i].len);
1025
1026         /* Check if client was evicted or tried to reconnect while we
1027          * were doing i/o before touching network */
1028         if (desc->bd_export->exp_failed ||
1029             desc->bd_export->exp_abort_active_req)
1030                 rc = -ENOTCONN;
1031         else
1032                 rc = ptlrpc_start_bulk_transfer(desc);
1033         if (rc == 0) {
1034                 time_t start = cfs_time_current_sec();
1035                 do {
1036                         long timeoutl = req->rq_deadline -
1037                                 cfs_time_current_sec();
1038                         cfs_duration_t timeout = timeoutl <= 0 ?
1039                                 CFS_TICK : cfs_time_seconds(timeoutl);
1040                         lwi = LWI_TIMEOUT_INTERVAL(timeout, cfs_time_seconds(1),
1041                                                    ost_bulk_timeout, desc);
1042                         rc = l_wait_event(desc->bd_waitq,
1043                                           !ptlrpc_server_bulk_active(desc) ||
1044                                           desc->bd_export->exp_failed ||
1045                                           desc->bd_export->exp_abort_active_req,
1046                                           &lwi);
1047                         LASSERT(rc == 0 || rc == -ETIMEDOUT);
1048                         /* Wait again if we changed deadline */
1049                 } while ((rc == -ETIMEDOUT) &&
1050                          (req->rq_deadline > cfs_time_current_sec()));
1051
1052                 if (rc == -ETIMEDOUT) {
1053                         DEBUG_REQ(D_ERROR, req,
1054                                   "timeout on bulk GET after %ld%+lds",
1055                                   req->rq_deadline - start,
1056                                   cfs_time_current_sec() -
1057                                   req->rq_deadline);
1058                         ptlrpc_abort_bulk(desc);
1059                 } else if (desc->bd_export->exp_failed) {
1060                         DEBUG_REQ(D_ERROR, req, "Eviction on bulk GET");
1061                         rc = -ENOTCONN;
1062                         ptlrpc_abort_bulk(desc);
1063                 } else if (desc->bd_export->exp_abort_active_req) {
1064                         DEBUG_REQ(D_ERROR, req, "Reconnect on bulk GET");
1065                         /* we don't reply anyway */
1066                         rc = -ETIMEDOUT;
1067                         ptlrpc_abort_bulk(desc);
1068                 } else if (!desc->bd_success ||
1069                            desc->bd_nob_transferred != desc->bd_nob) {
1070                         DEBUG_REQ(D_ERROR, req, "%s bulk GET %d(%d)",
1071                                   desc->bd_success ?
1072                                   "truncated" : "network error on",
1073                                   desc->bd_nob_transferred, desc->bd_nob);
1074                         /* XXX should this be a different errno? */
1075                         rc = -ETIMEDOUT;
1076                 }
1077         } else {
1078                 DEBUG_REQ(D_ERROR, req, "ptlrpc_bulk_get failed: rc %d", rc);
1079         }
1080         no_reply = rc != 0;
1081
1082 skip_transfer:
1083         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
1084                                  sizeof(*repbody));
1085         memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
1086
1087         if (client_cksum != 0 && rc == 0) {
1088                 static int cksum_counter;
1089
1090                 repbody->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1091                 repbody->oa.o_flags &= ~OBD_FL_CKSUM_ALL;
1092                 repbody->oa.o_flags |= cksum_type_pack(cksum_type);
1093                 server_cksum = ost_checksum_bulk(desc, OST_WRITE, cksum_type);
1094                 repbody->oa.o_cksum = server_cksum;
1095                 cksum_counter++;
1096                 if (unlikely(client_cksum != server_cksum)) {
1097                         CDEBUG_LIMIT(mmap ? D_INFO : D_ERROR,
1098                                      "client csum %x, server csum %x\n",
1099                                      client_cksum, server_cksum);
1100                         cksum_counter = 0;
1101                 } else if ((cksum_counter & (-cksum_counter)) == cksum_counter){
1102                         CDEBUG(D_INFO, "Checksum %u from %s OK: %x\n",
1103                                cksum_counter, libcfs_id2str(req->rq_peer),
1104                                server_cksum);
1105                 }
1106         }
1107
1108         /* Check if there is eviction in progress, and if so, wait for
1109          * it to finish */
1110         if (unlikely(atomic_read(&exp->exp_obd->obd_evict_inprogress))) {
1111                 lwi = LWI_INTR(NULL, NULL);
1112                 rc = l_wait_event(exp->exp_obd->obd_evict_inprogress_waitq,
1113                         !atomic_read(&exp->exp_obd->obd_evict_inprogress),
1114                         &lwi);
1115         }
1116         if (rc == 0 && exp->exp_failed)
1117                 rc = -ENOTCONN;
1118
1119         /* Must commit after prep above in all cases */
1120         rc = obd_commitrw(OBD_BRW_WRITE, exp, &repbody->oa, objcount, ioo,
1121                           remote_nb, npages, local_nb, oti, rc);
1122
1123         if (rc == -ENOTCONN)
1124                 /* quota acquire process has been given up because
1125                  * either the client has been evicted or the client
1126                  * has timed out the request already */
1127                 no_reply = 1;
1128
1129         /*
1130          * Disable sending mtime back to the client. If the client locked the
1131          * whole object, then it has already updated the mtime on its side,
1132          * otherwise it will have to glimpse anyway (see bug 21489, comment 32)
1133          */
1134         repbody->oa.o_valid &= ~(OBD_MD_FLMTIME | OBD_MD_FLATIME);
1135
1136         if (unlikely(client_cksum != server_cksum && rc == 0 && !mmap)) {
1137                 int  new_cksum = ost_checksum_bulk(desc, OST_WRITE, cksum_type);
1138                 char *msg;
1139                 char *via;
1140                 char *router;
1141
1142                 if (new_cksum == server_cksum)
1143                         msg = "changed in transit before arrival at OST";
1144                 else if (new_cksum == client_cksum)
1145                         msg = "initial checksum before message complete";
1146                 else
1147                         msg = "changed in transit AND after initial checksum";
1148
1149                 if (req->rq_peer.nid == desc->bd_sender) {
1150                         via = router = "";
1151                 } else {
1152                         via = " via ";
1153                         router = libcfs_nid2str(desc->bd_sender);
1154                 }
1155
1156                 LCONSOLE_ERROR_MSG(0x168, "%s: BAD WRITE CHECKSUM: %s from %s"
1157                                    "%s%s inum "LPU64"/"LPU64" object "LPU64"/"
1158                                    LPU64" extent ["LPU64"-"LPU64"]\n",
1159                                    exp->exp_obd->obd_name, msg,
1160                                    libcfs_id2str(req->rq_peer),
1161                                    via, router,
1162                                    body->oa.o_valid & OBD_MD_FLFID ?
1163                                                 body->oa.o_fid : (__u64)0,
1164                                    body->oa.o_valid & OBD_MD_FLFID ?
1165                                                 body->oa.o_generation :(__u64)0,
1166                                    body->oa.o_id,
1167                                    body->oa.o_valid & OBD_MD_FLGROUP ?
1168                                                 body->oa.o_gr : (__u64)0,
1169                                    local_nb[0].offset,
1170                                    local_nb[npages-1].offset +
1171                                    local_nb[npages-1].len - 1 );
1172                 CERROR("client csum %x, original server csum %x, "
1173                        "server csum now %x\n",
1174                        client_cksum, server_cksum, new_cksum);
1175         }
1176
1177         if (rc == 0) {
1178                 int nob = 0;
1179
1180                 /* set per-requested niobuf return codes */
1181                 for (i = j = 0; i < niocount; i++) {
1182                         int len = remote_nb[i].len;
1183
1184                         nob += len;
1185                         rcs[i] = 0;
1186                         do {
1187                                 LASSERT(j < npages);
1188                                 if (local_nb[j].rc < 0)
1189                                         rcs[i] = local_nb[j].rc;
1190                                 len -= local_nb[j].len;
1191                                 j++;
1192                         } while (len > 0);
1193                         LASSERT(len == 0);
1194                 }
1195                 LASSERT(j == npages);
1196                 ptlrpc_lprocfs_brw(req, nob);
1197         }
1198
1199  out_lock:
1200         ost_brw_lock_put(LCK_PW, ioo, remote_nb, &lockh);
1201  out_tls:
1202         ost_tls_put(req);
1203  out_bulk:
1204         if (desc)
1205                 ptlrpc_free_bulk(desc);
1206  out:
1207         if (rc == 0) {
1208                 oti_to_request(oti, req);
1209                 target_committed_to_req(req);
1210                 rc = ptlrpc_reply(req);
1211         } else if (!no_reply) {
1212                 /* Only reply if there was no comms problem with bulk */
1213                 target_committed_to_req(req);
1214                 req->rq_status = rc;
1215                 ptlrpc_error(req);
1216         } else {
1217                 /* reply out callback would free */
1218                 ptlrpc_req_drop_rs(req);
1219                 CWARN("%s: ignoring bulk IO comm error with %s@%s id %s - "
1220                       "client will retry\n",
1221                       exp->exp_obd->obd_name,
1222                       exp->exp_client_uuid.uuid,
1223                       exp->exp_connection->c_remote_uuid.uuid,
1224                       libcfs_id2str(req->rq_peer));
1225         }
1226         libcfs_memory_pressure_clr();
1227         RETURN(rc);
1228 }
1229
1230 static int ost_set_info(struct obd_export *exp, struct ptlrpc_request *req)
1231 {
1232         struct ost_body *body = NULL, *repbody;
1233         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
1234         char *key, *val = NULL;
1235         int keylen, vallen, rc = 0;
1236         ENTRY;
1237
1238         key = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, 1);
1239         if (key == NULL) {
1240                 DEBUG_REQ(D_HA, req, "no set_info key");
1241                 RETURN(-EFAULT);
1242         }
1243         keylen = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF);
1244
1245         if (KEY_IS(KEY_GRANT_SHRINK)) {
1246                 rc = lustre_pack_reply(req, 2, size, NULL);
1247                 if (rc)
1248                         RETURN(rc);
1249         } else {
1250                 rc = lustre_pack_reply(req, 1, NULL, NULL);
1251                 if (rc)
1252                         RETURN(rc);
1253         }
1254
1255         vallen = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1);
1256         if (vallen) {
1257                 if (KEY_IS(KEY_GRANT_SHRINK)) {
1258                         body = lustre_swab_reqbuf(req, REQ_REC_OFF + 1,
1259                                                   sizeof(*body),
1260                                                   lustre_swab_ost_body);
1261                         if (!body)
1262                                 RETURN(-EFAULT);
1263
1264                         repbody = lustre_msg_buf(req->rq_repmsg,
1265                                                  REPLY_REC_OFF,
1266                                                  sizeof(*repbody));
1267                         memcpy(repbody, body, sizeof(*body));
1268                         val = (char*)repbody;
1269                 } else
1270                         val = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1,0);
1271         }
1272
1273         if (KEY_IS(KEY_EVICT_BY_NID)) {
1274                 if (val && vallen)
1275                         obd_export_evict_by_nid(exp->exp_obd, val);
1276
1277                 GOTO(out, rc = 0);
1278         }
1279
1280         rc = obd_set_info_async(exp, keylen, key, vallen, val, NULL);
1281 out:
1282         lustre_msg_set_status(req->rq_repmsg, 0);
1283         RETURN(rc);
1284 }
1285
1286 static int ost_get_info(struct obd_export *exp, struct ptlrpc_request *req)
1287 {
1288         void *key, *reply;
1289         int keylen, rc = 0;
1290         int size[2] = { sizeof(struct ptlrpc_body), 0 };
1291         ENTRY;
1292
1293         key = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, 1);
1294         if (key == NULL) {
1295                 DEBUG_REQ(D_HA, req, "no get_info key");
1296                 RETURN(-EFAULT);
1297         }
1298         keylen = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF);
1299
1300         /* call once to get the size to allocate the reply buffer */
1301         rc = obd_get_info(exp, keylen, key, &size[1], NULL, NULL);
1302         if (rc)
1303                 RETURN(rc);
1304
1305         rc = lustre_pack_reply(req, 2, size, NULL);
1306         if (rc)
1307                 RETURN(rc);
1308
1309         reply = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*reply));
1310         /* call again to fill in the reply buffer */
1311         rc = obd_get_info(exp, keylen, key, size, reply, NULL);
1312         lustre_msg_set_status(req->rq_repmsg, 0);
1313
1314         RETURN(rc);
1315 }
1316
1317 #ifdef HAVE_QUOTA_SUPPORT
1318 static int ost_handle_quotactl(struct ptlrpc_request *req)
1319 {
1320         struct obd_quotactl *oqctl, *repoqc;
1321         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*repoqc) };
1322         int rc;
1323         ENTRY;
1324
1325         oqctl = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*oqctl),
1326                                    lustre_swab_obd_quotactl);
1327         if (oqctl == NULL)
1328                 GOTO(out, rc = -EPROTO);
1329
1330         rc = lustre_pack_reply(req, 2, size, NULL);
1331         if (rc)
1332                 GOTO(out, rc);
1333
1334         repoqc = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*repoqc));
1335
1336         req->rq_status = obd_quotactl(req->rq_export, oqctl);
1337         *repoqc = *oqctl;
1338 out:
1339         RETURN(rc);
1340 }
1341
1342 static int ost_handle_quotacheck(struct ptlrpc_request *req)
1343 {
1344         struct obd_quotactl *oqctl;
1345         int rc;
1346         ENTRY;
1347
1348         oqctl = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*oqctl),
1349                                    lustre_swab_obd_quotactl);
1350         if (oqctl == NULL)
1351                 RETURN(-EPROTO);
1352
1353         rc = lustre_pack_reply(req, 1, NULL, NULL);
1354         if (rc)
1355                 RETURN(rc);
1356
1357         req->rq_status = obd_quotacheck(req->rq_export, oqctl);
1358         RETURN(0);
1359 }
1360
1361 static int ost_handle_quota_adjust_qunit(struct ptlrpc_request *req)
1362 {
1363         struct quota_adjust_qunit *oqaq, *repoqa;
1364         struct lustre_quota_ctxt *qctxt;
1365         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*repoqa) };
1366         int rc;
1367         ENTRY;
1368
1369         qctxt = &req->rq_export->exp_obd->u.obt.obt_qctxt;
1370         oqaq = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*oqaq),
1371                                   lustre_swab_quota_adjust_qunit);
1372
1373         if (oqaq == NULL)
1374                 GOTO(out, rc = -EPROTO);
1375         rc = lustre_pack_reply(req, 2, size, NULL);
1376         if (rc)
1377                 GOTO(out, rc);
1378         repoqa = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*repoqa));
1379         req->rq_status = obd_quota_adjust_qunit(req->rq_export, oqaq, qctxt);
1380         *repoqa = *oqaq;
1381  out:
1382         RETURN(rc);
1383 }
1384 #endif
1385
1386 /* Ensure that data and metadata are synced to the disk when lock is cancelled
1387  * (if requested) */
1388 int ost_blocking_ast(struct ldlm_lock *lock,
1389                              struct ldlm_lock_desc *desc,
1390                              void *data, int flag)
1391 {
1392         __u32 sync_lock_cancel = 0;
1393         __u32 len = sizeof(sync_lock_cancel);
1394         int rc = 0;
1395         ENTRY;
1396
1397         rc = obd_get_info(lock->l_export, sizeof(KEY_SYNC_LOCK_CANCEL),
1398                           KEY_SYNC_LOCK_CANCEL, &len, &sync_lock_cancel, NULL);
1399
1400         if (!rc && flag == LDLM_CB_CANCELING &&
1401             (lock->l_granted_mode & (LCK_PW|LCK_GROUP)) &&
1402             (sync_lock_cancel == ALWAYS_SYNC_ON_CANCEL ||
1403              (sync_lock_cancel == BLOCKING_SYNC_ON_CANCEL &&
1404               lock->l_flags & LDLM_FL_CBPENDING))) {
1405                 struct obd_info *oinfo;
1406
1407                 OBD_ALLOC_PTR(oinfo);
1408                 if (!oinfo)
1409                         RETURN(-ENOMEM);
1410
1411                 /* force journal commit through fs sync */
1412                 rc = obd_sync(lock->l_export, oinfo, 0, 0, NULL);
1413                 if (rc)
1414                         CERROR("Error %d syncing data on lock cancel\n", rc);
1415
1416                 OBD_FREE_PTR(oinfo);
1417         }
1418
1419         return ldlm_server_blocking_ast(lock, desc, data, flag);
1420 }
1421
1422 static int ost_filter_recovery_request(struct ptlrpc_request *req,
1423                                        struct obd_device *obd, int *process)
1424 {
1425         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1426         case OST_CONNECT: /* This will never get here, but for completeness. */
1427         case OST_DISCONNECT:
1428                *process = 1;
1429                RETURN(0);
1430
1431         case OBD_PING:
1432         case OST_CREATE:
1433         case OST_DESTROY:
1434         case OST_PUNCH:
1435         case OST_SETATTR:
1436         case OST_SYNC:
1437         case OST_WRITE:
1438         case OBD_LOG_CANCEL:
1439         case LDLM_ENQUEUE:
1440                 *process = target_queue_recovery_request(req, obd);
1441                 RETURN(0);
1442
1443         default:
1444                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
1445                 *process = 0;
1446                 /* XXX what should we set rq_status to here? */
1447                 req->rq_status = -EAGAIN;
1448                 RETURN(ptlrpc_error(req));
1449         }
1450 }
1451
1452 int ost_msg_check_version(struct lustre_msg *msg)
1453 {
1454         int rc;
1455
1456         switch(lustre_msg_get_opc(msg)) {
1457         case OST_CONNECT:
1458         case OST_DISCONNECT:
1459         case OBD_PING:
1460                 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
1461                 if (rc)
1462                         CERROR("bad opc %u version %08x, expecting %08x\n",
1463                                lustre_msg_get_opc(msg),
1464                                lustre_msg_get_version(msg),
1465                                LUSTRE_OBD_VERSION);
1466                 break;
1467         case OST_CREATE:
1468         case OST_DESTROY:
1469         case OST_GETATTR:
1470         case OST_SETATTR:
1471         case OST_WRITE:
1472         case OST_READ:
1473         case OST_PUNCH:
1474         case OST_STATFS:
1475         case OST_SYNC:
1476         case OST_SET_INFO:
1477         case OST_GET_INFO:
1478 #ifdef HAVE_QUOTA_SUPPORT
1479         case OST_QUOTACHECK:
1480         case OST_QUOTACTL:
1481         case OST_QUOTA_ADJUST_QUNIT:
1482 #endif
1483                 rc = lustre_msg_check_version(msg, LUSTRE_OST_VERSION);
1484                 if (rc)
1485                         CERROR("bad opc %u version %08x, expecting %08x\n",
1486                                lustre_msg_get_opc(msg),
1487                                lustre_msg_get_version(msg),
1488                                LUSTRE_OST_VERSION);
1489                 break;
1490         case LDLM_ENQUEUE:
1491         case LDLM_CONVERT:
1492         case LDLM_CANCEL:
1493         case LDLM_BL_CALLBACK:
1494         case LDLM_CP_CALLBACK:
1495                 rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
1496                 if (rc)
1497                         CERROR("bad opc %u version %08x, expecting %08x\n",
1498                                lustre_msg_get_opc(msg),
1499                                lustre_msg_get_version(msg),
1500                                LUSTRE_DLM_VERSION);
1501                 break;
1502         case LLOG_ORIGIN_CONNECT:
1503         case OBD_LOG_CANCEL:
1504                 rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
1505                 if (rc)
1506                         CERROR("bad opc %u version %08x, expecting %08x\n",
1507                                lustre_msg_get_opc(msg),
1508                                lustre_msg_get_version(msg),
1509                                LUSTRE_LOG_VERSION);
1510                 break;
1511         default:
1512                 CERROR("Unexpected opcode %d\n", lustre_msg_get_opc(msg));
1513                 rc = -ENOTSUPP;
1514         }
1515         return rc;
1516 }
1517
1518 static int ost_rw_hpreq_lock_match(struct ptlrpc_request *req,
1519                                    struct ldlm_lock *lock)
1520 {
1521         struct niobuf_remote *nb;
1522         struct obd_ioobj *ioo;
1523         struct ost_body *body;
1524         int objcount, niocount;
1525         int mode, opc, i;
1526         __u64 start, end;
1527         ENTRY;
1528
1529         opc = lustre_msg_get_opc(req->rq_reqmsg);
1530         LASSERT(opc == OST_READ || opc == OST_WRITE);
1531
1532         /* As the request may be covered by several locks, do not look at
1533          * o_handle, look at the RPC IO region. */
1534         body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
1535                                   lustre_swab_obdo);
1536         objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) /
1537                    sizeof(*ioo);
1538         ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1,
1539                              objcount * sizeof(*ioo));
1540         LASSERT(ioo != NULL);
1541         for (niocount = i = 0; i < objcount; i++)
1542                 niocount += ioo[i].ioo_bufcnt;
1543
1544         nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1545                             niocount * sizeof(*nb));
1546         LASSERT(nb != NULL);
1547
1548         mode = LCK_PW;
1549         if (opc == OST_READ)
1550                 mode |= LCK_PR;
1551
1552         start = nb[0].offset & CFS_PAGE_MASK;
1553         end = (nb[ioo->ioo_bufcnt - 1].offset +
1554                nb[ioo->ioo_bufcnt - 1].len - 1) | ~CFS_PAGE_MASK;
1555
1556         LASSERT(lock->l_resource != NULL);
1557         if (lock->l_resource->lr_name.name[0] != ioo->ioo_id)
1558                 RETURN(0);
1559
1560         if (!(lock->l_granted_mode & mode))
1561                 RETURN(0);
1562
1563         if (lock->l_policy_data.l_extent.end < start ||
1564             lock->l_policy_data.l_extent.start > end)
1565                 RETURN(0);
1566
1567         RETURN(1);
1568 }
1569
1570 /**
1571  * Swab buffers needed to call ost_rw_prolong_locks() and call it.
1572  * Return the value from ost_rw_prolong_locks() which is non-zero if
1573  * there is a cancelled lock which is waiting for this IO request.
1574  */
1575 static int ost_rw_hpreq_check(struct ptlrpc_request *req)
1576 {
1577         struct niobuf_remote *nb;
1578         struct obd_ioobj *ioo;
1579         struct ost_body *body;
1580         int objcount, niocount;
1581         int mode, opc, i;
1582         ENTRY;
1583
1584         opc = lustre_msg_get_opc(req->rq_reqmsg);
1585         LASSERT(opc == OST_READ || opc == OST_WRITE);
1586
1587         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
1588         LASSERT(body != NULL);
1589
1590         objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) /
1591                    sizeof(*ioo);
1592         ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1,
1593                              objcount * sizeof(*ioo));
1594         LASSERT(ioo != NULL);
1595
1596         for (niocount = i = 0; i < objcount; i++)
1597                 niocount += ioo[i].ioo_bufcnt;
1598         nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1599                             niocount * sizeof(*nb));
1600         LASSERT(nb != NULL);
1601         LASSERT(niocount == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK));
1602
1603         mode = LCK_PW;
1604         if (opc == OST_READ)
1605                 mode |= LCK_PR;
1606         RETURN(ost_rw_prolong_locks(req, ioo, nb, &body->oa, mode));
1607 }
1608
1609 static int ost_punch_prolong_locks(struct ptlrpc_request *req, struct obdo *oa)
1610 {
1611         struct ldlm_res_id res_id = { .name = { oa->o_id } };
1612         struct ost_prolong_data opd = { 0 };
1613         __u64 start, end;
1614         ENTRY;
1615
1616         start = oa->o_size;
1617         end = start + oa->o_blocks;
1618
1619         opd.opd_mode = LCK_PW;
1620         opd.opd_exp = req->rq_export;
1621         opd.opd_policy.l_extent.start = start & CFS_PAGE_MASK;
1622         if (oa->o_blocks == OBD_OBJECT_EOF || end < start)
1623                 opd.opd_policy.l_extent.end = OBD_OBJECT_EOF;
1624         else
1625                 opd.opd_policy.l_extent.end = end | ~CFS_PAGE_MASK;
1626
1627         /* prolong locks for the current service time of the corresponding
1628          * portal (= OST_IO_PORTAL) */
1629         opd.opd_timeout = AT_OFF ? obd_timeout / 2 :
1630                           max(at_est2timeout(at_get(&req->rq_rqbd->
1631                               rqbd_service->srv_at_estimate)), ldlm_timeout);
1632
1633         CDEBUG(D_DLMTRACE,"refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
1634                res_id.name[0], res_id.name[1], opd.opd_policy.l_extent.start,
1635                opd.opd_policy.l_extent.end);
1636
1637         opd.opd_oa = oa;
1638
1639         ldlm_resource_iterate(req->rq_export->exp_obd->obd_namespace, &res_id,
1640                               ost_prolong_locks_iter, &opd);
1641         RETURN(opd.opd_lock_match);
1642 }
1643
1644 static int ost_punch_hpreq_lock_match(struct ptlrpc_request *req,
1645                                       struct ldlm_lock *lock)
1646 {
1647         struct ost_body *body;
1648         ENTRY;
1649
1650         body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
1651                                   lustre_swab_obdo);
1652         LASSERT(body != NULL);
1653
1654         if (body->oa.o_valid & OBD_MD_FLHANDLE &&
1655             body->oa.o_handle.cookie == lock->l_handle.h_cookie)
1656                 RETURN(1);
1657         RETURN(0);
1658 }
1659
1660 static int ost_punch_hpreq_check(struct ptlrpc_request *req)
1661 {
1662         struct ost_body *body = lustre_msg_buf(req->rq_reqmsg,
1663                                                REQ_REC_OFF, sizeof(*body));
1664         LASSERT(body != NULL);
1665         LASSERT(!(body->oa.o_valid & OBD_MD_FLFLAGS) ||
1666                 !(body->oa.o_flags & OBD_FL_TRUNCLOCK));
1667
1668         RETURN(ost_punch_prolong_locks(req, &body->oa));
1669 }
1670
1671 struct ptlrpc_hpreq_ops ost_hpreq_rw = {
1672         .hpreq_lock_match  = ost_rw_hpreq_lock_match,
1673         .hpreq_check       = ost_rw_hpreq_check,
1674 };
1675
1676 struct ptlrpc_hpreq_ops ost_hpreq_punch = {
1677         .hpreq_lock_match  = ost_punch_hpreq_lock_match,
1678         .hpreq_check       = ost_punch_hpreq_check,
1679 };
1680
1681 /** Assign high priority operations to the request if needed. */
1682 static int ost_hpreq_handler(struct ptlrpc_request *req)
1683 {
1684         ENTRY;
1685         if (req->rq_export) {
1686                 int opc = lustre_msg_get_opc(req->rq_reqmsg);
1687                 struct ost_body *body;
1688
1689                 if (opc == OST_READ || opc == OST_WRITE) {
1690                         struct niobuf_remote *nb;
1691                         struct obd_ioobj *ioo;
1692                         int objcount, niocount;
1693                         int swab, i;
1694
1695                         body = lustre_swab_reqbuf(req, REQ_REC_OFF,
1696                                                   sizeof(*body),
1697                                                   lustre_swab_obdo);
1698                         if (!body) {
1699                                 CERROR("Missing/short ost_body\n");
1700                                 RETURN(-EFAULT);
1701                         }
1702                         objcount = lustre_msg_buflen(req->rq_reqmsg,
1703                                                      REQ_REC_OFF + 1) /
1704                                 sizeof(*ioo);
1705                         if (objcount == 0) {
1706                                 CERROR("Missing/short ioobj\n");
1707                                 RETURN(-EFAULT);
1708                         }
1709                         if (objcount > 1) {
1710                                 CERROR("too many ioobjs (%d)\n", objcount);
1711                                 RETURN(-EFAULT);
1712                         }
1713
1714                         swab = !lustre_req_swabbed(req, REQ_REC_OFF + 1) &&
1715                                 lustre_req_need_swab(req);
1716                         ioo = lustre_swab_reqbuf(req, REQ_REC_OFF + 1,
1717                                                  objcount * sizeof(*ioo),
1718                                                  lustre_swab_obd_ioobj);
1719                         if (!ioo) {
1720                                 CERROR("Missing/short ioobj\n");
1721                                 RETURN(-EFAULT);
1722                         }
1723                         for (niocount = i = 0; i < objcount; i++) {
1724                                 if (i > 0 && swab)
1725                                         lustre_swab_obd_ioobj(&ioo[i]);
1726                                 if (ioo[i].ioo_bufcnt == 0) {
1727                                         CERROR("ioo[%d] has zero bufcnt\n", i);
1728                                         RETURN(-EFAULT);
1729                                 }
1730                                 niocount += ioo[i].ioo_bufcnt;
1731                         }
1732                         if (niocount > PTLRPC_MAX_BRW_PAGES) {
1733                                 DEBUG_REQ(D_ERROR, req, "bulk has too many "
1734                                           "pages (%d)", niocount);
1735                                 RETURN(-EFAULT);
1736                         }
1737
1738                         swab = !lustre_req_swabbed(req, REQ_REC_OFF + 2) &&
1739                                 lustre_req_need_swab(req);
1740                         nb = lustre_swab_reqbuf(req, REQ_REC_OFF + 2,
1741                                                 niocount * sizeof(*nb),
1742                                                 lustre_swab_niobuf_remote);
1743                         if (!nb) {
1744                                 CERROR("Missing/short niobuf\n");
1745                                 RETURN(-EFAULT);
1746                         }
1747
1748                         if (swab) {
1749                                 /* swab remaining niobufs */
1750                                 for (i = 1; i < niocount; i++)
1751                                         lustre_swab_niobuf_remote(&nb[i]);
1752                         }
1753
1754                         if (niocount == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
1755                                 req->rq_ops = &ost_hpreq_rw;
1756                 } else if (opc == OST_PUNCH) {
1757                         body = lustre_swab_reqbuf(req, REQ_REC_OFF,
1758                                                   sizeof(*body),
1759                                                   lustre_swab_obdo);
1760                         if (!body) {
1761                                 CERROR("Missing/short ost_body\n");
1762                                 RETURN(-EFAULT);
1763                         }
1764
1765                         if (!(body->oa.o_valid & OBD_MD_FLFLAGS) ||
1766                             !(body->oa.o_flags & OBD_FL_TRUNCLOCK))
1767                                 req->rq_ops = &ost_hpreq_punch;
1768                 }
1769         }
1770         RETURN(0);
1771 }
1772
1773 static int ost_handle(struct ptlrpc_request *req)
1774 {
1775         struct obd_trans_info trans_info = { 0, };
1776         struct obd_trans_info *oti = &trans_info;
1777         int should_process, fail = OBD_FAIL_OST_ALL_REPLY_NET, rc = 0;
1778         struct obd_device *obd = NULL;
1779         struct llogd_conn_body *body;
1780         ENTRY;
1781
1782         LASSERT(current->journal_info == NULL);
1783         if (lustre_msg_get_opc(req->rq_reqmsg) != OST_CONNECT) {
1784                 int recovering;
1785
1786                 if (!class_connected_export(req->rq_export)) {
1787                         CDEBUG(D_HA,"operation %d on unconnected OST from %s\n",
1788                                lustre_msg_get_opc(req->rq_reqmsg),
1789                                libcfs_id2str(req->rq_peer));
1790                         req->rq_status = -ENOTCONN;
1791                         GOTO(out, rc = -ENOTCONN);
1792                 }
1793
1794                 obd = req->rq_export->exp_obd;
1795
1796                 /* Check for aborted recovery. */
1797                 spin_lock_bh(&obd->obd_processing_task_lock);
1798                 recovering = obd->obd_recovering;
1799                 spin_unlock_bh(&obd->obd_processing_task_lock);
1800                 if (recovering &&
1801                     target_recovery_check_and_stop(obd) == 0) {
1802                         rc = ost_filter_recovery_request(req, obd,
1803                                                          &should_process);
1804                         if (rc || !should_process)
1805                                 RETURN(rc);
1806                 }
1807         }
1808
1809         oti_init(oti, req);
1810         rc = ost_msg_check_version(req->rq_reqmsg);
1811         if (rc)
1812                 RETURN(rc);
1813
1814         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1815         case OST_CONNECT: {
1816                 CDEBUG(D_INODE, "connect\n");
1817                 OBD_FAIL_RETURN(OBD_FAIL_OST_CONNECT_NET, 0);
1818                 rc = target_handle_connect(req, ost_handle);
1819                 OBD_FAIL_RETURN(OBD_FAIL_OST_CONNECT_NET2, 0);
1820                 if (!rc)
1821                         obd = req->rq_export->exp_obd;
1822                 break;
1823         }
1824         case OST_DISCONNECT:
1825                 CDEBUG(D_INODE, "disconnect\n");
1826                 OBD_FAIL_RETURN(OBD_FAIL_OST_DISCONNECT_NET, 0);
1827                 rc = target_handle_disconnect(req);
1828                 break;
1829         case OST_CREATE:
1830                 CDEBUG(D_INODE, "create\n");
1831                 OBD_FAIL_RETURN(OBD_FAIL_OST_CREATE_NET, 0);
1832                 OBD_FAIL_TIMEOUT_MS(OBD_FAIL_OST_PAUSE_CREATE, obd_fail_val);
1833                 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_EROFS))
1834                         GOTO(out, rc = -EROFS);
1835                 rc = ost_create(req->rq_export, req, oti);
1836                 break;
1837         case OST_DESTROY:
1838                 CDEBUG(D_INODE, "destroy\n");
1839                 OBD_FAIL_RETURN(OBD_FAIL_OST_DESTROY_NET, 0);
1840                 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_EROFS))
1841                         GOTO(out, rc = -EROFS);
1842                 rc = ost_destroy(req->rq_export, req, oti);
1843                 break;
1844         case OST_GETATTR:
1845                 CDEBUG(D_INODE, "getattr\n");
1846                 OBD_FAIL_RETURN(OBD_FAIL_OST_GETATTR_NET, 0);
1847                 rc = ost_getattr(req->rq_export, req);
1848                 break;
1849         case OST_SETATTR:
1850                 CDEBUG(D_INODE, "setattr\n");
1851                 OBD_FAIL_RETURN(OBD_FAIL_OST_SETATTR_NET, 0);
1852                 rc = ost_setattr(req->rq_export, req, oti);
1853                 break;
1854         case OST_WRITE:
1855                 CDEBUG(D_INODE, "write\n");
1856                 /* req->rq_request_portal would be nice, if it was set */
1857                 if (req->rq_rqbd->rqbd_service->srv_req_portal !=OST_IO_PORTAL){
1858                         CERROR("%s: deny write request from %s to portal %u\n",
1859                                req->rq_export->exp_obd->obd_name,
1860                                obd_export_nid2str(req->rq_export),
1861                                req->rq_rqbd->rqbd_service->srv_req_portal);
1862                         GOTO(out, rc = -EPROTO);
1863                 }
1864                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
1865                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOSPC))
1866                         GOTO(out, rc = -ENOSPC);
1867                 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_EROFS))
1868                         GOTO(out, rc = -EROFS);
1869                 rc = ost_brw_write(req, oti);
1870                 LASSERT(current->journal_info == NULL);
1871                 /* ost_brw_write sends its own replies */
1872                 RETURN(rc);
1873         case OST_READ:
1874                 CDEBUG(D_INODE, "read\n");
1875                 /* req->rq_request_portal would be nice, if it was set */
1876                 if (req->rq_rqbd->rqbd_service->srv_req_portal !=OST_IO_PORTAL){
1877                         CERROR("%s: deny read request from %s to portal %u\n",
1878                                req->rq_export->exp_obd->obd_name,
1879                                obd_export_nid2str(req->rq_export),
1880                                req->rq_rqbd->rqbd_service->srv_req_portal);
1881                         GOTO(out, rc = -EPROTO);
1882                 }
1883                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
1884                 rc = ost_brw_read(req, oti);
1885                 LASSERT(current->journal_info == NULL);
1886                 /* ost_brw_read sends its own replies */
1887                 RETURN(rc);
1888         case OST_PUNCH:
1889                 CDEBUG(D_INODE, "punch\n");
1890                 OBD_FAIL_RETURN(OBD_FAIL_OST_PUNCH_NET, 0);
1891                 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_EROFS))
1892                         GOTO(out, rc = -EROFS);
1893                 rc = ost_punch(req->rq_export, req, oti);
1894                 break;
1895         case OST_STATFS:
1896                 CDEBUG(D_INODE, "statfs\n");
1897                 OBD_FAIL_RETURN(OBD_FAIL_OST_STATFS_NET, 0);
1898                 rc = ost_statfs(req);
1899                 break;
1900         case OST_SYNC:
1901                 CDEBUG(D_INODE, "sync\n");
1902                 OBD_FAIL_RETURN(OBD_FAIL_OST_SYNC_NET, 0);
1903                 rc = ost_sync(req->rq_export, req);
1904                 break;
1905         case OST_SET_INFO:
1906                 DEBUG_REQ(D_INODE, req, "set_info");
1907                 rc = ost_set_info(req->rq_export, req);
1908                 break;
1909         case OST_GET_INFO:
1910                 DEBUG_REQ(D_INODE, req, "get_info");
1911                 rc = ost_get_info(req->rq_export, req);
1912                 break;
1913 #ifdef HAVE_QUOTA_SUPPORT
1914         case OST_QUOTACHECK:
1915                 CDEBUG(D_INODE, "quotacheck\n");
1916                 OBD_FAIL_RETURN(OBD_FAIL_OST_QUOTACHECK_NET, 0);
1917                 rc = ost_handle_quotacheck(req);
1918                 break;
1919         case OST_QUOTACTL:
1920                 CDEBUG(D_INODE, "quotactl\n");
1921                 OBD_FAIL_RETURN(OBD_FAIL_OST_QUOTACTL_NET, 0);
1922                 rc = ost_handle_quotactl(req);
1923                 break;
1924         case OST_QUOTA_ADJUST_QUNIT:
1925                 CDEBUG(D_INODE, "quota_adjust_qunit\n");
1926                 rc = ost_handle_quota_adjust_qunit(req);
1927                 break;
1928 #endif
1929         case OBD_PING:
1930                 DEBUG_REQ(D_INODE, req, "ping");
1931                 rc = target_handle_ping(req);
1932                 break;
1933         /* FIXME - just reply status */
1934         case LLOG_ORIGIN_CONNECT:
1935                 DEBUG_REQ(D_INODE, req, "log connect");
1936                 body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF,
1937                                       sizeof(*body));
1938                 rc = obd_llog_connect(req->rq_export, body);
1939                 req->rq_status = rc;
1940                 rc = lustre_pack_reply(req, 1, NULL, NULL);
1941                 if (rc)
1942                         RETURN(rc);
1943                 RETURN(ptlrpc_reply(req));
1944         case OBD_LOG_CANCEL:
1945                 CDEBUG(D_INODE, "log cancel\n");
1946                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
1947                 rc = llog_origin_handle_cancel(req);
1948                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_REP, 0);
1949                 req->rq_status = rc;
1950                 rc = lustre_pack_reply(req, 1, NULL, NULL);
1951                 if (rc)
1952                         RETURN(rc);
1953                 RETURN(ptlrpc_reply(req));
1954         case LDLM_ENQUEUE:
1955                 CDEBUG(D_INODE, "enqueue\n");
1956                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
1957                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
1958                                          ost_blocking_ast,
1959                                          ldlm_server_glimpse_ast);
1960                 fail = OBD_FAIL_OST_LDLM_REPLY_NET;
1961                 break;
1962         case LDLM_CONVERT:
1963                 CDEBUG(D_INODE, "convert\n");
1964                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
1965                 rc = ldlm_handle_convert(req);
1966                 break;
1967         case LDLM_CANCEL:
1968                 CDEBUG(D_INODE, "cancel\n");
1969                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
1970                 rc = ldlm_handle_cancel(req);
1971                 break;
1972         case LDLM_BL_CALLBACK:
1973         case LDLM_CP_CALLBACK:
1974                 CDEBUG(D_INODE, "callback\n");
1975                 CERROR("callbacks should not happen on OST\n");
1976                 /* fall through */
1977         default:
1978                 CERROR("Unexpected opcode %d\n",
1979                        lustre_msg_get_opc(req->rq_reqmsg));
1980                 req->rq_status = -ENOTSUPP;
1981                 rc = ptlrpc_error(req);
1982                 RETURN(rc);
1983         }
1984
1985         LASSERT(current->journal_info == NULL);
1986
1987         EXIT;
1988         /* If we're DISCONNECTing, the export_data is already freed */
1989         if (!rc && lustre_msg_get_opc(req->rq_reqmsg) != OST_DISCONNECT)
1990                 target_committed_to_req(req);
1991
1992 out:
1993         if (!rc)
1994                 oti_to_request(oti, req);
1995         return target_handle_reply(req, rc, fail);
1996 }
1997
1998 /*
1999  * free per-thread pool created by ost_thread_init().
2000  */
2001 static void ost_thread_done(struct ptlrpc_thread *thread)
2002 {
2003         struct ost_thread_local_cache *tls; /* TLS stands for Thread-Local
2004                                              * Storage */
2005
2006         ENTRY;
2007
2008         LASSERT(thread != NULL);
2009
2010         /*
2011          * be prepared to handle partially-initialized pools (because this is
2012          * called from ost_thread_init() for cleanup.
2013          */
2014         tls = thread->t_data;
2015         if (tls != NULL) {
2016                 OBD_FREE_PTR(tls);
2017                 thread->t_data = NULL;
2018         }
2019         EXIT;
2020 }
2021
2022 /*
2023  * initialize per-thread page pool (bug 5137).
2024  */
2025 static int ost_thread_init(struct ptlrpc_thread *thread)
2026 {
2027         struct ost_thread_local_cache *tls;
2028
2029         ENTRY;
2030
2031         LASSERT(thread != NULL);
2032         LASSERT(thread->t_data == NULL);
2033         LASSERTF(thread->t_id <= OSS_THREADS_MAX, "%u\n", thread->t_id);
2034
2035         OBD_ALLOC_PTR(tls);
2036         if (tls == NULL)
2037                 RETURN(-ENOMEM);
2038         thread->t_data = tls;
2039         RETURN(0);
2040 }
2041
2042 /* Sigh - really, this is an OSS, the _server_, not the _target_ */
2043 static int ost_setup(struct obd_device *obd, obd_count len, void *buf)
2044 {
2045         struct ost_obd *ost = &obd->u.ost;
2046         struct lprocfs_static_vars lvars;
2047         int oss_min_threads;
2048         int oss_max_threads;
2049         int oss_min_create_threads;
2050         int oss_max_create_threads;
2051         int rc;
2052         ENTRY;
2053
2054         rc = cleanup_group_info();
2055         if (rc)
2056                 RETURN(rc);
2057         lprocfs_ost_init_vars(&lvars);
2058         lprocfs_obd_setup(obd, lvars.obd_vars);
2059
2060         sema_init(&ost->ost_health_sem, 1);
2061
2062         if (oss_num_threads) {
2063                 /* If oss_num_threads is set, it is the min and the max. */
2064                 if (oss_num_threads > OSS_THREADS_MAX)
2065                         oss_num_threads = OSS_THREADS_MAX;
2066                 if (oss_num_threads < OSS_THREADS_MIN)
2067                         oss_num_threads = OSS_THREADS_MIN;
2068                 oss_max_threads = oss_min_threads = oss_num_threads;
2069         } else {
2070                 /* Base min threads on memory and cpus */
2071                 oss_min_threads = num_possible_cpus() * CFS_NUM_CACHEPAGES >>
2072                         (27 - CFS_PAGE_SHIFT);
2073                 if (oss_min_threads < OSS_THREADS_MIN)
2074                         oss_min_threads = OSS_THREADS_MIN;
2075                 /* Insure a 4x range for dynamic threads */
2076                 if (oss_min_threads > OSS_THREADS_MAX / 4)
2077                         oss_min_threads = OSS_THREADS_MAX / 4;
2078                 oss_max_threads = min(OSS_THREADS_MAX, oss_min_threads * 4 + 1);
2079         }
2080
2081         ost->ost_service =
2082                 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
2083                                 OST_MAXREPSIZE, OST_REQUEST_PORTAL,
2084                                 OSC_REPLY_PORTAL, OSS_SERVICE_WATCHDOG_FACTOR,
2085                                 ost_handle, LUSTRE_OSS_NAME,
2086                                 obd->obd_proc_entry, target_print_req,
2087                                 oss_min_threads, oss_max_threads, "ll_ost",
2088                                 NULL);
2089         if (ost->ost_service == NULL) {
2090                 CERROR("failed to start OST service\n");
2091                 GOTO(out_lprocfs, rc = -ENOMEM);
2092         }
2093
2094         rc = ptlrpc_start_threads(obd, ost->ost_service);
2095         if (rc)
2096                 GOTO(out_service, rc = -EINVAL);
2097
2098         if (oss_num_create_threads) {
2099                 if (oss_num_create_threads > OSS_MAX_CREATE_THREADS)
2100                         oss_num_create_threads = OSS_MAX_CREATE_THREADS;
2101                 if (oss_num_create_threads < OSS_DEF_CREATE_THREADS)
2102                         oss_num_create_threads = OSS_DEF_CREATE_THREADS;
2103                 oss_min_create_threads = oss_max_create_threads =
2104                         oss_num_create_threads;
2105         } else {
2106                 oss_min_create_threads = OSS_DEF_CREATE_THREADS;
2107                 oss_max_create_threads = OSS_MAX_CREATE_THREADS;
2108         }
2109
2110         ost->ost_create_service =
2111                 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
2112                                 OST_MAXREPSIZE, OST_CREATE_PORTAL,
2113                                 OSC_REPLY_PORTAL, OSS_SERVICE_WATCHDOG_FACTOR,
2114                                 ost_handle, "ost_create",
2115                                 obd->obd_proc_entry, target_print_req,
2116                                 oss_min_create_threads,
2117                                 oss_max_create_threads,
2118                                 "ll_ost_creat", NULL);
2119         if (ost->ost_create_service == NULL) {
2120                 CERROR("failed to start OST create service\n");
2121                 GOTO(out_service, rc = -ENOMEM);
2122         }
2123
2124         rc = ptlrpc_start_threads(obd, ost->ost_create_service);
2125         if (rc)
2126                 GOTO(out_create, rc = -EINVAL);
2127
2128         ost->ost_io_service =
2129                 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
2130                                 OST_MAXREPSIZE, OST_IO_PORTAL,
2131                                 OSC_REPLY_PORTAL, OSS_SERVICE_WATCHDOG_FACTOR,
2132                                 ost_handle, "ost_io",
2133                                 obd->obd_proc_entry, target_print_req,
2134                                 oss_min_threads, oss_max_threads, "ll_ost_io",
2135                                 ost_hpreq_handler);
2136         if (ost->ost_io_service == NULL) {
2137                 CERROR("failed to start OST I/O service\n");
2138                 GOTO(out_create, rc = -ENOMEM);
2139         }
2140
2141         ost->ost_io_service->srv_init = ost_thread_init;
2142         ost->ost_io_service->srv_done = ost_thread_done;
2143         ost->ost_io_service->srv_cpu_affinity = 1;
2144         rc = ptlrpc_start_threads(obd, ost->ost_io_service);
2145         if (rc)
2146                 GOTO(out_io, rc = -EINVAL);
2147
2148         ping_evictor_start();
2149
2150         RETURN(0);
2151
2152 out_io:
2153         ptlrpc_unregister_service(ost->ost_io_service);
2154         ost->ost_io_service = NULL;
2155 out_create:
2156         ptlrpc_unregister_service(ost->ost_create_service);
2157         ost->ost_create_service = NULL;
2158 out_service:
2159         ptlrpc_unregister_service(ost->ost_service);
2160         ost->ost_service = NULL;
2161 out_lprocfs:
2162         lprocfs_obd_cleanup(obd);
2163         RETURN(rc);
2164 }
2165
2166 static int ost_cleanup(struct obd_device *obd)
2167 {
2168         struct ost_obd *ost = &obd->u.ost;
2169         int err = 0;
2170         ENTRY;
2171
2172         ping_evictor_stop();
2173
2174         spin_lock_bh(&obd->obd_processing_task_lock);
2175         if (obd->obd_recovering) {
2176                 target_cancel_recovery_timer(obd);
2177                 obd->obd_recovering = 0;
2178         }
2179         spin_unlock_bh(&obd->obd_processing_task_lock);
2180
2181         down(&ost->ost_health_sem);
2182         ptlrpc_unregister_service(ost->ost_service);
2183         ptlrpc_unregister_service(ost->ost_create_service);
2184         ptlrpc_unregister_service(ost->ost_io_service);
2185         ost->ost_service = NULL;
2186         ost->ost_create_service = NULL;
2187         up(&ost->ost_health_sem);
2188
2189         lprocfs_obd_cleanup(obd);
2190
2191         RETURN(err);
2192 }
2193
2194 static int ost_health_check(struct obd_device *obd)
2195 {
2196         struct ost_obd *ost = &obd->u.ost;
2197         int rc = 0;
2198
2199         down(&ost->ost_health_sem);
2200         rc |= ptlrpc_service_health_check(ost->ost_service);
2201         rc |= ptlrpc_service_health_check(ost->ost_create_service);
2202         rc |= ptlrpc_service_health_check(ost->ost_io_service);
2203         up(&ost->ost_health_sem);
2204
2205         /*
2206          * health_check to return 0 on healthy
2207          * and 1 on unhealthy.
2208          */
2209         if( rc != 0)
2210                 rc = 1;
2211
2212         return rc;
2213 }
2214
2215 /* use obd ops to offer management infrastructure */
2216 static struct obd_ops ost_obd_ops = {
2217         .o_owner        = THIS_MODULE,
2218         .o_setup        = ost_setup,
2219         .o_cleanup      = ost_cleanup,
2220         .o_health_check = ost_health_check,
2221 };
2222
2223
2224 static int __init ost_init(void)
2225 {
2226         struct lprocfs_static_vars lvars;
2227         int rc;
2228         ENTRY;
2229
2230         lprocfs_ost_init_vars(&lvars);
2231         rc = class_register_type(&ost_obd_ops, lvars.module_vars,
2232                                  LUSTRE_OSS_NAME);
2233
2234         if (ost_num_threads != 0 && oss_num_threads == 0) {
2235                 LCONSOLE_INFO("ost_num_threads module parameter is deprecated, "
2236                               "use oss_num_threads instead or unset both for "
2237                               "dynamic thread startup\n");
2238                 oss_num_threads = ost_num_threads;
2239         }
2240
2241         RETURN(rc);
2242 }
2243
2244 static void /*__exit*/ ost_exit(void)
2245 {
2246         class_unregister_type(LUSTRE_OSS_NAME);
2247 }
2248
2249 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2250 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
2251 MODULE_LICENSE("GPL");
2252
2253 module_init(ost_init);
2254 module_exit(ost_exit);