Whamcloud - gitweb
- make HEAD from b_post_cmd3
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author: Peter J. Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *
8  *   This file is part of the Lustre file system, http://www.lustre.org
9  *   Lustre is a trademark of Cluster File Systems, Inc.
10  *
11  *   You may have signed or agreed to another license before downloading
12  *   this software.  If so, you are bound by the terms and conditions
13  *   of that agreement, and the following does not apply to you.  See the
14  *   LICENSE file included with this distribution for more information.
15  *
16  *   If you did not agree to a different license, then this copy of Lustre
17  *   is open source software; you can redistribute it and/or modify it
18  *   under the terms of version 2 of the GNU General Public License as
19  *   published by the Free Software Foundation.
20  *
21  *   In either case, Lustre is distributed in the hope that it will be
22  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
23  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
24  *   license text for more details.
25  *
26  *  Storage Target Handling functions
27  *  Lustre Object Server Module (OST)
28  *
29  *  This server is single threaded at present (but can easily be multi
30  *  threaded). For testing and management it is treated as an
31  *  obd_device, although it does not export a full OBD method table
32  *  (the requests are coming in over the wire, so object target
33  *  modules do not have a full method table.)
34  */
35
36 #ifndef EXPORT_SYMTAB
37 # define EXPORT_SYMTAB
38 #endif
39 #define DEBUG_SUBSYSTEM S_OST
40
41 #include <linux/module.h>
42 #include <obd_ost.h>
43 #include <lustre_net.h>
44 #include <lustre_dlm.h>
45 #include <lustre_export.h>
46 #include <lustre_debug.h>
47 #include <linux/init.h>
48 #include <lprocfs_status.h>
49 #include <lustre_commit_confd.h>
50 #include <libcfs/list.h>
51 #include <lustre_quota.h>
52 #include "ost_internal.h"
53
/*
 * Module parameters controlling how many service threads are started.
 * Both are plain ints registered through CFS_MODULE_PARM() with mode 0444.
 * ost_num_threads is the deprecated spelling of oss_num_threads (see its
 * description string).
 */
54 static int oss_num_threads;
55 CFS_MODULE_PARM(oss_num_threads, "i", int, 0444,
56                 "number of OSS service threads to start");
57
58 static int ost_num_threads;
59 CFS_MODULE_PARM(ost_num_threads, "i", int, 0444,
60                 "number of OST service threads to start (deprecated)");
61
62 void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
63 {
64         struct oti_req_ack_lock *ack_lock;
65         int i;
66
67         if (oti == NULL)
68                 return;
69
70         if (req->rq_repmsg)
71                 lustre_msg_set_transno(req->rq_repmsg, oti->oti_transno);
72         req->rq_transno = oti->oti_transno;
73
74         /* XXX 4 == entries in oti_ack_locks??? */
75         for (ack_lock = oti->oti_ack_locks, i = 0; i < 4; i++, ack_lock++) {
76                 if (!ack_lock->mode)
77                         break;
78                 /* XXX not even calling target_send_reply in some cases... */
79                 ptlrpc_save_lock (req, &ack_lock->lock, ack_lock->mode);
80         }
81 }
82
83 static int ost_destroy(struct obd_export *exp, struct ptlrpc_request *req,
84                        struct obd_trans_info *oti)
85 {
86         struct ost_body *body, *repbody;
87         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
88         ENTRY;
89
90         body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
91                                   lustre_swab_ost_body);
92         if (body == NULL)
93                 RETURN(-EFAULT);
94
95         rc = lustre_pack_reply(req, 2, size, NULL);
96         if (rc)
97                 RETURN(rc);
98
99         if (body->oa.o_valid & OBD_MD_FLCOOKIE)
100                 oti->oti_logcookies = obdo_logcookie(&body->oa);
101         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
102                                  sizeof(*repbody));
103         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
104         req->rq_status = obd_destroy(exp, &body->oa, NULL, oti, NULL);
105         RETURN(0);
106 }
107
108 static int ost_getattr(struct obd_export *exp, struct ptlrpc_request *req)
109 {
110         struct ost_body *body, *repbody;
111         struct obd_info oinfo = { { { 0 } } };
112         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
113         ENTRY;
114
115         body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
116                                   lustre_swab_ost_body);
117         if (body == NULL)
118                 RETURN(-EFAULT);
119
120         rc = lustre_pack_reply(req, 2, size, NULL);
121         if (rc)
122                 RETURN(rc);
123
124         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
125                                  sizeof(*repbody));
126         repbody->oa = body->oa;
127
128         oinfo.oi_oa = &repbody->oa;
129         if (oinfo.oi_oa->o_valid & OBD_MD_FLOSSCAPA)
130                 oinfo.oi_capa = lustre_unpack_capa(req->rq_reqmsg,
131                                                    REQ_REC_OFF + 1);
132         req->rq_status = obd_getattr(exp, &oinfo);
133         RETURN(0);
134 }
135
136 static int ost_statfs(struct ptlrpc_request *req)
137 {
138         struct obd_statfs *osfs;
139         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
140         ENTRY;
141
142         rc = lustre_pack_reply(req, 2, size, NULL);
143         if (rc)
144                 RETURN(rc);
145
146         osfs = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*osfs));
147
148         req->rq_status = obd_statfs(req->rq_export->exp_obd, osfs,
149                                     cfs_time_current_64() - HZ);
150         if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_ENOSPC))
151                 osfs->os_bfree = osfs->os_bavail = 64;
152         if (req->rq_status != 0)
153                 CERROR("ost: statfs failed: rc %d\n", req->rq_status);
154
155         RETURN(0);
156 }
157
158 static int ost_create(struct obd_export *exp, struct ptlrpc_request *req,
159                       struct obd_trans_info *oti)
160 {
161         struct ost_body *body, *repbody;
162         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*repbody) };
163         ENTRY;
164
165         body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
166                                   lustre_swab_ost_body);
167         if (body == NULL)
168                 RETURN(-EFAULT);
169
170         rc = lustre_pack_reply(req, 2, size, NULL);
171         if (rc)
172                 RETURN(rc);
173
174         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
175                                  sizeof(*repbody));
176         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
177         oti->oti_logcookies = obdo_logcookie(&repbody->oa);
178         req->rq_status = obd_create(exp, &repbody->oa, NULL, oti);
179         //obd_log_cancel(conn, NULL, 1, oti->oti_logcookies, 0);
180         RETURN(0);
181 }
182
183 /*
184  * Helper function for ost_punch(): if asked by client, acquire [size, EOF]
185  * lock on the file being truncated.
186  */
187 static int ost_punch_lock_get(struct obd_export *exp, struct obdo *oa,
188                               struct lustre_handle *lh)
189 {
190         int flags;
191         struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0} };
192         ldlm_policy_data_t policy;
193         __u64 start;
194         __u64 finis;
195
196         ENTRY;
197
198         LASSERT(!lustre_handle_is_used(lh));
199
200         if (!(oa->o_valid & OBD_MD_FLFLAGS) ||
201             !(oa->o_flags & OBD_FL_TRUNCLOCK))
202                 RETURN(0);
203
204         CDEBUG(D_INODE, "OST-side truncate lock.\n");
205
206         start = oa->o_size;
207         finis = start + oa->o_blocks;
208
209         /*
210          * standard truncate optimization: if file body is completely
211          * destroyed, don't send data back to the server.
212          */
213         flags = (start == 0) ? LDLM_AST_DISCARD_DATA : 0;
214
215         policy.l_extent.start = start & CFS_PAGE_MASK;
216
217         /*
218          * If ->o_blocks is EOF it means "lock till the end of the
219          * file". Otherwise, it's size of a hole being punched (in bytes)
220          */
221         if (oa->o_blocks == OBD_OBJECT_EOF || finis < start)
222                 policy.l_extent.end = OBD_OBJECT_EOF;
223         else
224                 policy.l_extent.end = finis | ~CFS_PAGE_MASK;
225
226         RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
227                                       LDLM_EXTENT, &policy, LCK_PW, &flags,
228                                       ldlm_blocking_ast, ldlm_completion_ast,
229                                       ldlm_glimpse_ast, NULL, 0, NULL, lh));
230 }
231
232 /*
233  * Helper function for ost_punch(): release lock acquired by
234  * ost_punch_lock_get(), if any.
235  */
236 static void ost_punch_lock_put(struct obd_export *exp, struct obdo *oa,
237                                struct lustre_handle *lh)
238 {
239         ENTRY;
240         if (lustre_handle_is_used(lh))
241                 ldlm_lock_decref(lh, LCK_PW);
242         EXIT;
243 }
244
245 static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req,
246                      struct obd_trans_info *oti)
247 {
248         struct obd_info oinfo = { { { 0 } } };
249         struct ost_body *body, *repbody;
250         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*repbody) };
251         struct lustre_handle lh = {0,};
252         ENTRY;
253
254         /* check that we do support OBD_CONNECT_TRUNCLOCK. */
255         CLASSERT(OST_CONNECT_SUPPORTED & OBD_CONNECT_TRUNCLOCK);
256
257         body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
258                                   lustre_swab_ost_body);
259         if (body == NULL)
260                 RETURN(-EFAULT);
261
262         oinfo.oi_oa = &body->oa;
263         oinfo.oi_policy.l_extent.start = oinfo.oi_oa->o_size;
264         oinfo.oi_policy.l_extent.end = oinfo.oi_oa->o_blocks;
265
266         if ((oinfo.oi_oa->o_valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) !=
267             (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
268                 RETURN(-EINVAL);
269
270         rc = lustre_pack_reply(req, 2, size, NULL);
271         if (rc)
272                 RETURN(rc);
273
274         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
275                                  sizeof(*repbody));
276         rc = ost_punch_lock_get(exp, oinfo.oi_oa, &lh);
277         if (rc == 0) {
278                 if (oinfo.oi_oa->o_valid & OBD_MD_FLFLAGS &&
279                     oinfo.oi_oa->o_flags == OBD_FL_TRUNCLOCK)
280                         /*
281                          * If OBD_FL_TRUNCLOCK is the only bit set in
282                          * ->o_flags, clear OBD_MD_FLFLAGS to avoid falling
283                          * through filter_setattr() to filter_iocontrol().
284                          */
285                         oinfo.oi_oa->o_valid &= ~OBD_MD_FLFLAGS;
286
287                 if (oinfo.oi_oa->o_valid & OBD_MD_FLOSSCAPA)
288                         oinfo.oi_capa = lustre_unpack_capa(req->rq_reqmsg,
289                                                            REQ_REC_OFF + 1);
290                 req->rq_status = obd_punch(exp, &oinfo, oti, NULL);
291                 ost_punch_lock_put(exp, oinfo.oi_oa, &lh);
292         }
293         repbody->oa = *oinfo.oi_oa;
294         RETURN(rc);
295 }
296
297 static int ost_sync(struct obd_export *exp, struct ptlrpc_request *req)
298 {
299         struct ost_body *body, *repbody;
300         struct lustre_capa *capa = NULL;
301         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*repbody) };
302         ENTRY;
303
304         body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
305                                   lustre_swab_ost_body);
306         if (body == NULL)
307                 RETURN(-EFAULT);
308
309         if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
310                 capa = lustre_unpack_capa(req->rq_reqmsg, REQ_REC_OFF + 1);
311
312         rc = lustre_pack_reply(req, 2, size, NULL);
313         if (rc)
314                 RETURN(rc);
315
316         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
317                                  sizeof(*repbody));
318         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
319         req->rq_status = obd_sync(exp, &repbody->oa, NULL, repbody->oa.o_size,
320                                   repbody->oa.o_blocks, capa);
321         RETURN(0);
322 }
323
324 static int ost_setattr(struct obd_export *exp, struct ptlrpc_request *req,
325                        struct obd_trans_info *oti)
326 {
327         struct ost_body *body, *repbody;
328         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*repbody) };
329         struct obd_info oinfo = { { { 0 } } };
330         ENTRY;
331
332         body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
333                                   lustre_swab_ost_body);
334         if (body == NULL)
335                 RETURN(-EFAULT);
336
337         rc = lustre_pack_reply(req, 2, size, NULL);
338         if (rc)
339                 RETURN(rc);
340
341         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
342                                  sizeof(*repbody));
343         repbody->oa = body->oa;
344
345         oinfo.oi_oa = &repbody->oa;
346         if (oinfo.oi_oa->o_valid & OBD_MD_FLOSSCAPA)
347                 oinfo.oi_capa = lustre_unpack_capa(req->rq_reqmsg,
348                                                    REQ_REC_OFF + 1);
349         req->rq_status = obd_setattr(exp, &oinfo, oti);
350         RETURN(0);
351 }
352
353 static int ost_bulk_timeout(void *data)
354 {
355         ENTRY;
356         /* We don't fail the connection here, because having the export
357          * killed makes the (vital) call to commitrw very sad.
358          */
359         RETURN(1);
360 }
361
362 static int get_per_page_niobufs(struct obd_ioobj *ioo, int nioo,
363                                 struct niobuf_remote *rnb, int nrnb,
364                                 struct niobuf_remote **pp_rnbp)
365 {
366         /* Copy a remote niobuf, splitting it into page-sized chunks
367          * and setting ioo[i].ioo_bufcnt accordingly */
368         struct niobuf_remote *pp_rnb;
369         int   i;
370         int   j;
371         int   page;
372         int   rnbidx = 0;
373         int   npages = 0;
374
375         /*
376          * array of sufficient size already preallocated by caller
377          */
378         LASSERT(pp_rnbp != NULL);
379         LASSERT(*pp_rnbp != NULL);
380
381         /* first count and check the number of pages required */
382         for (i = 0; i < nioo; i++)
383                 for (j = 0; j < ioo->ioo_bufcnt; j++, rnbidx++) {
384                         obd_off offset = rnb[rnbidx].offset;
385                         obd_off p0 = offset >> CFS_PAGE_SHIFT;
386                         obd_off pn = (offset + rnb[rnbidx].len - 1) >>
387                                      CFS_PAGE_SHIFT;
388
389                         LASSERT(rnbidx < nrnb);
390
391                         npages += (pn + 1 - p0);
392
393                         if (rnb[rnbidx].len == 0) {
394                                 CERROR("zero len BRW: obj %d objid "LPX64
395                                        " buf %u\n", i, ioo[i].ioo_id, j);
396                                 return -EINVAL;
397                         }
398                         if (j > 0 &&
399                             rnb[rnbidx].offset <= rnb[rnbidx-1].offset) {
400                                 CERROR("unordered BRW: obj %d objid "LPX64
401                                        " buf %u offset "LPX64" <= "LPX64"\n",
402                                        i, ioo[i].ioo_id, j, rnb[rnbidx].offset,
403                                        rnb[rnbidx].offset);
404                                 return -EINVAL;
405                         }
406                 }
407
408         LASSERT(rnbidx == nrnb);
409
410         if (npages == nrnb) {       /* all niobufs are for single pages */
411                 *pp_rnbp = rnb;
412                 return npages;
413         }
414
415         pp_rnb = *pp_rnbp;
416
417         /* now do the actual split */
418         page = rnbidx = 0;
419         for (i = 0; i < nioo; i++) {
420                 int  obj_pages = 0;
421
422                 for (j = 0; j < ioo[i].ioo_bufcnt; j++, rnbidx++) {
423                         obd_off off = rnb[rnbidx].offset;
424                         int     nob = rnb[rnbidx].len;
425
426                         LASSERT(rnbidx < nrnb);
427                         do {
428                                 obd_off  poff = off & ~CFS_PAGE_MASK;
429                                 int      pnob = (poff + nob > CFS_PAGE_SIZE) ?
430                                                 PAGE_SIZE - poff : nob;
431
432                                 LASSERT(page < npages);
433                                 pp_rnb[page].len = pnob;
434                                 pp_rnb[page].offset = off;
435                                 pp_rnb[page].flags = rnb[rnbidx].flags;
436
437                                 CDEBUG(0, "   obj %d id "LPX64
438                                        "page %d(%d) "LPX64" for %d, flg %x\n",
439                                        i, ioo[i].ioo_id, obj_pages, page,
440                                        pp_rnb[page].offset, pp_rnb[page].len,
441                                        pp_rnb[page].flags);
442                                 page++;
443                                 obj_pages++;
444
445                                 off += pnob;
446                                 nob -= pnob;
447                         } while (nob > 0);
448                         LASSERT(nob == 0);
449                 }
450                 ioo[i].ioo_bufcnt = obj_pages;
451         }
452         LASSERT(page == npages);
453
454         return npages;
455 }
456
457 static __u32 ost_checksum_bulk(struct ptlrpc_bulk_desc *desc)
458 {
459         __u32 cksum = ~0;
460         int i;
461
462         for (i = 0; i < desc->bd_iov_count; i++) {
463                 struct page *page = desc->bd_iov[i].kiov_page;
464                 int off = desc->bd_iov[i].kiov_offset & ~CFS_PAGE_MASK;
465                 char *ptr = kmap(page) + off;
466                 int len = desc->bd_iov[i].kiov_len;
467
468                 /* corrupt the data before we compute the checksum, to
469                  * simulate a client->OST data error */
470                 if (i == 0 &&
471                     OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_CHECKSUM_RECEIVE))
472                         memcpy(ptr, "bad3", min(4, len));
473                 cksum = crc32_le(cksum, ptr, len);
474                 /* corrupt the data after we compute the checksum, to
475                  * simulate an OST->client data error */
476                 if (i == 0 && OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_CHECKSUM_SEND))
477                         memcpy(ptr, "bad4", min(4, len));
478                 kunmap(page);
479         }
480
481         return cksum;
482 }
483
484 /*
485  * populate @nio by @nrpages pages from per-thread page pool
486  */
487 static void ost_nio_pages_get(struct ptlrpc_request *req,
488                               struct niobuf_local *nio, int nrpages)
489 {
490         int i;
491         struct ost_thread_local_cache *tls;
492
493         ENTRY;
494
495         LASSERT(nrpages <= OST_THREAD_POOL_SIZE);
496         LASSERT(req != NULL);
497         LASSERT(req->rq_svc_thread != NULL);
498
499         tls = ost_tls(req);
500         LASSERT(tls != NULL);
501
502         memset(nio, 0, nrpages * sizeof *nio);
503         for (i = 0; i < nrpages; ++ i) {
504                 struct page *page;
505
506                 page = tls->page[i];
507                 LASSERT(page != NULL);
508                 POISON_PAGE(page, 0xf1);
509                 nio[i].page = page;
510                 LL_CDEBUG_PAGE(D_INFO, page, "%d\n", i);
511         }
512         EXIT;
513 }
514
515 /*
516  * Dual for ost_nio_pages_get(). Poison pages in pool for debugging
517  */
518 static void ost_nio_pages_put(struct ptlrpc_request *req,
519                               struct niobuf_local *nio, int nrpages)
520 {
521         int i;
522
523         ENTRY;
524
525         LASSERT(nrpages <= OST_THREAD_POOL_SIZE);
526
527         for (i = 0; i < nrpages; ++ i)
528                 POISON_PAGE(nio[i].page, 0xf2);
529         EXIT;
530 }
531
532 static int ost_brw_lock_get(int mode, struct obd_export *exp,
533                             struct obd_ioobj *obj, struct niobuf_remote *nb,
534                             struct lustre_handle *lh)
535 {
536         int flags                 = 0;
537         int nrbufs                = obj->ioo_bufcnt;
538         struct ldlm_res_id res_id = { .name = { obj->ioo_id, 0,
539                                                 obj->ioo_gr, 0} };
540         ldlm_policy_data_t policy;
541         int i;
542
543         ENTRY;
544
545         LASSERT(mode == LCK_PR || mode == LCK_PW);
546         LASSERT(!lustre_handle_is_used(lh));
547
548         if (nrbufs == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
549                 RETURN(0);
550
551         /* EXPENSIVE ASSERTION */
552         for (i = 1; i < nrbufs; i ++)
553                 LASSERT((nb[0].flags & OBD_BRW_SRVLOCK) ==
554                         (nb[i].flags & OBD_BRW_SRVLOCK));
555
556         policy.l_extent.start = nb[0].offset & CFS_PAGE_MASK;
557         policy.l_extent.end   = (nb[nrbufs - 1].offset +
558                                  nb[nrbufs - 1].len - 1) | ~CFS_PAGE_MASK;
559
560         RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
561                                       LDLM_EXTENT, &policy, mode, &flags,
562                                       ldlm_blocking_ast, ldlm_completion_ast,
563                                       ldlm_glimpse_ast, NULL, 0, NULL, lh));
564 }
565
566 static void ost_brw_lock_put(int mode,
567                              struct obd_ioobj *obj, struct niobuf_remote *niob,
568                              struct lustre_handle *lh)
569 {
570         ENTRY;
571         LASSERT(mode == LCK_PR || mode == LCK_PW);
572         LASSERT((obj->ioo_bufcnt > 0 && (niob[0].flags & OBD_BRW_SRVLOCK)) ==
573                 lustre_handle_is_used(lh));
574         if (lustre_handle_is_used(lh))
575                 ldlm_lock_decref(lh, mode);
576         EXIT;
577 }
578
/*
 * Argument block handed by ost_prolong_locks() to ost_prolong_locks_iter()
 * through ldlm_resource_iterate().
 */
579 struct ost_prolong_data {
580         struct obd_export *opd_exp;     /* only locks of this export match */
581         ldlm_policy_data_t opd_policy;  /* l_extent: range covered by the I/O */
582         ldlm_mode_t opd_mode;           /* bitmask tested against l_granted_mode */
583 };
584
585 static int ost_prolong_locks_iter(struct ldlm_lock *lock, void *data)
586 {
587         struct ost_prolong_data *opd = data;
588
589         LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
590
591         if (lock->l_req_mode != lock->l_granted_mode) {
592                 /* scan granted locks only */
593                 return LDLM_ITER_STOP;
594         }
595
596         if (lock->l_export != opd->opd_exp) {
597                 /* prolong locks only for given client */
598                 return LDLM_ITER_CONTINUE;
599         }
600
601         if (!(lock->l_granted_mode & opd->opd_mode)) {
602                 /* we aren't interesting in all type of locks */
603                 return LDLM_ITER_CONTINUE;
604         }
605
606         if (lock->l_policy_data.l_extent.end < opd->opd_policy.l_extent.start ||
607             lock->l_policy_data.l_extent.start > opd->opd_policy.l_extent.end) {
608                 /* the request doesn't cross the lock, skip it */
609                 return LDLM_ITER_CONTINUE;
610         }
611
612         if (!(lock->l_flags & LDLM_FL_AST_SENT)) {
613                 /* ignore locks not being cancelled */
614                 return LDLM_ITER_CONTINUE;
615         }
616
617         /* OK. this is a possible lock the user holds doing I/O
618          * let's refresh eviction timer for it */
619         ldlm_refresh_waiting_lock(lock);
620
621         return LDLM_ITER_CONTINUE;
622 }
623
624 static void ost_prolong_locks(struct obd_export *exp, struct obd_ioobj *obj,
625                               struct niobuf_remote *nb, ldlm_mode_t mode)
626 {
627         struct ldlm_res_id res_id = { .name = { obj->ioo_id, 0,
628                                                 obj->ioo_gr, 0} };
629         int nrbufs = obj->ioo_bufcnt;
630         struct ost_prolong_data opd;
631
632         ENTRY;
633
634         opd.opd_mode = mode;
635         opd.opd_exp = exp;
636         opd.opd_policy.l_extent.start = nb[0].offset & CFS_PAGE_MASK;
637         opd.opd_policy.l_extent.end = (nb[nrbufs - 1].offset +
638                                        nb[nrbufs - 1].len - 1) | ~CFS_PAGE_MASK;
639
640         CDEBUG(D_DLMTRACE,"refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
641                res_id.name[0], res_id.name[1], opd.opd_policy.l_extent.start,
642                opd.opd_policy.l_extent.end);
643         ldlm_resource_iterate(exp->exp_obd->obd_namespace, &res_id,
644                               ost_prolong_locks_iter, &opd);
645 }
646
647 static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
648 {
649         struct ptlrpc_bulk_desc *desc;
650         struct niobuf_remote *remote_nb;
651         struct niobuf_remote *pp_rnb = NULL;
652         struct niobuf_local *local_nb;
653         struct obd_ioobj *ioo;
654         struct ost_body *body, *repbody;
655         struct lustre_capa *capa = NULL;
656         struct l_wait_info lwi;
657         struct lustre_handle lockh = { 0 };
658         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
659         int comms_error = 0, niocount, npages, nob = 0, rc, i;
660         int no_reply = 0;
661         ENTRY;
662
663         req->rq_bulk_read = 1;
664
665         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
666                 GOTO(out, rc = -EIO);
667
668         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK | OBD_FAIL_ONCE,
669                          (obd_timeout + 1) / 4);
670
671         body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
672                                   lustre_swab_ost_body);
673         if (body == NULL) {
674                 CERROR("Missing/short ost_body\n");
675                 GOTO(out, rc = -EFAULT);
676         }
677
678         ioo = lustre_swab_reqbuf(req, REQ_REC_OFF + 1, sizeof(*ioo),
679                                  lustre_swab_obd_ioobj);
680         if (ioo == NULL) {
681                 CERROR("Missing/short ioobj\n");
682                 GOTO(out, rc = -EFAULT);
683         }
684
685         niocount = ioo->ioo_bufcnt;
686         if (niocount > PTLRPC_MAX_BRW_PAGES) {
687                 DEBUG_REQ(D_ERROR, req, "bulk has too many pages (%d)",
688                           niocount);
689                 GOTO(out, rc = -EFAULT);
690         }
691
692         remote_nb = lustre_swab_reqbuf(req, REQ_REC_OFF + 2,
693                                        niocount * sizeof(*remote_nb),
694                                        lustre_swab_niobuf_remote);
695         if (remote_nb == NULL) {
696                 CERROR("Missing/short niobuf\n");
697                 GOTO(out, rc = -EFAULT);
698         }
699         if (lustre_msg_swabbed(req->rq_reqmsg)) { /* swab remaining niobufs */
700                 for (i = 1; i < niocount; i++)
701                         lustre_swab_niobuf_remote (&remote_nb[i]);
702         }
703
704         if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
705                 capa = lustre_unpack_capa(req->rq_reqmsg, REQ_REC_OFF + 3);
706
707         rc = lustre_pack_reply(req, 2, size, NULL);
708         if (rc)
709                 GOTO(out, rc);
710
711         /*
712          * Per-thread array of struct niobuf_{local,remote}'s was allocated by
713          * ost_thread_init().
714          */
715         local_nb = ost_tls(req)->local;
716         pp_rnb   = ost_tls(req)->remote;
717
718         /* FIXME all niobuf splitting should be done in obdfilter if needed */
719         /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
720         npages = get_per_page_niobufs(ioo, 1, remote_nb, niocount, &pp_rnb);
721         if (npages < 0)
722                 GOTO(out, rc = npages);
723
724         LASSERT(npages <= OST_THREAD_POOL_SIZE);
725
726         ost_nio_pages_get(req, local_nb, npages);
727
728         desc = ptlrpc_prep_bulk_exp(req, npages,
729                                      BULK_PUT_SOURCE, OST_BULK_PORTAL);
730         if (desc == NULL)
731                 GOTO(out, rc = -ENOMEM);
732
733         rc = ost_brw_lock_get(LCK_PR, req->rq_export, ioo, pp_rnb, &lockh);
734         if (rc != 0)
735                 GOTO(out_bulk, rc);
736
737         /* 
738          * If getting the lock took more time than
739          * client was willing to wait, drop it. b=11330
740          */
741         if (cfs_time_current_sec() > req->rq_arrival_time.tv_sec + obd_timeout || 
742             OBD_FAIL_CHECK(OBD_FAIL_OST_DROP_REQ)) {
743                 no_reply = 1;
744                 CERROR("Dropping timed-out read from %s because locking"
745                        "object "LPX64" took %ld seconds.\n",
746                        libcfs_id2str(req->rq_peer), ioo->ioo_id,
747                        cfs_time_current_sec() - req->rq_arrival_time.tv_sec);
748                 goto out_lock;
749         }
750
751         rc = obd_preprw(OBD_BRW_READ, req->rq_export, &body->oa, 1,
752                         ioo, npages, pp_rnb, local_nb, oti, capa);
753         if (rc != 0)
754                 GOTO(out_lock, rc);
755
756         ost_prolong_locks(req->rq_export, ioo, pp_rnb, LCK_PW | LCK_PR);
757
758         nob = 0;
759         for (i = 0; i < npages; i++) {
760                 int page_rc = local_nb[i].rc;
761
762                 if (page_rc < 0) {              /* error */
763                         rc = page_rc;
764                         break;
765                 }
766
767                 LASSERTF(page_rc <= pp_rnb[i].len, "page_rc (%d) > "
768                          "pp_rnb[%d].len (%d)\n", page_rc, i, pp_rnb[i].len);
769                 nob += page_rc;
770                 if (page_rc != 0) {             /* some data! */
771                         LASSERT (local_nb[i].page != NULL);
772                         ptlrpc_prep_bulk_page(desc, local_nb[i].page,
773                                               pp_rnb[i].offset & ~CFS_PAGE_MASK,
774                                               page_rc);
775                 }
776
777                 if (page_rc != pp_rnb[i].len) { /* short read */
778                         /* All subsequent pages should be 0 */
779                         while(++i < npages)
780                                 LASSERT(local_nb[i].rc == 0);
781                         break;
782                 }
783         }
784
785         if (unlikely(body->oa.o_valid & OBD_MD_FLCKSUM)) {
786                 body->oa.o_cksum = ost_checksum_bulk(desc);
787                 body->oa.o_valid = OBD_MD_FLCKSUM;
788                 CDEBUG(D_PAGE,"checksum at read origin: %x\n",body->oa.o_cksum);
789         } else {
790                 body->oa.o_valid = 0;
791         }
792         /* We're finishing using body->oa as an input variable */
793
794         /* Check if client was evicted while we were doing i/o before touching
795            network */
796         if (rc == 0) {
797                 if (desc->bd_export->exp_failed)
798                         rc = -ENOTCONN;
799                 else {
800                         sptlrpc_svc_wrap_bulk(req, desc);
801
802                         rc = ptlrpc_start_bulk_transfer(desc);
803                 }
804
805                 if (rc == 0) {
806                         lwi = LWI_TIMEOUT_INTERVAL(obd_timeout * HZ / 4, HZ,
807                                                    ost_bulk_timeout, desc);
808                         rc = l_wait_event(desc->bd_waitq,
809                                           !ptlrpc_bulk_active(desc) ||
810                                           desc->bd_export->exp_failed, &lwi);
811                         LASSERT(rc == 0 || rc == -ETIMEDOUT);
812                         if (rc == -ETIMEDOUT) {
813                                 DEBUG_REQ(D_ERROR, req, "timeout on bulk PUT");
814                                 ptlrpc_abort_bulk(desc);
815                         } else if (desc->bd_export->exp_failed) {
816                                 DEBUG_REQ(D_ERROR, req, "Eviction on bulk PUT");
817                                 rc = -ENOTCONN;
818                                 ptlrpc_abort_bulk(desc);
819                         } else if (!desc->bd_success ||
820                                    desc->bd_nob_transferred != desc->bd_nob) {
821                                 DEBUG_REQ(D_ERROR, req, "%s bulk PUT %d(%d)",
822                                           desc->bd_success ?
823                                           "truncated" : "network error on",
824                                           desc->bd_nob_transferred,
825                                           desc->bd_nob);
826                                 /* XXX should this be a different errno? */
827                                 rc = -ETIMEDOUT;
828                         }
829                 } else {
830                         DEBUG_REQ(D_ERROR, req, "bulk PUT failed: rc %d", rc);
831                 }
832                 comms_error = rc != 0;
833         }
834
835         /* Must commit after prep above in all cases */
836         rc = obd_commitrw(OBD_BRW_READ, req->rq_export, &body->oa, 1,
837                           ioo, npages, local_nb, oti, rc);
838
839         ost_nio_pages_put(req, local_nb, npages);
840
841         if (rc == 0) {
842                 repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
843                                          sizeof(*repbody));
844                 memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
845         }
846
847 out_lock:
848         ost_brw_lock_put(LCK_PR, ioo, pp_rnb, &lockh);
849 out_bulk:
850         ptlrpc_free_bulk(desc);
851         if (no_reply)
852                 RETURN(rc);
853 out:
854         LASSERT(rc <= 0);
855         if (rc == 0) {
856                 req->rq_status = nob;
857                 target_committed_to_req(req);
858                 ptlrpc_reply(req);
859         } else if (!comms_error) {
860                 /* Only reply if there was no comms problem with bulk */
861                 target_committed_to_req(req);
862                 req->rq_status = rc;
863                 ptlrpc_error(req);
864         } else {
865                 if (req->rq_reply_state != NULL) {
866                         /* reply out callback would free */
867                         ptlrpc_rs_decref(req->rq_reply_state);
868                         req->rq_reply_state = NULL;
869                 }
870                 CWARN("%s: ignoring bulk IO comm error with %s@%s id %s - "
871                       "client will retry\n",
872                       req->rq_export->exp_obd->obd_name,
873                       req->rq_export->exp_client_uuid.uuid,
874                       req->rq_export->exp_connection->c_remote_uuid.uuid,
875                       libcfs_id2str(req->rq_peer));
876         }
877
878         RETURN(rc);
879 }
880
881 static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
882 {
883         struct ptlrpc_bulk_desc *desc;
884         struct niobuf_remote    *remote_nb;
885         struct niobuf_remote    *pp_rnb;
886         struct niobuf_local     *local_nb;
887         struct obd_ioobj        *ioo;
888         struct ost_body         *body, *repbody;
889         struct l_wait_info       lwi;
890         struct lustre_handle     lockh = {0};
891         struct lustre_capa      *capa = NULL;
892         __u32                   *rcs;
893         int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
894         int objcount, niocount, npages, comms_error = 0;
895         int rc, swab, i, j;
896         obd_count                client_cksum, server_cksum = 0;
897         int                      no_reply = 0; 
898         ENTRY;
899
900         req->rq_bulk_write = 1;
901
902         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
903                 GOTO(out, rc = -EIO);
904
905         /* pause before transaction has been started */
906         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK | OBD_FAIL_ONCE,
907                          (obd_timeout + 1) / 4);
908
909         swab = lustre_msg_swabbed(req->rq_reqmsg);
910         body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
911                                   lustre_swab_ost_body);
912         if (body == NULL) {
913                 CERROR("Missing/short ost_body\n");
914                 GOTO(out, rc = -EFAULT);
915         }
916
917         LASSERT_REQSWAB(req, REQ_REC_OFF + 1);
918         objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) /
919                    sizeof(*ioo);
920         if (objcount == 0) {
921                 CERROR("Missing/short ioobj\n");
922                 GOTO(out, rc = -EFAULT);
923         }
924         if (objcount > 1) {
925                 CERROR("too many ioobjs (%d)\n", objcount);
926                 GOTO(out, rc = -EFAULT);
927         }
928
929         ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1,
930                              objcount * sizeof(*ioo));
931         LASSERT (ioo != NULL);
932         for (niocount = i = 0; i < objcount; i++) {
933                 if (swab)
934                         lustre_swab_obd_ioobj(&ioo[i]);
935                 if (ioo[i].ioo_bufcnt == 0) {
936                         CERROR("ioo[%d] has zero bufcnt\n", i);
937                         GOTO(out, rc = -EFAULT);
938                 }
939                 niocount += ioo[i].ioo_bufcnt;
940         }
941
942         if (niocount > PTLRPC_MAX_BRW_PAGES) {
943                 DEBUG_REQ(D_ERROR, req, "bulk has too many pages (%d)",
944                           niocount);
945                 GOTO(out, rc = -EFAULT);
946         }
947
948         remote_nb = lustre_swab_reqbuf(req, REQ_REC_OFF + 2,
949                                        niocount * sizeof(*remote_nb),
950                                        lustre_swab_niobuf_remote);
951         if (remote_nb == NULL) {
952                 CERROR("Missing/short niobuf\n");
953                 GOTO(out, rc = -EFAULT);
954         }
955         if (swab) {                             /* swab the remaining niobufs */
956                 for (i = 1; i < niocount; i++)
957                         lustre_swab_niobuf_remote (&remote_nb[i]);
958         }
959
960         if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
961                 capa = lustre_unpack_capa(req->rq_reqmsg, REQ_REC_OFF + 3);
962
963         size[REPLY_REC_OFF + 1] = niocount * sizeof(*rcs);
964         rc = lustre_pack_reply(req, 3, size, NULL);
965         if (rc != 0)
966                 GOTO(out, rc);
967         rcs = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1,
968                              niocount * sizeof(*rcs));
969
970         /*
971          * Per-thread array of struct niobuf_{local,remote}'s was allocated by
972          * ost_thread_init().
973          */
974         local_nb = ost_tls(req)->local;
975         pp_rnb   = ost_tls(req)->remote;
976
977         /* FIXME all niobuf splitting should be done in obdfilter if needed */
978         /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
979         npages = get_per_page_niobufs(ioo, objcount,remote_nb,niocount,&pp_rnb);
980         if (npages < 0)
981                 GOTO(out, rc = npages);
982
983         LASSERT(npages <= OST_THREAD_POOL_SIZE);
984
985         ost_nio_pages_get(req, local_nb, npages);
986
987         desc = ptlrpc_prep_bulk_exp(req, npages,
988                                      BULK_GET_SINK, OST_BULK_PORTAL);
989         if (desc == NULL)
990                 GOTO(out, rc = -ENOMEM);
991
992         rc = ost_brw_lock_get(LCK_PW, req->rq_export, ioo, pp_rnb, &lockh);
993         if (rc != 0)
994                 GOTO(out_bulk, rc);
995
996         /* 
997          * If getting the lock took more time than
998          * client was willing to wait, drop it. b=11330
999          */
1000         if (cfs_time_current_sec() > req->rq_arrival_time.tv_sec + obd_timeout || 
1001             OBD_FAIL_CHECK(OBD_FAIL_OST_DROP_REQ)) {
1002                 no_reply = 1;
1003                 CERROR("Dropping timed-out write from %s because locking"
1004                        "object "LPX64" took %ld seconds.\n",
1005                        libcfs_id2str(req->rq_peer), ioo->ioo_id,
1006                        cfs_time_current_sec() - req->rq_arrival_time.tv_sec);
1007                 goto out_lock;
1008         }
1009
1010         ost_prolong_locks(req->rq_export, ioo, pp_rnb, LCK_PW);
1011
1012         /* obd_preprw clobbers oa->valid, so save what we need */
1013         client_cksum = body->oa.o_valid & OBD_MD_FLCKSUM ? body->oa.o_cksum : 0;
1014         
1015         /* Because we already sync grant info with client when reconnect,
1016          * grant info will be cleared for resent req, then fed_grant and 
1017          * total_grant will not be modified in following preprw_write */ 
1018         if (lustre_msg_get_flags(req->rq_reqmsg) & (MSG_RESENT | MSG_REPLAY)) {
1019                 DEBUG_REQ(D_CACHE, req, "clear resent/replay req grant info\n");
1020                 body->oa.o_valid &= ~OBD_MD_FLGRANT;
1021         }
1022
1023         rc = obd_preprw(OBD_BRW_WRITE, req->rq_export, &body->oa, objcount,
1024                         ioo, npages, pp_rnb, local_nb, oti, capa);
1025         if (rc != 0)
1026                 GOTO(out_lock, rc);
1027
1028         /* NB Having prepped, we must commit... */
1029
1030         for (i = 0; i < npages; i++)
1031                 ptlrpc_prep_bulk_page(desc, local_nb[i].page,
1032                                       pp_rnb[i].offset & ~CFS_PAGE_MASK,
1033                                       pp_rnb[i].len);
1034
1035         /* Check if client was evicted while we were doing i/o before touching
1036            network */
1037         if (desc->bd_export->exp_failed)
1038                 rc = -ENOTCONN;
1039         else
1040                 rc = ptlrpc_start_bulk_transfer (desc);
1041         if (rc == 0) {
1042                 lwi = LWI_TIMEOUT_INTERVAL(obd_timeout * HZ / 2, HZ,
1043                                            ost_bulk_timeout, desc);
1044                 rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc) ||
1045                                   desc->bd_export->exp_failed, &lwi);
1046                 LASSERT(rc == 0 || rc == -ETIMEDOUT);
1047                 if (rc == -ETIMEDOUT) {
1048                         DEBUG_REQ(D_ERROR, req, "timeout on bulk GET");
1049                         ptlrpc_abort_bulk(desc);
1050                 } else if (desc->bd_export->exp_failed) {
1051                         DEBUG_REQ(D_ERROR, req, "Eviction on bulk GET");
1052                         rc = -ENOTCONN;
1053                         ptlrpc_abort_bulk(desc);
1054                 } else if (!desc->bd_success ||
1055                            desc->bd_nob_transferred != desc->bd_nob) {
1056                         DEBUG_REQ(D_ERROR, req, "%s bulk GET %d(%d)",
1057                                   desc->bd_success ?
1058                                   "truncated" : "network error on",
1059                                   desc->bd_nob_transferred, desc->bd_nob);
1060                         /* XXX should this be a different errno? */
1061                         rc = -ETIMEDOUT;
1062                 }
1063         } else {
1064                 DEBUG_REQ(D_ERROR, req, "ptlrpc_bulk_get failed: rc %d", rc);
1065         }
1066         comms_error = rc != 0;
1067
1068         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
1069                                  sizeof(*repbody));
1070         memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
1071
1072         if (unlikely(client_cksum != 0 && rc == 0)) {
1073                 static int cksum_counter;
1074                 server_cksum = ost_checksum_bulk(desc);
1075                 repbody->oa.o_valid |= OBD_MD_FLCKSUM;
1076                 repbody->oa.o_cksum = server_cksum;
1077                 cksum_counter++;
1078                 if (unlikely(client_cksum != server_cksum)) {
1079                         CERROR("client csum %x, server csum %x\n",
1080                                client_cksum, server_cksum);
1081                         cksum_counter = 0;
1082                 } else if ((cksum_counter & (-cksum_counter)) == cksum_counter){
1083                         CDEBUG(D_INFO, "Checksum %u from %s OK: %x\n",
1084                                cksum_counter, libcfs_id2str(req->rq_peer),
1085                                server_cksum);
1086                 }
1087         }
1088
1089         sptlrpc_svc_unwrap_bulk(req, desc);
1090
1091         /* Must commit after prep above in all cases */
1092         rc = obd_commitrw(OBD_BRW_WRITE, req->rq_export, &repbody->oa,
1093                            objcount, ioo, npages, local_nb, oti, rc);
1094
1095         if (unlikely(client_cksum != server_cksum && rc == 0)) {
1096                 int   new_cksum = ost_checksum_bulk(desc);
1097                 char *msg;
1098                 char *via;
1099                 char *router;
1100
1101                 if (new_cksum == server_cksum)
1102                         msg = "changed in transit before arrival at OST";
1103                 else if (new_cksum == client_cksum)
1104                         msg = "initial checksum before message complete";
1105                 else
1106                         msg = "changed in transit AND after initial checksum";
1107
1108                 if (req->rq_peer.nid == desc->bd_sender) {
1109                         via = router = "";
1110                 } else {
1111                         via = " via ";
1112                         router = libcfs_nid2str(desc->bd_sender);
1113                 }
1114                 
1115                 LCONSOLE_ERROR_MSG(0x168, "%s: BAD WRITE CHECKSUM: %s from "
1116                                    "%s%s%s inum "LPU64"/"LPU64" object "
1117                                    LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1118                                    req->rq_export->exp_obd->obd_name, msg,
1119                                    libcfs_id2str(req->rq_peer),
1120                                    via, router,
1121                                    body->oa.o_valid & OBD_MD_FLFID ?
1122                                                 body->oa.o_fid : (__u64)0,
1123                                    body->oa.o_valid & OBD_MD_FLFID ?
1124                                                 body->oa.o_generation :(__u64)0,
1125                                    body->oa.o_id,
1126                                    body->oa.o_valid & OBD_MD_FLGROUP ?
1127                                                 body->oa.o_gr : (__u64)0,
1128                                    pp_rnb[0].offset,
1129                                    pp_rnb[npages-1].offset+pp_rnb[npages-1].len
1130                                    - 1 );
1131                 CERROR("client csum %x, original server csum %x, "
1132                        "server csum now %x\n",
1133                        client_cksum, server_cksum, new_cksum);
1134         }
1135
1136         ost_nio_pages_put(req, local_nb, npages);
1137
1138         if (rc == 0) {
1139                 /* set per-requested niobuf return codes */
1140                 for (i = j = 0; i < niocount; i++) {
1141                         int nob = remote_nb[i].len;
1142
1143                         rcs[i] = 0;
1144                         do {
1145                                 LASSERT(j < npages);
1146                                 if (local_nb[j].rc < 0)
1147                                         rcs[i] = local_nb[j].rc;
1148                                 nob -= pp_rnb[j].len;
1149                                 j++;
1150                         } while (nob > 0);
1151                         LASSERT(nob == 0);
1152                 }
1153                 LASSERT(j == npages);
1154         }
1155
1156 out_lock:
1157         ost_brw_lock_put(LCK_PW, ioo, pp_rnb, &lockh);
1158 out_bulk:
1159         ptlrpc_free_bulk(desc);
1160         if (no_reply)
1161                 RETURN(rc);
1162 out:
1163         if (rc == 0) {
1164                 oti_to_request(oti, req);
1165                 target_committed_to_req(req);
1166                 rc = ptlrpc_reply(req);
1167         } else if (!comms_error) {
1168                 /* Only reply if there was no comms problem with bulk */
1169                 target_committed_to_req(req);
1170                 req->rq_status = rc;
1171                 ptlrpc_error(req);
1172         } else {
1173                 if (req->rq_reply_state != NULL) {
1174                         /* reply out callback would free */
1175                         ptlrpc_rs_decref(req->rq_reply_state);
1176                         req->rq_reply_state = NULL;
1177                 }
1178                 CWARN("%s: ignoring bulk IO comm error with %s@%s id %s - "
1179                       "client will retry\n",
1180                       req->rq_export->exp_obd->obd_name,
1181                       req->rq_export->exp_client_uuid.uuid,
1182                       req->rq_export->exp_connection->c_remote_uuid.uuid,
1183                       libcfs_id2str(req->rq_peer));
1184         }
1185         RETURN(rc);
1186 }
1187
1188 static int ost_set_info(struct obd_export *exp, struct ptlrpc_request *req)
1189 {
1190         char *key, *val = NULL;
1191         int keylen, vallen, rc = 0;
1192         ENTRY;
1193
1194         key = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, 1);
1195         if (key == NULL) {
1196                 DEBUG_REQ(D_HA, req, "no set_info key");
1197                 RETURN(-EFAULT);
1198         }
1199         keylen = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF);
1200
1201         rc = lustre_pack_reply(req, 1, NULL, NULL);
1202         if (rc)
1203                 RETURN(rc);
1204
1205         vallen = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1);
1206         if (vallen)
1207                 val = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, 0);
1208
1209         if (KEY_IS("evict_by_nid")) {
1210                 if (val && vallen)
1211                         obd_export_evict_by_nid(exp->exp_obd, val);
1212
1213                 GOTO(out, rc = 0);
1214         }
1215
1216         rc = obd_set_info_async(exp, keylen, key, vallen, val, NULL);
1217 out:
1218         lustre_msg_set_status(req->rq_repmsg, 0);
1219         RETURN(rc);
1220 }
1221
1222 static int ost_get_info(struct obd_export *exp, struct ptlrpc_request *req)
1223 {
1224         char *key;
1225         int keylen, rc = 0;
1226         int size[2] = { sizeof(struct ptlrpc_body), sizeof(obd_id) };
1227         obd_id *reply;
1228         ENTRY;
1229
1230         key = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, 1);
1231         if (key == NULL) {
1232                 DEBUG_REQ(D_HA, req, "no get_info key");
1233                 RETURN(-EFAULT);
1234         }
1235         keylen = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF);
1236
1237         if (keylen < strlen("last_id") || memcmp(key, "last_id", 7) != 0)
1238                 RETURN(-EPROTO);
1239
1240         rc = lustre_pack_reply(req, 2, size, NULL);
1241         if (rc)
1242                 RETURN(rc);
1243
1244         reply = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*reply));
1245         rc = obd_get_info(exp, keylen, key, size, reply);
1246         lustre_msg_set_status(req->rq_repmsg, 0);
1247         RETURN(rc);
1248 }
1249
1250 static int ost_handle_quotactl(struct ptlrpc_request *req)
1251 {
1252         struct obd_quotactl *oqctl, *repoqc;
1253         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*repoqc) };
1254         ENTRY;
1255
1256         oqctl = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*oqctl),
1257                                    lustre_swab_obd_quotactl);
1258         if (oqctl == NULL)
1259                 GOTO(out, rc = -EPROTO);
1260
1261         rc = lustre_pack_reply(req, 2, size, NULL);
1262         if (rc)
1263                 GOTO(out, rc);
1264
1265         repoqc = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*repoqc));
1266
1267         req->rq_status = obd_quotactl(req->rq_export, oqctl);
1268         *repoqc = *oqctl;
1269 out:
1270         RETURN(rc);
1271 }
1272
1273 static int ost_handle_quotacheck(struct ptlrpc_request *req)
1274 {
1275         struct obd_quotactl *oqctl;
1276         int rc;
1277         ENTRY;
1278
1279         oqctl = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*oqctl),
1280                                    lustre_swab_obd_quotactl);
1281         if (oqctl == NULL)
1282                 RETURN(-EPROTO);
1283
1284         rc = lustre_pack_reply(req, 1, NULL, NULL);
1285         if (rc) {
1286                 CERROR("ost: out of memory while packing quotacheck reply\n");
1287                 RETURN(-ENOMEM);
1288         }
1289
1290         req->rq_status = obd_quotacheck(req->rq_export, oqctl);
1291         RETURN(0);
1292 }
1293
1294 static int ost_llog_handle_connect(struct obd_export *exp,
1295                                                    struct ptlrpc_request *req)
1296 {
1297         struct llogd_conn_body *body;
1298         int rc;
1299         ENTRY;
1300
1301         body = lustre_msg_buf(req->rq_reqmsg, 1, sizeof(*body));
1302         rc = obd_llog_connect(exp, body);
1303         RETURN(rc);
1304 }
1305
1306
1307 static int ost_filter_recovery_request(struct ptlrpc_request *req,
1308                                        struct obd_device *obd, int *process)
1309 {
1310         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1311         case OST_CONNECT: /* This will never get here, but for completeness. */
1312         case OST_DISCONNECT:
1313                *process = 1;
1314                RETURN(0);
1315
1316         case OBD_PING:
1317         case OST_CREATE:
1318         case OST_DESTROY:
1319         case OST_PUNCH:
1320         case OST_SETATTR:
1321         case OST_SYNC:
1322         case OST_WRITE:
1323         case OBD_LOG_CANCEL:
1324         case LDLM_ENQUEUE:
1325                 *process = target_queue_recovery_request(req, obd);
1326                 RETURN(0);
1327
1328         default:
1329                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
1330                 *process = -EAGAIN;
1331                 RETURN(0);
1332         }
1333 }
1334
1335 int ost_msg_check_version(struct lustre_msg *msg)
1336 {
1337         int rc;
1338
1339         switch(lustre_msg_get_opc(msg)) {
1340         case OST_CONNECT:
1341         case OST_DISCONNECT:
1342         case OBD_PING:
1343         case SEC_CTX_INIT:
1344         case SEC_CTX_INIT_CONT:
1345         case SEC_CTX_FINI:
1346                 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
1347                 if (rc)
1348                         CERROR("bad opc %u version %08x, expecting %08x\n",
1349                                lustre_msg_get_opc(msg),
1350                                lustre_msg_get_version(msg),
1351                                LUSTRE_OBD_VERSION);
1352                 break;
1353         case OST_CREATE:
1354         case OST_DESTROY:
1355         case OST_GETATTR:
1356         case OST_SETATTR:
1357         case OST_WRITE:
1358         case OST_READ:
1359         case OST_PUNCH:
1360         case OST_STATFS:
1361         case OST_SYNC:
1362         case OST_SET_INFO:
1363         case OST_GET_INFO:
1364         case OST_QUOTACHECK:
1365         case OST_QUOTACTL:
1366                 rc = lustre_msg_check_version(msg, LUSTRE_OST_VERSION);
1367                 if (rc)
1368                         CERROR("bad opc %u version %08x, expecting %08x\n",
1369                                lustre_msg_get_opc(msg),
1370                                lustre_msg_get_version(msg),
1371                                LUSTRE_OST_VERSION);
1372                 break;
1373         case LDLM_ENQUEUE:
1374         case LDLM_CONVERT:
1375         case LDLM_CANCEL:
1376         case LDLM_BL_CALLBACK:
1377         case LDLM_CP_CALLBACK:
1378                 rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
1379                 if (rc)
1380                         CERROR("bad opc %u version %08x, expecting %08x\n",
1381                                lustre_msg_get_opc(msg),
1382                                lustre_msg_get_version(msg),
1383                                LUSTRE_DLM_VERSION);
1384                 break;
1385         case LLOG_ORIGIN_CONNECT:
1386         case OBD_LOG_CANCEL:
1387                 rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
1388                 if (rc)
1389                         CERROR("bad opc %u version %08x, expecting %08x\n",
1390                                lustre_msg_get_opc(msg),
1391                                lustre_msg_get_version(msg),
1392                                LUSTRE_LOG_VERSION);
1393                 break;
1394         default:
1395                 CERROR("Unexpected opcode %d\n", lustre_msg_get_opc(msg));
1396                 rc = -ENOTSUPP;
1397         }
1398         return rc;
1399 }
1400
1401 int ost_handle(struct ptlrpc_request *req)
1402 {
1403         struct obd_trans_info trans_info = { 0, };
1404         struct obd_trans_info *oti = &trans_info;
1405         int should_process, fail = OBD_FAIL_OST_ALL_REPLY_NET, rc = 0;
1406         struct obd_device *obd = NULL;
1407         ENTRY;
1408
1409         LASSERT(current->journal_info == NULL);
1410
1411         /* primordial rpcs don't affect server recovery */
1412         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1413         case SEC_CTX_INIT:
1414         case SEC_CTX_INIT_CONT:
1415         case SEC_CTX_FINI:
1416                 GOTO(out, rc = 0);
1417         }
1418
1419         /* XXX identical to MDS */
1420         if (lustre_msg_get_opc(req->rq_reqmsg) != OST_CONNECT) {
1421                 int recovering;
1422
1423                 if (req->rq_export == NULL) {
1424                         CDEBUG(D_HA,"operation %d on unconnected OST from %s\n",
1425                                lustre_msg_get_opc(req->rq_reqmsg),
1426                                libcfs_id2str(req->rq_peer));
1427                         req->rq_status = -ENOTCONN;
1428                         GOTO(out, rc = -ENOTCONN);
1429                 }
1430
1431                 obd = req->rq_export->exp_obd;
1432
1433                 /* Check for aborted recovery. */
1434                 spin_lock_bh(&obd->obd_processing_task_lock);
1435                 recovering = obd->obd_recovering;
1436                 spin_unlock_bh(&obd->obd_processing_task_lock);
1437                 if (recovering) {
1438                         rc = ost_filter_recovery_request(req, obd,
1439                                                          &should_process);
1440                         if (rc || !should_process)
1441                                 RETURN(rc);
1442                         else if (should_process < 0) {
1443                                 req->rq_status = should_process;
1444                                 rc = ptlrpc_error(req);
1445                                 RETURN(rc);
1446                         }
1447                 }
1448         }
1449
1450         oti_init(oti, req);
1451         
1452         rc = ost_msg_check_version(req->rq_reqmsg);
1453         if (rc)
1454                 RETURN(rc);
1455
1456         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1457         case OST_CONNECT: {
1458                 CDEBUG(D_INODE, "connect\n");
1459                 OBD_FAIL_RETURN(OBD_FAIL_OST_CONNECT_NET, 0);
1460                 rc = target_handle_connect(req);
1461                 if (!rc)
1462                         obd = req->rq_export->exp_obd;
1463                 break;
1464         }
1465         case OST_DISCONNECT:
1466                 CDEBUG(D_INODE, "disconnect\n");
1467                 OBD_FAIL_RETURN(OBD_FAIL_OST_DISCONNECT_NET, 0);
1468                 rc = target_handle_disconnect(req);
1469                 break;
1470         case OST_CREATE:
1471                 CDEBUG(D_INODE, "create\n");
1472                 OBD_FAIL_RETURN(OBD_FAIL_OST_CREATE_NET, 0);
1473                 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_ENOSPC))
1474                         GOTO(out, rc = -ENOSPC);
1475                 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_EROFS))
1476                         GOTO(out, rc = -EROFS);
1477                 rc = ost_create(req->rq_export, req, oti);
1478                 break;
1479         case OST_DESTROY:
1480                 CDEBUG(D_INODE, "destroy\n");
1481                 OBD_FAIL_RETURN(OBD_FAIL_OST_DESTROY_NET, 0);
1482                 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_EROFS))
1483                         GOTO(out, rc = -EROFS);
1484                 rc = ost_destroy(req->rq_export, req, oti);
1485                 break;
1486         case OST_GETATTR:
1487                 CDEBUG(D_INODE, "getattr\n");
1488                 OBD_FAIL_RETURN(OBD_FAIL_OST_GETATTR_NET, 0);
1489                 rc = ost_getattr(req->rq_export, req);
1490                 break;
1491         case OST_SETATTR:
1492                 CDEBUG(D_INODE, "setattr\n");
1493                 OBD_FAIL_RETURN(OBD_FAIL_OST_SETATTR_NET, 0);
1494                 rc = ost_setattr(req->rq_export, req, oti);
1495                 break;
1496         case OST_WRITE:
1497                 CDEBUG(D_INODE, "write\n");
1498                 /* req->rq_request_portal would be nice, if it was set */
1499                 if (req->rq_rqbd->rqbd_service->srv_req_portal !=OST_IO_PORTAL){
1500                         CERROR("%s: deny write request from %s to portal %u\n",
1501                                req->rq_export->exp_obd->obd_name,
1502                                obd_export_nid2str(req->rq_export),
1503                                req->rq_rqbd->rqbd_service->srv_req_portal);
1504                         GOTO(out, rc = -EPROTO);
1505                 }
1506                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
1507                 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_ENOSPC))
1508                         GOTO(out, rc = -ENOSPC);
1509                 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_EROFS))
1510                         GOTO(out, rc = -EROFS);
1511                 rc = ost_brw_write(req, oti);
1512                 LASSERT(current->journal_info == NULL);
1513                 /* ost_brw_write sends its own replies */
1514                 RETURN(rc);
1515         case OST_READ:
1516                 CDEBUG(D_INODE, "read\n");
1517                 /* req->rq_request_portal would be nice, if it was set */
1518                 if (req->rq_rqbd->rqbd_service->srv_req_portal !=OST_IO_PORTAL){
1519                         CERROR("%s: deny read request from %s to portal %u\n",
1520                                req->rq_export->exp_obd->obd_name,
1521                                obd_export_nid2str(req->rq_export),
1522                                req->rq_rqbd->rqbd_service->srv_req_portal);
1523                         GOTO(out, rc = -EPROTO);
1524                 }
1525                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
1526                 rc = ost_brw_read(req, oti);
1527                 LASSERT(current->journal_info == NULL);
1528                 /* ost_brw_read sends its own replies */
1529                 RETURN(rc);
1530         case OST_PUNCH:
1531                 CDEBUG(D_INODE, "punch\n");
1532                 OBD_FAIL_RETURN(OBD_FAIL_OST_PUNCH_NET, 0);
1533                 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_EROFS))
1534                         GOTO(out, rc = -EROFS);
1535                 rc = ost_punch(req->rq_export, req, oti);
1536                 break;
1537         case OST_STATFS:
1538                 CDEBUG(D_INODE, "statfs\n");
1539                 OBD_FAIL_RETURN(OBD_FAIL_OST_STATFS_NET, 0);
1540                 rc = ost_statfs(req);
1541                 break;
1542         case OST_SYNC:
1543                 CDEBUG(D_INODE, "sync\n");
1544                 OBD_FAIL_RETURN(OBD_FAIL_OST_SYNC_NET, 0);
1545                 rc = ost_sync(req->rq_export, req);
1546                 break;
1547         case OST_SET_INFO:
1548                 DEBUG_REQ(D_INODE, req, "set_info");
1549                 rc = ost_set_info(req->rq_export, req);
1550                 break;
1551         case OST_GET_INFO:
1552                 DEBUG_REQ(D_INODE, req, "get_info");
1553                 rc = ost_get_info(req->rq_export, req);
1554                 break;
1555         case OST_QUOTACHECK:
1556                 CDEBUG(D_INODE, "quotacheck\n");
1557                 OBD_FAIL_RETURN(OBD_FAIL_OST_QUOTACHECK_NET, 0);
1558                 rc = ost_handle_quotacheck(req);
1559                 break;
1560         case OST_QUOTACTL:
1561                 CDEBUG(D_INODE, "quotactl\n");
1562                 OBD_FAIL_RETURN(OBD_FAIL_OST_QUOTACTL_NET, 0);
1563                 rc = ost_handle_quotactl(req);
1564                 break;
1565         case OBD_PING:
1566                 DEBUG_REQ(D_INODE, req, "ping");
1567                 rc = target_handle_ping(req);
1568                 break;
1569         /* FIXME - just reply status */
1570         case LLOG_ORIGIN_CONNECT:
1571                 DEBUG_REQ(D_INODE, req, "log connect");
1572                 rc = ost_llog_handle_connect(req->rq_export, req);
1573                 req->rq_status = rc;
1574                 rc = lustre_pack_reply(req, 1, NULL, NULL);
1575                 if (rc)
1576                         RETURN(rc);
1577                 RETURN(ptlrpc_reply(req));
1578         case OBD_LOG_CANCEL:
1579                 CDEBUG(D_INODE, "log cancel\n");
1580                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
1581                 rc = llog_origin_handle_cancel(req);
1582                 req->rq_status = rc;
1583                 rc = lustre_pack_reply(req, 1, NULL, NULL);
1584                 if (rc)
1585                         RETURN(rc);
1586                 RETURN(ptlrpc_reply(req));
1587         case LDLM_ENQUEUE:
1588                 CDEBUG(D_INODE, "enqueue\n");
1589                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
1590                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
1591                                          ldlm_server_blocking_ast,
1592                                          ldlm_server_glimpse_ast);
1593                 fail = OBD_FAIL_OST_LDLM_REPLY_NET;
1594                 break;
1595         case LDLM_CONVERT:
1596                 CDEBUG(D_INODE, "convert\n");
1597                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
1598                 rc = ldlm_handle_convert(req);
1599                 break;
1600         case LDLM_CANCEL:
1601                 CDEBUG(D_INODE, "cancel\n");
1602                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
1603                 rc = ldlm_handle_cancel(req);
1604                 break;
1605         case LDLM_BL_CALLBACK:
1606         case LDLM_CP_CALLBACK:
1607                 CDEBUG(D_INODE, "callback\n");
1608                 CERROR("callbacks should not happen on OST\n");
1609                 /* fall through */
1610         default:
1611                 CERROR("Unexpected opcode %d\n",
1612                        lustre_msg_get_opc(req->rq_reqmsg));
1613                 req->rq_status = -ENOTSUPP;
1614                 rc = ptlrpc_error(req);
1615                 RETURN(rc);
1616         }
1617
1618         LASSERT(current->journal_info == NULL);
1619
1620         EXIT;
1621         /* If we're DISCONNECTing, the export_data is already freed */
1622         if (!rc && lustre_msg_get_opc(req->rq_reqmsg) != OST_DISCONNECT)
1623                 target_committed_to_req(req);
1624
1625 out:
1626         if (!rc)
1627                 oti_to_request(oti, req);
1628
1629         target_send_reply(req, rc, fail);
1630         return 0;
1631 }
1632 EXPORT_SYMBOL(ost_handle);
1633 /*
1634  * free per-thread pool created by ost_thread_init().
1635  */
1636 static void ost_thread_done(struct ptlrpc_thread *thread)
1637 {
1638         int i;
1639         struct ost_thread_local_cache *tls; /* TLS stands for Thread-Local
1640                                              * Storage */
1641
1642         ENTRY;
1643
1644         LASSERT(thread != NULL);
1645
1646         /*
1647          * be prepared to handle partially-initialized pools (because this is
1648          * called from ost_thread_init() for cleanup.
1649          */
1650         tls = thread->t_data;
1651         if (tls != NULL) {
1652                 for (i = 0; i < OST_THREAD_POOL_SIZE; ++ i) {
1653                         if (tls->page[i] != NULL)
1654                                 __free_page(tls->page[i]);
1655                 }
1656                 OBD_FREE_PTR(tls);
1657                 thread->t_data = NULL;
1658         }
1659         EXIT;
1660 }
1661
1662 /*
1663  * initialize per-thread page pool (bug 5137).
1664  */
1665 static int ost_thread_init(struct ptlrpc_thread *thread)
1666 {
1667         int result;
1668         int i;
1669         struct ost_thread_local_cache *tls;
1670
1671         ENTRY;
1672
1673         LASSERT(thread != NULL);
1674         LASSERT(thread->t_data == NULL);
1675         LASSERTF(thread->t_id <= OSS_THREADS_MAX, "%u\n", thread->t_id);
1676
1677         OBD_ALLOC_PTR(tls);
1678         if (tls != NULL) {
1679                 result = 0;
1680                 thread->t_data = tls;
1681                 /*
1682                  * populate pool
1683                  */
1684                 for (i = 0; i < OST_THREAD_POOL_SIZE; ++ i) {
1685                         tls->page[i] = alloc_page(OST_THREAD_POOL_GFP);
1686                         if (tls->page[i] == NULL) {
1687                                 ost_thread_done(thread);
1688                                 result = -ENOMEM;
1689                                 break;
1690                         }
1691                 }
1692         } else
1693                 result = -ENOMEM;
1694         RETURN(result);
1695 }
1696
1697 #define OST_WATCHDOG_TIMEOUT (obd_timeout * 1000)
1698
1699 /* Sigh - really, this is an OSS, the _server_, not the _target_ */
1700 static int ost_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
1701 {
1702         struct ost_obd *ost = &obd->u.ost;
1703         struct lprocfs_static_vars lvars;
1704         int oss_min_threads;
1705         int oss_max_threads;
1706         int rc;
1707         ENTRY;
1708
1709         rc = cleanup_group_info();
1710         if (rc)
1711                 RETURN(rc);
1712
1713         rc = llog_start_commit_thread();
1714         if (rc < 0)
1715                 RETURN(rc);
1716
1717         lprocfs_init_vars(ost, &lvars);
1718         lprocfs_obd_setup(obd, lvars.obd_vars);
1719
1720         sema_init(&ost->ost_health_sem, 1);
1721
1722         if (oss_num_threads) {
1723                 /* If oss_num_threads is set, it is the min and the max. */
1724                 if (oss_num_threads > OSS_THREADS_MAX) 
1725                         oss_num_threads = OSS_THREADS_MAX;
1726                 if (oss_num_threads < OSS_THREADS_MIN)
1727                         oss_num_threads = OSS_THREADS_MIN;
1728                 oss_max_threads = oss_min_threads = oss_num_threads;
1729         } else {
1730                 /* Base min threads on memory and cpus */
1731                 oss_min_threads = smp_num_cpus * num_physpages >> 
1732                         (27 - CFS_PAGE_SHIFT);
1733                 if (oss_min_threads < OSS_THREADS_MIN)
1734                         oss_min_threads = OSS_THREADS_MIN;
1735                 /* Insure a 4x range for dynamic threads */
1736                 if (oss_min_threads > OSS_THREADS_MAX / 4) 
1737                         oss_min_threads = OSS_THREADS_MAX / 4;
1738                 oss_max_threads = min(OSS_THREADS_MAX, oss_min_threads * 4);
1739         }
1740
1741         ost->ost_service =
1742                 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
1743                                 OST_MAXREPSIZE, OST_REQUEST_PORTAL,
1744                                 OSC_REPLY_PORTAL,
1745                                 OST_WATCHDOG_TIMEOUT, ost_handle,
1746                                 LUSTRE_OSS_NAME, obd->obd_proc_entry,
1747                                 ost_print_req, oss_min_threads,
1748                                 oss_max_threads, "ll_ost",
1749                                 LCT_DT_THREAD);
1750         if (ost->ost_service == NULL) {
1751                 CERROR("failed to start service\n");
1752                 GOTO(out_lprocfs, rc = -ENOMEM);
1753         }
1754
1755         rc = ptlrpc_start_threads(obd, ost->ost_service);
1756         if (rc)
1757                 GOTO(out_service, rc = -EINVAL);
1758
1759         ost->ost_create_service =
1760                 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
1761                                 OST_MAXREPSIZE, OST_CREATE_PORTAL,
1762                                 OSC_REPLY_PORTAL,
1763                                 OST_WATCHDOG_TIMEOUT, ost_handle, "ost_create",
1764                                 obd->obd_proc_entry, ost_print_req, 1, 1,
1765                                 "ll_ost_creat", LCT_DT_THREAD);
1766         if (ost->ost_create_service == NULL) {
1767                 CERROR("failed to start OST create service\n");
1768                 GOTO(out_service, rc = -ENOMEM);
1769         }
1770
1771         rc = ptlrpc_start_threads(obd, ost->ost_create_service);
1772         if (rc)
1773                 GOTO(out_create, rc = -EINVAL);
1774
1775         ost->ost_io_service =
1776                 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
1777                                 OST_MAXREPSIZE, OST_IO_PORTAL,
1778                                 OSC_REPLY_PORTAL,
1779                                 OST_WATCHDOG_TIMEOUT, ost_handle, "ost_io",
1780                                 obd->obd_proc_entry, ost_print_req,
1781                                 oss_min_threads, oss_max_threads,
1782                                 "ll_ost_io", LCT_DT_THREAD);
1783         if (ost->ost_io_service == NULL) {
1784                 CERROR("failed to start OST I/O service\n");
1785                 GOTO(out_create, rc = -ENOMEM);
1786         }
1787
1788         ost->ost_io_service->srv_init = ost_thread_init;
1789         ost->ost_io_service->srv_done = ost_thread_done;
1790         ost->ost_io_service->srv_cpu_affinity = 1;
1791         rc = ptlrpc_start_threads(obd, ost->ost_io_service);
1792         if (rc)
1793                 GOTO(out_io, rc = -EINVAL);
1794
1795         ping_evictor_start();
1796
1797         RETURN(0);
1798
1799 out_io:
1800         ptlrpc_unregister_service(ost->ost_io_service);
1801         ost->ost_io_service = NULL;
1802 out_create:
1803         ptlrpc_unregister_service(ost->ost_create_service);
1804         ost->ost_create_service = NULL;
1805 out_service:
1806         ptlrpc_unregister_service(ost->ost_service);
1807         ost->ost_service = NULL;
1808 out_lprocfs:
1809         lprocfs_obd_cleanup(obd);
1810         RETURN(rc);
1811 }
1812
1813 static int ost_cleanup(struct obd_device *obd)
1814 {
1815         struct ost_obd *ost = &obd->u.ost;
1816         int err = 0;
1817         ENTRY;
1818
1819         ping_evictor_stop();
1820
1821         spin_lock_bh(&obd->obd_processing_task_lock);
1822         if (obd->obd_recovering) {
1823                 target_cancel_recovery_timer(obd);
1824                 obd->obd_recovering = 0;
1825         }
1826         spin_unlock_bh(&obd->obd_processing_task_lock);
1827
1828         down(&ost->ost_health_sem);
1829         ptlrpc_unregister_service(ost->ost_service);
1830         ptlrpc_unregister_service(ost->ost_create_service);
1831         ptlrpc_unregister_service(ost->ost_io_service);
1832         ost->ost_service = NULL;
1833         ost->ost_create_service = NULL;
1834         up(&ost->ost_health_sem);
1835
1836         lprocfs_obd_cleanup(obd);
1837
1838         RETURN(err);
1839 }
1840
1841 static int ost_health_check(struct obd_device *obd)
1842 {
1843         struct ost_obd *ost = &obd->u.ost;
1844         int rc = 0;
1845
1846         down(&ost->ost_health_sem);
1847         rc |= ptlrpc_service_health_check(ost->ost_service);
1848         rc |= ptlrpc_service_health_check(ost->ost_create_service);
1849         rc |= ptlrpc_service_health_check(ost->ost_io_service);
1850         up(&ost->ost_health_sem);
1851
1852         /*
1853          * health_check to return 0 on healthy
1854          * and 1 on unhealthy.
1855          */
1856         if( rc != 0)
1857                 rc = 1;
1858
1859         return rc;
1860 }
1861
1862 struct ost_thread_local_cache *ost_tls(struct ptlrpc_request *r)
1863 {
1864         return (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
1865 }
1866
1867 /* use obd ops to offer management infrastructure */
1868 static struct obd_ops ost_obd_ops = {
1869         .o_owner        = THIS_MODULE,
1870         .o_setup        = ost_setup,
1871         .o_cleanup      = ost_cleanup,
1872         .o_health_check = ost_health_check,
1873 };
1874
1875
1876 static int __init ost_init(void)
1877 {
1878         struct lprocfs_static_vars lvars;
1879         int rc;
1880         ENTRY;
1881
1882         lprocfs_init_vars(ost, &lvars);
1883         rc = class_register_type(&ost_obd_ops, NULL, lvars.module_vars,
1884                                  LUSTRE_OSS_NAME, NULL);
1885
1886         if (ost_num_threads != 0 && oss_num_threads == 0) {
1887                 LCONSOLE_INFO("ost_num_threads module parameter is deprecated, "
1888                               "use oss_num_threads instead or unset both for "
1889                               "dynamic thread startup\n");
1890                 oss_num_threads = ost_num_threads;
1891         }
1892
1893         RETURN(rc);
1894 }
1895
1896 static void /*__exit*/ ost_exit(void)
1897 {
1898         class_unregister_type(LUSTRE_OSS_NAME);
1899 }
1900
1901 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1902 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
1903 MODULE_LICENSE("GPL");
1904
1905 module_init(ost_init);
1906 module_exit(ost_exit);