Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author: Peter J. Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *
8  *   This file is part of the Lustre file system, http://www.lustre.org
9  *   Lustre is a trademark of Cluster File Systems, Inc.
10  *
11  *   You may have signed or agreed to another license before downloading
12  *   this software.  If so, you are bound by the terms and conditions
13  *   of that agreement, and the following does not apply to you.  See the
14  *   LICENSE file included with this distribution for more information.
15  *
16  *   If you did not agree to a different license, then this copy of Lustre
17  *   is open source software; you can redistribute it and/or modify it
18  *   under the terms of version 2 of the GNU General Public License as
19  *   published by the Free Software Foundation.
20  *
21  *   In either case, Lustre is distributed in the hope that it will be
22  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
23  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
24  *   license text for more details.
25  *
26  *  Storage Target Handling functions
27  *  Lustre Object Server Module (OST)
28  *
29  *  This server is single threaded at present (but can easily be multi
30  *  threaded). For testing and management it is treated as an
31  *  obd_device, although it does not export a full OBD method table
32  *  (the requests are coming in over the wire, so object target
33  *  modules do not have a full method table.)
34  */
35
36 #ifndef EXPORT_SYMTAB
37 # define EXPORT_SYMTAB
38 #endif
39 #define DEBUG_SUBSYSTEM S_OST
40
41 #include <linux/module.h>
42 #include <obd_ost.h>
43 #include <lustre_net.h>
44 #include <lustre_dlm.h>
45 #include <lustre_export.h>
46 #include <lustre_debug.h>
47 #include <linux/init.h>
48 #include <lprocfs_status.h>
49 #include <lustre_commit_confd.h>
50 #include <libcfs/list.h>
51 #include <lustre_quota.h>
52 #include "ost_internal.h"
53
54 static int oss_num_threads;
55 CFS_MODULE_PARM(oss_num_threads, "i", int, 0444,
56                 "number of OSS service threads to start");
57
58 static int ost_num_threads;
59 CFS_MODULE_PARM(ost_num_threads, "i", int, 0444,
60                 "number of OST service threads to start (deprecated)");
61
62 static int oss_num_create_threads;
63 CFS_MODULE_PARM(oss_num_create_threads, "i", int, 0444,
64                 "number of OSS create threads to start");
65
66 void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
67 {
68         struct oti_req_ack_lock *ack_lock;
69         int i;
70
71         if (oti == NULL)
72                 return;
73
74         if (req->rq_repmsg)
75                 lustre_msg_set_transno(req->rq_repmsg, oti->oti_transno);
76         req->rq_transno = oti->oti_transno;
77
78         /* XXX 4 == entries in oti_ack_locks??? */
79         for (ack_lock = oti->oti_ack_locks, i = 0; i < 4; i++, ack_lock++) {
80                 if (!ack_lock->mode)
81                         break;
82                 /* XXX not even calling target_send_reply in some cases... */
83                 ptlrpc_save_lock (req, &ack_lock->lock, ack_lock->mode);
84         }
85 }
86
87 static int ost_destroy(struct obd_export *exp, struct ptlrpc_request *req,
88                        struct obd_trans_info *oti)
89 {
90         struct ost_body *body, *repbody;
91         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
92         ENTRY;
93
94         body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
95                                   lustre_swab_ost_body);
96         if (body == NULL)
97                 RETURN(-EFAULT);
98
99         if (body->oa.o_id == 0)
100                 RETURN(-EPROTO);
101
102         if (lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1)) {
103                 struct ldlm_request *dlm;
104                 dlm = lustre_swab_reqbuf(req, REQ_REC_OFF + 1, sizeof(*dlm),
105                                          lustre_swab_ldlm_request);
106                 if (dlm == NULL)
107                         RETURN (-EFAULT);
108                 ldlm_request_cancel(req, dlm, 0);
109         }
110         
111         rc = lustre_pack_reply(req, 2, size, NULL);
112         if (rc)
113                 RETURN(rc);
114
115         if (body->oa.o_valid & OBD_MD_FLCOOKIE)
116                 oti->oti_logcookies = obdo_logcookie(&body->oa);
117         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
118                                  sizeof(*repbody));
119         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
120         req->rq_status = obd_destroy(exp, &body->oa, NULL, oti, NULL);
121         RETURN(0);
122 }
123
124 static int ost_getattr(struct obd_export *exp, struct ptlrpc_request *req)
125 {
126         struct ost_body *body, *repbody;
127         struct obd_info oinfo = { { { 0 } } };
128         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
129         ENTRY;
130
131         body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
132                                   lustre_swab_ost_body);
133         if (body == NULL)
134                 RETURN(-EFAULT);
135
136         rc = lustre_pack_reply(req, 2, size, NULL);
137         if (rc)
138                 RETURN(rc);
139
140         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
141                                  sizeof(*repbody));
142         repbody->oa = body->oa;
143
144         oinfo.oi_oa = &repbody->oa;
145         if (oinfo.oi_oa->o_valid & OBD_MD_FLOSSCAPA)
146                 oinfo.oi_capa = lustre_unpack_capa(req->rq_reqmsg,
147                                                    REQ_REC_OFF + 1);
148         req->rq_status = obd_getattr(exp, &oinfo);
149         RETURN(0);
150 }
151
152 static int ost_statfs(struct ptlrpc_request *req)
153 {
154         struct obd_statfs *osfs;
155         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
156         ENTRY;
157
158         rc = lustre_pack_reply(req, 2, size, NULL);
159         if (rc)
160                 RETURN(rc);
161
162         osfs = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*osfs));
163
164         req->rq_status = obd_statfs(req->rq_export->exp_obd, osfs,
165                                     cfs_time_current_64() - HZ);
166         if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOSPC))
167                 osfs->os_bfree = osfs->os_bavail = 64;
168         if (req->rq_status != 0)
169                 CERROR("ost: statfs failed: rc %d\n", req->rq_status);
170
171         RETURN(0);
172 }
173
174 static int ost_create(struct obd_export *exp, struct ptlrpc_request *req,
175                       struct obd_trans_info *oti)
176 {
177         struct ost_body *body, *repbody;
178         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*repbody) };
179         ENTRY;
180
181         body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
182                                   lustre_swab_ost_body);
183         if (body == NULL)
184                 RETURN(-EFAULT);
185
186         rc = lustre_pack_reply(req, 2, size, NULL);
187         if (rc)
188                 RETURN(rc);
189
190         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
191                                  sizeof(*repbody));
192         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
193         oti->oti_logcookies = obdo_logcookie(&repbody->oa);
194         req->rq_status = obd_create(exp, &repbody->oa, NULL, oti);
195         //obd_log_cancel(conn, NULL, 1, oti->oti_logcookies, 0);
196         RETURN(0);
197 }
198
199 /*
200  * Helper function for ost_punch(): if asked by client, acquire [size, EOF]
201  * lock on the file being truncated.
202  */
203 static int ost_punch_lock_get(struct obd_export *exp, struct obdo *oa,
204                               struct lustre_handle *lh)
205 {
206         int flags;
207         struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0} };
208         ldlm_policy_data_t policy;
209         __u64 start;
210         __u64 finis;
211
212         ENTRY;
213
214         LASSERT(!lustre_handle_is_used(lh));
215
216         if (!(oa->o_valid & OBD_MD_FLFLAGS) ||
217             !(oa->o_flags & OBD_FL_TRUNCLOCK))
218                 RETURN(0);
219
220         CDEBUG(D_INODE, "OST-side truncate lock.\n");
221
222         start = oa->o_size;
223         finis = start + oa->o_blocks;
224
225         /*
226          * standard truncate optimization: if file body is completely
227          * destroyed, don't send data back to the server.
228          */
229         flags = (start == 0) ? LDLM_AST_DISCARD_DATA : 0;
230
231         policy.l_extent.start = start & CFS_PAGE_MASK;
232
233         /*
234          * If ->o_blocks is EOF it means "lock till the end of the
235          * file". Otherwise, it's size of a hole being punched (in bytes)
236          */
237         if (oa->o_blocks == OBD_OBJECT_EOF || finis < start)
238                 policy.l_extent.end = OBD_OBJECT_EOF;
239         else
240                 policy.l_extent.end = finis | ~CFS_PAGE_MASK;
241
242         RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
243                                       LDLM_EXTENT, &policy, LCK_PW, &flags,
244                                       ldlm_blocking_ast, ldlm_completion_ast,
245                                       ldlm_glimpse_ast, NULL, 0, NULL, lh));
246 }
247
248 /*
249  * Helper function for ost_punch(): release lock acquired by
250  * ost_punch_lock_get(), if any.
251  */
252 static void ost_punch_lock_put(struct obd_export *exp, struct obdo *oa,
253                                struct lustre_handle *lh)
254 {
255         ENTRY;
256         if (lustre_handle_is_used(lh))
257                 ldlm_lock_decref(lh, LCK_PW);
258         EXIT;
259 }
260
261 static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req,
262                      struct obd_trans_info *oti)
263 {
264         struct obd_info oinfo = { { { 0 } } };
265         struct ost_body *body, *repbody;
266         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*repbody) };
267         struct lustre_handle lh = {0,};
268         ENTRY;
269
270         /* check that we do support OBD_CONNECT_TRUNCLOCK. */
271         CLASSERT(OST_CONNECT_SUPPORTED & OBD_CONNECT_TRUNCLOCK);
272
273         body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
274                                   lustre_swab_ost_body);
275         if (body == NULL)
276                 RETURN(-EFAULT);
277
278         oinfo.oi_oa = &body->oa;
279         oinfo.oi_policy.l_extent.start = oinfo.oi_oa->o_size;
280         oinfo.oi_policy.l_extent.end = oinfo.oi_oa->o_blocks;
281
282         if ((oinfo.oi_oa->o_valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) !=
283             (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
284                 RETURN(-EINVAL);
285
286         rc = lustre_pack_reply(req, 2, size, NULL);
287         if (rc)
288                 RETURN(rc);
289
290         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
291                                  sizeof(*repbody));
292         rc = ost_punch_lock_get(exp, oinfo.oi_oa, &lh);
293         if (rc == 0) {
294                 if (oinfo.oi_oa->o_valid & OBD_MD_FLFLAGS &&
295                     oinfo.oi_oa->o_flags == OBD_FL_TRUNCLOCK)
296                         /*
297                          * If OBD_FL_TRUNCLOCK is the only bit set in
298                          * ->o_flags, clear OBD_MD_FLFLAGS to avoid falling
299                          * through filter_setattr() to filter_iocontrol().
300                          */
301                         oinfo.oi_oa->o_valid &= ~OBD_MD_FLFLAGS;
302
303                 if (oinfo.oi_oa->o_valid & OBD_MD_FLOSSCAPA)
304                         oinfo.oi_capa = lustre_unpack_capa(req->rq_reqmsg,
305                                                            REQ_REC_OFF + 1);
306                 req->rq_status = obd_punch(exp, &oinfo, oti, NULL);
307                 ost_punch_lock_put(exp, oinfo.oi_oa, &lh);
308         }
309         repbody->oa = *oinfo.oi_oa;
310         RETURN(rc);
311 }
312
313 static int ost_sync(struct obd_export *exp, struct ptlrpc_request *req)
314 {
315         struct ost_body *body, *repbody;
316         struct lustre_capa *capa = NULL;
317         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*repbody) };
318         ENTRY;
319
320         body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
321                                   lustre_swab_ost_body);
322         if (body == NULL)
323                 RETURN(-EFAULT);
324
325         if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
326                 capa = lustre_unpack_capa(req->rq_reqmsg, REQ_REC_OFF + 1);
327
328         rc = lustre_pack_reply(req, 2, size, NULL);
329         if (rc)
330                 RETURN(rc);
331
332         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
333                                  sizeof(*repbody));
334         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
335         req->rq_status = obd_sync(exp, &repbody->oa, NULL, repbody->oa.o_size,
336                                   repbody->oa.o_blocks, capa);
337         RETURN(0);
338 }
339
340 static int ost_setattr(struct obd_export *exp, struct ptlrpc_request *req,
341                        struct obd_trans_info *oti)
342 {
343         struct ost_body *body, *repbody;
344         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*repbody) };
345         struct obd_info oinfo = { { { 0 } } };
346         ENTRY;
347
348         body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
349                                   lustre_swab_ost_body);
350         if (body == NULL)
351                 RETURN(-EFAULT);
352
353         rc = lustre_pack_reply(req, 2, size, NULL);
354         if (rc)
355                 RETURN(rc);
356
357         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
358                                  sizeof(*repbody));
359         repbody->oa = body->oa;
360
361         oinfo.oi_oa = &repbody->oa;
362         if (oinfo.oi_oa->o_valid & OBD_MD_FLOSSCAPA)
363                 oinfo.oi_capa = lustre_unpack_capa(req->rq_reqmsg,
364                                                    REQ_REC_OFF + 1);
365         req->rq_status = obd_setattr(exp, &oinfo, oti);
366         RETURN(0);
367 }
368
369 static int ost_bulk_timeout(void *data)
370 {
371         ENTRY;
372         /* We don't fail the connection here, because having the export
373          * killed makes the (vital) call to commitrw very sad.
374          */
375         RETURN(1);
376 }
377
378 static int get_per_page_niobufs(struct obd_ioobj *ioo, int nioo,
379                                 struct niobuf_remote *rnb, int nrnb,
380                                 struct niobuf_remote **pp_rnbp)
381 {
382         /* Copy a remote niobuf, splitting it into page-sized chunks
383          * and setting ioo[i].ioo_bufcnt accordingly */
384         struct niobuf_remote *pp_rnb;
385         int   i;
386         int   j;
387         int   page;
388         int   rnbidx = 0;
389         int   npages = 0;
390
391         /*
392          * array of sufficient size already preallocated by caller
393          */
394         LASSERT(pp_rnbp != NULL);
395         LASSERT(*pp_rnbp != NULL);
396
397         /* first count and check the number of pages required */
398         for (i = 0; i < nioo; i++)
399                 for (j = 0; j < ioo->ioo_bufcnt; j++, rnbidx++) {
400                         obd_off offset = rnb[rnbidx].offset;
401                         obd_off p0 = offset >> CFS_PAGE_SHIFT;
402                         obd_off pn = (offset + rnb[rnbidx].len - 1) >>
403                                      CFS_PAGE_SHIFT;
404
405                         LASSERT(rnbidx < nrnb);
406
407                         npages += (pn + 1 - p0);
408
409                         if (rnb[rnbidx].len == 0) {
410                                 CERROR("zero len BRW: obj %d objid "LPX64
411                                        " buf %u\n", i, ioo[i].ioo_id, j);
412                                 return -EINVAL;
413                         }
414                         if (j > 0 &&
415                             rnb[rnbidx].offset <= rnb[rnbidx-1].offset) {
416                                 CERROR("unordered BRW: obj %d objid "LPX64
417                                        " buf %u offset "LPX64" <= "LPX64"\n",
418                                        i, ioo[i].ioo_id, j, rnb[rnbidx].offset,
419                                        rnb[rnbidx].offset);
420                                 return -EINVAL;
421                         }
422                 }
423
424         LASSERT(rnbidx == nrnb);
425
426         if (npages == nrnb) {       /* all niobufs are for single pages */
427                 *pp_rnbp = rnb;
428                 return npages;
429         }
430
431         pp_rnb = *pp_rnbp;
432
433         /* now do the actual split */
434         page = rnbidx = 0;
435         for (i = 0; i < nioo; i++) {
436                 int  obj_pages = 0;
437
438                 for (j = 0; j < ioo[i].ioo_bufcnt; j++, rnbidx++) {
439                         obd_off off = rnb[rnbidx].offset;
440                         int     nob = rnb[rnbidx].len;
441
442                         LASSERT(rnbidx < nrnb);
443                         do {
444                                 obd_off  poff = off & ~CFS_PAGE_MASK;
445                                 int      pnob = (poff + nob > CFS_PAGE_SIZE) ?
446                                                 PAGE_SIZE - poff : nob;
447
448                                 LASSERT(page < npages);
449                                 pp_rnb[page].len = pnob;
450                                 pp_rnb[page].offset = off;
451                                 pp_rnb[page].flags = rnb[rnbidx].flags;
452
453                                 CDEBUG(0, "   obj %d id "LPX64
454                                        "page %d(%d) "LPX64" for %d, flg %x\n",
455                                        i, ioo[i].ioo_id, obj_pages, page,
456                                        pp_rnb[page].offset, pp_rnb[page].len,
457                                        pp_rnb[page].flags);
458                                 page++;
459                                 obj_pages++;
460
461                                 off += pnob;
462                                 nob -= pnob;
463                         } while (nob > 0);
464                         LASSERT(nob == 0);
465                 }
466                 ioo[i].ioo_bufcnt = obj_pages;
467         }
468         LASSERT(page == npages);
469
470         return npages;
471 }
472
473 static __u32 ost_checksum_bulk(struct ptlrpc_bulk_desc *desc, int opc,
474                                cksum_type_t cksum_type)
475 {
476         __u32 cksum;
477         int i;
478
479         cksum = init_checksum(cksum_type);
480         for (i = 0; i < desc->bd_iov_count; i++) {
481                 struct page *page = desc->bd_iov[i].kiov_page;
482                 int off = desc->bd_iov[i].kiov_offset & ~CFS_PAGE_MASK;
483                 char *ptr = kmap(page) + off;
484                 int len = desc->bd_iov[i].kiov_len;
485
486                 /* corrupt the data before we compute the checksum, to
487                  * simulate a client->OST data error */
488                 if (i == 0 && opc == OST_WRITE &&
489                     OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_RECEIVE))
490                         memcpy(ptr, "bad3", min(4, len));
491                 cksum = compute_checksum(cksum, ptr, len, cksum_type);
492                 /* corrupt the data after we compute the checksum, to
493                  * simulate an OST->client data error */
494                 if (i == 0 && opc == OST_READ &&
495                     OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_SEND))
496                         memcpy(ptr, "bad4", min(4, len));
497                 kunmap(page);
498         }
499
500         return cksum;
501 }
502
503 /*
504  * populate @nio by @nrpages pages from per-thread page pool
505  */
506 static void ost_nio_pages_get(struct ptlrpc_request *req,
507                               struct niobuf_local *nio, int nrpages)
508 {
509         int i;
510         struct ost_thread_local_cache *tls;
511
512         ENTRY;
513
514         LASSERT(nrpages <= OST_THREAD_POOL_SIZE);
515         LASSERT(req != NULL);
516         LASSERT(req->rq_svc_thread != NULL);
517
518         tls = ost_tls(req);
519         LASSERT(tls != NULL);
520
521         memset(nio, 0, nrpages * sizeof *nio);
522         for (i = 0; i < nrpages; ++ i) {
523                 struct page *page;
524
525                 page = tls->page[i];
526                 LASSERT(page != NULL);
527                 POISON_PAGE(page, 0xf1);
528                 nio[i].page = page;
529                 LL_CDEBUG_PAGE(D_INFO, page, "%d\n", i);
530         }
531         EXIT;
532 }
533
534 /*
535  * Dual for ost_nio_pages_get(). Poison pages in pool for debugging
536  */
537 static void ost_nio_pages_put(struct ptlrpc_request *req,
538                               struct niobuf_local *nio, int nrpages)
539 {
540         int i;
541
542         ENTRY;
543
544         LASSERT(nrpages <= OST_THREAD_POOL_SIZE);
545
546         for (i = 0; i < nrpages; ++ i)
547                 POISON_PAGE(nio[i].page, 0xf2);
548         EXIT;
549 }
550
551 static int ost_brw_lock_get(int mode, struct obd_export *exp,
552                             struct obd_ioobj *obj, struct niobuf_remote *nb,
553                             struct lustre_handle *lh)
554 {
555         int flags                 = 0;
556         int nrbufs                = obj->ioo_bufcnt;
557         struct ldlm_res_id res_id = { .name = { obj->ioo_id, 0,
558                                                 obj->ioo_gr, 0} };
559         ldlm_policy_data_t policy;
560         int i;
561
562         ENTRY;
563
564         LASSERT(mode == LCK_PR || mode == LCK_PW);
565         LASSERT(!lustre_handle_is_used(lh));
566
567         if (nrbufs == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
568                 RETURN(0);
569
570         /* EXPENSIVE ASSERTION */
571         for (i = 1; i < nrbufs; i ++)
572                 LASSERT((nb[0].flags & OBD_BRW_SRVLOCK) ==
573                         (nb[i].flags & OBD_BRW_SRVLOCK));
574
575         policy.l_extent.start = nb[0].offset & CFS_PAGE_MASK;
576         policy.l_extent.end   = (nb[nrbufs - 1].offset +
577                                  nb[nrbufs - 1].len - 1) | ~CFS_PAGE_MASK;
578
579         RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
580                                       LDLM_EXTENT, &policy, mode, &flags,
581                                       ldlm_blocking_ast, ldlm_completion_ast,
582                                       ldlm_glimpse_ast, NULL, 0, NULL, lh));
583 }
584
585 static void ost_brw_lock_put(int mode,
586                              struct obd_ioobj *obj, struct niobuf_remote *niob,
587                              struct lustre_handle *lh)
588 {
589         ENTRY;
590         LASSERT(mode == LCK_PR || mode == LCK_PW);
591         LASSERT((obj->ioo_bufcnt > 0 && (niob[0].flags & OBD_BRW_SRVLOCK)) ==
592                 lustre_handle_is_used(lh));
593         if (lustre_handle_is_used(lh))
594                 ldlm_lock_decref(lh, mode);
595         EXIT;
596 }
597
598 struct ost_prolong_data {
599         struct obd_export *opd_exp;
600         ldlm_policy_data_t opd_policy;
601         ldlm_mode_t opd_mode;
602 };
603
604 static int ost_prolong_locks_iter(struct ldlm_lock *lock, void *data)
605 {
606         struct ost_prolong_data *opd = data;
607
608         LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
609
610         if (lock->l_req_mode != lock->l_granted_mode) {
611                 /* scan granted locks only */
612                 return LDLM_ITER_STOP;
613         }
614
615         if (lock->l_export != opd->opd_exp) {
616                 /* prolong locks only for given client */
617                 return LDLM_ITER_CONTINUE;
618         }
619
620         if (!(lock->l_granted_mode & opd->opd_mode)) {
621                 /* we aren't interesting in all type of locks */
622                 return LDLM_ITER_CONTINUE;
623         }
624
625         if (lock->l_policy_data.l_extent.end < opd->opd_policy.l_extent.start ||
626             lock->l_policy_data.l_extent.start > opd->opd_policy.l_extent.end) {
627                 /* the request doesn't cross the lock, skip it */
628                 return LDLM_ITER_CONTINUE;
629         }
630
631         if (!(lock->l_flags & LDLM_FL_AST_SENT)) {
632                 /* ignore locks not being cancelled */
633                 return LDLM_ITER_CONTINUE;
634         }
635
636         /* OK. this is a possible lock the user holds doing I/O
637          * let's refresh eviction timer for it */
638         ldlm_refresh_waiting_lock(lock);
639
640         return LDLM_ITER_CONTINUE;
641 }
642
643 static void ost_prolong_locks(struct obd_export *exp, struct obd_ioobj *obj,
644                               struct niobuf_remote *nb, ldlm_mode_t mode)
645 {
646         struct ldlm_res_id res_id = { .name = { obj->ioo_id, 0,
647                                                 obj->ioo_gr, 0} };
648         int nrbufs = obj->ioo_bufcnt;
649         struct ost_prolong_data opd;
650
651         ENTRY;
652
653         opd.opd_mode = mode;
654         opd.opd_exp = exp;
655         opd.opd_policy.l_extent.start = nb[0].offset & CFS_PAGE_MASK;
656         opd.opd_policy.l_extent.end = (nb[nrbufs - 1].offset +
657                                        nb[nrbufs - 1].len - 1) | ~CFS_PAGE_MASK;
658
659         CDEBUG(D_DLMTRACE,"refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
660                res_id.name[0], res_id.name[1], opd.opd_policy.l_extent.start,
661                opd.opd_policy.l_extent.end);
662         ldlm_resource_iterate(exp->exp_obd->obd_namespace, &res_id,
663                               ost_prolong_locks_iter, &opd);
664 }
665
666 static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
667 {
668         struct ptlrpc_bulk_desc *desc;
669         struct obd_export *exp = req->rq_export;
670         struct niobuf_remote *remote_nb;
671         struct niobuf_remote *pp_rnb = NULL;
672         struct niobuf_local *local_nb;
673         struct obd_ioobj *ioo;
674         struct ost_body *body, *repbody;
675         struct lustre_capa *capa = NULL;
676         struct l_wait_info lwi;
677         struct lustre_handle lockh = { 0 };
678         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
679         int niocount, npages, nob = 0, rc, i;
680         int no_reply = 0;
681         ENTRY;
682
683         req->rq_bulk_read = 1;
684
685         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
686                 GOTO(out, rc = -EIO);
687
688         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, (obd_timeout + 1) / 4);
689
690         /* Check if there is eviction in progress, and if so, wait for it to
691          * finish */
692         if (unlikely(atomic_read(&exp->exp_obd->obd_evict_inprogress))) {
693                 lwi = LWI_INTR(NULL, NULL); // We do not care how long it takes
694                 rc = l_wait_event(exp->exp_obd->obd_evict_inprogress_waitq,
695                         !atomic_read(&exp->exp_obd->obd_evict_inprogress),
696                         &lwi);
697         }
698         if (exp->exp_failed)
699                 GOTO(out, rc = -ENOTCONN);
700
701         body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
702                                   lustre_swab_ost_body);
703         if (body == NULL) {
704                 CERROR("Missing/short ost_body\n");
705                 GOTO(out, rc = -EFAULT);
706         }
707
708         ioo = lustre_swab_reqbuf(req, REQ_REC_OFF + 1, sizeof(*ioo),
709                                  lustre_swab_obd_ioobj);
710         if (ioo == NULL) {
711                 CERROR("Missing/short ioobj\n");
712                 GOTO(out, rc = -EFAULT);
713         }
714
715         niocount = ioo->ioo_bufcnt;
716         if (niocount > PTLRPC_MAX_BRW_PAGES) {
717                 DEBUG_REQ(D_ERROR, req, "bulk has too many pages (%d)",
718                           niocount);
719                 GOTO(out, rc = -EFAULT);
720         }
721
722         remote_nb = lustre_swab_reqbuf(req, REQ_REC_OFF + 2,
723                                        niocount * sizeof(*remote_nb),
724                                        lustre_swab_niobuf_remote);
725         if (remote_nb == NULL) {
726                 CERROR("Missing/short niobuf\n");
727                 GOTO(out, rc = -EFAULT);
728         }
729         if (lustre_msg_swabbed(req->rq_reqmsg)) { /* swab remaining niobufs */
730                 for (i = 1; i < niocount; i++)
731                         lustre_swab_niobuf_remote (&remote_nb[i]);
732         }
733
734         if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
735                 capa = lustre_unpack_capa(req->rq_reqmsg, REQ_REC_OFF + 3);
736
737         rc = lustre_pack_reply(req, 2, size, NULL);
738         if (rc)
739                 GOTO(out, rc);
740
741         /*
742          * Per-thread array of struct niobuf_{local,remote}'s was allocated by
743          * ost_thread_init().
744          */
745         local_nb = ost_tls(req)->local;
746         pp_rnb   = ost_tls(req)->remote;
747
748         /* FIXME all niobuf splitting should be done in obdfilter if needed */
749         /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
750         npages = get_per_page_niobufs(ioo, 1, remote_nb, niocount, &pp_rnb);
751         if (npages < 0)
752                 GOTO(out, rc = npages);
753
754         LASSERT(npages <= OST_THREAD_POOL_SIZE);
755
756         ost_nio_pages_get(req, local_nb, npages);
757
758         desc = ptlrpc_prep_bulk_exp(req, npages,
759                                      BULK_PUT_SOURCE, OST_BULK_PORTAL);
760         if (desc == NULL)
761                 GOTO(out, rc = -ENOMEM);
762
763         rc = ost_brw_lock_get(LCK_PR, exp, ioo, pp_rnb, &lockh);
764         if (rc != 0)
765                 GOTO(out_bulk, rc);
766
767         /* 
768          * If getting the lock took more time than
769          * client was willing to wait, drop it. b=11330
770          */
771         if (cfs_time_current_sec() > req->rq_arrival_time.tv_sec + obd_timeout || 
772             OBD_FAIL_CHECK(OBD_FAIL_OST_DROP_REQ)) {
773                 no_reply = 1;
774                 CERROR("Dropping timed-out read from %s because locking"
775                        "object "LPX64" took %ld seconds.\n",
776                        libcfs_id2str(req->rq_peer), ioo->ioo_id,
777                        cfs_time_current_sec() - req->rq_arrival_time.tv_sec);
778                 GOTO(out_lock, rc = -ETIMEDOUT);
779         }
780
781         rc = obd_preprw(OBD_BRW_READ, exp, &body->oa, 1,
782                         ioo, npages, pp_rnb, local_nb, oti, capa);
783         if (rc != 0)
784                 GOTO(out_lock, rc);
785
786         ost_prolong_locks(exp, ioo, pp_rnb, LCK_PW | LCK_PR);
787
788         nob = 0;
789         for (i = 0; i < npages; i++) {
790                 int page_rc = local_nb[i].rc;
791
792                 if (page_rc < 0) {              /* error */
793                         rc = page_rc;
794                         break;
795                 }
796
797                 LASSERTF(page_rc <= pp_rnb[i].len, "page_rc (%d) > "
798                          "pp_rnb[%d].len (%d)\n", page_rc, i, pp_rnb[i].len);
799                 nob += page_rc;
800                 if (page_rc != 0) {             /* some data! */
801                         LASSERT (local_nb[i].page != NULL);
802                         ptlrpc_prep_bulk_page(desc, local_nb[i].page,
803                                               pp_rnb[i].offset & ~CFS_PAGE_MASK,
804                                               page_rc);
805                 }
806
807                 if (page_rc != pp_rnb[i].len) { /* short read */
808                         /* All subsequent pages should be 0 */
809                         while(++i < npages)
810                                 LASSERT(local_nb[i].rc == 0);
811                         break;
812                 }
813         }
814
815         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
816                 cksum_type_t cksum_type = OBD_CKSUM_CRC32;
817
818                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
819                         cksum_type = cksum_type_unpack(body->oa.o_flags);
820                 body->oa.o_flags = cksum_type_pack(cksum_type);
821                 body->oa.o_valid = OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
822                 body->oa.o_cksum = ost_checksum_bulk(desc, OST_READ, cksum_type);
823                 CDEBUG(D_PAGE,"checksum at read origin: %x\n",body->oa.o_cksum);
824         } else {
825                 body->oa.o_valid = 0;
826         }
827         /* We're finishing using body->oa as an input variable */
828
829         /* Check if client was evicted while we were doing i/o before touching
830            network */
831         if (rc == 0) {
832                 /* Check if there is eviction in progress, and if so, wait for
833                  * it to finish */
834                 if (unlikely(atomic_read(&exp->exp_obd->
835                                                 obd_evict_inprogress))) {
836                         lwi = LWI_INTR(NULL, NULL);
837                         rc = l_wait_event(exp->exp_obd->
838                                                 obd_evict_inprogress_waitq,
839                                           !atomic_read(&exp->exp_obd->
840                                                         obd_evict_inprogress),
841                                           &lwi);
842                 }
843                 if (exp->exp_failed)
844                         rc = -ENOTCONN;
845                 else {
846                         sptlrpc_svc_wrap_bulk(req, desc);
847
848                         rc = ptlrpc_start_bulk_transfer(desc);
849                 }
850
851                 if (rc == 0) {
852                         lwi = LWI_TIMEOUT_INTERVAL(obd_timeout * HZ / 4, HZ,
853                                                    ost_bulk_timeout, desc);
854                         rc = l_wait_event(desc->bd_waitq,
855                                           !ptlrpc_bulk_active(desc) ||
856                                           exp->exp_failed, &lwi);
857                         LASSERT(rc == 0 || rc == -ETIMEDOUT);
858                         if (rc == -ETIMEDOUT) {
859                                 DEBUG_REQ(D_ERROR, req, "timeout on bulk PUT");
860                                 ptlrpc_abort_bulk(desc);
861                         } else if (exp->exp_failed) {
862                                 DEBUG_REQ(D_ERROR, req, "Eviction on bulk PUT");
863                                 rc = -ENOTCONN;
864                                 ptlrpc_abort_bulk(desc);
865                         } else if (!desc->bd_success ||
866                                    desc->bd_nob_transferred != desc->bd_nob) {
867                                 DEBUG_REQ(D_ERROR, req, "%s bulk PUT %d(%d)",
868                                           desc->bd_success ?
869                                           "truncated" : "network error on",
870                                           desc->bd_nob_transferred,
871                                           desc->bd_nob);
872                                 /* XXX should this be a different errno? */
873                                 rc = -ETIMEDOUT;
874                         }
875                 } else {
876                         DEBUG_REQ(D_ERROR, req, "bulk PUT failed: rc %d", rc);
877                 }
878                 no_reply = rc != 0;
879         }
880
881         /* Must commit after prep above in all cases */
882         rc = obd_commitrw(OBD_BRW_READ, exp, &body->oa, 1,
883                           ioo, npages, local_nb, oti, rc);
884
885         ost_nio_pages_put(req, local_nb, npages);
886
887         if (rc == 0) {
888                 repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
889                                          sizeof(*repbody));
890                 memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
891         }
892
893 out_lock:
894         ost_brw_lock_put(LCK_PR, ioo, pp_rnb, &lockh);
895 out_bulk:
896         ptlrpc_free_bulk(desc);
897 out:
898         LASSERT(rc <= 0);
899         if (rc == 0) {
900                 req->rq_status = nob;
901                 target_committed_to_req(req);
902                 ptlrpc_reply(req);
903         } else if (!no_reply) {
904                 /* Only reply if there was no comms problem with bulk */
905                 target_committed_to_req(req);
906                 req->rq_status = rc;
907                 ptlrpc_error(req);
908         } else {
909                 if (req->rq_reply_state != NULL) {
910                         /* reply out callback would free */
911                         ptlrpc_rs_decref(req->rq_reply_state);
912                         req->rq_reply_state = NULL;
913                 }
914                 CWARN("%s: ignoring bulk IO comm error with %s@%s id %s - "
915                       "client will retry\n",
916                       exp->exp_obd->obd_name,
917                       exp->exp_client_uuid.uuid,
918                       exp->exp_connection->c_remote_uuid.uuid,
919                       libcfs_id2str(req->rq_peer));
920         }
921
922         RETURN(rc);
923 }
924
925 static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
926 {
927         struct ptlrpc_bulk_desc *desc;
928         struct obd_export       *exp = req->rq_export;
929         struct niobuf_remote    *remote_nb;
930         struct niobuf_remote    *pp_rnb;
931         struct niobuf_local     *local_nb;
932         struct obd_ioobj        *ioo;
933         struct ost_body         *body, *repbody;
934         struct l_wait_info       lwi;
935         struct lustre_handle     lockh = {0};
936         struct lustre_capa      *capa = NULL;
937         __u32                   *rcs;
938         int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
939         int objcount, niocount, npages;
940         int rc, swab, i, j;
941         obd_count                client_cksum = 0, server_cksum = 0;
942         cksum_type_t             cksum_type = OBD_CKSUM_CRC32;
943         int                      no_reply = 0; 
944         ENTRY;
945
946         req->rq_bulk_write = 1;
947
948         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
949                 GOTO(out, rc = -EIO);
950         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK2))
951                 GOTO(out, rc = -EFAULT);
952
953         /* pause before transaction has been started */
954         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, (obd_timeout + 1) / 4);
955
956         /* Check if there is eviction in progress, and if so, wait for it to
957          * finish */
958         if (unlikely(atomic_read(&exp->exp_obd->obd_evict_inprogress))) {
959                 lwi = LWI_INTR(NULL, NULL); // We do not care how long it takes
960                 rc = l_wait_event(exp->exp_obd->obd_evict_inprogress_waitq,
961                         !atomic_read(&exp->exp_obd->obd_evict_inprogress),
962                         &lwi);
963         }
964         if (exp->exp_failed)
965                 GOTO(out, rc = -ENOTCONN);
966
967         swab = lustre_msg_swabbed(req->rq_reqmsg);
968         body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
969                                   lustre_swab_ost_body);
970         if (body == NULL) {
971                 CERROR("Missing/short ost_body\n");
972                 GOTO(out, rc = -EFAULT);
973         }
974
975         lustre_set_req_swabbed(req, REQ_REC_OFF + 1);
976         objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) /
977                    sizeof(*ioo);
978         if (objcount == 0) {
979                 CERROR("Missing/short ioobj\n");
980                 GOTO(out, rc = -EFAULT);
981         }
982         if (objcount > 1) {
983                 CERROR("too many ioobjs (%d)\n", objcount);
984                 GOTO(out, rc = -EFAULT);
985         }
986
987         ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1,
988                              objcount * sizeof(*ioo));
989         LASSERT (ioo != NULL);
990         for (niocount = i = 0; i < objcount; i++) {
991                 if (swab)
992                         lustre_swab_obd_ioobj(&ioo[i]);
993                 if (ioo[i].ioo_bufcnt == 0) {
994                         CERROR("ioo[%d] has zero bufcnt\n", i);
995                         GOTO(out, rc = -EFAULT);
996                 }
997                 niocount += ioo[i].ioo_bufcnt;
998         }
999
1000         if (niocount > PTLRPC_MAX_BRW_PAGES) {
1001                 DEBUG_REQ(D_ERROR, req, "bulk has too many pages (%d)",
1002                           niocount);
1003                 GOTO(out, rc = -EFAULT);
1004         }
1005
1006         remote_nb = lustre_swab_reqbuf(req, REQ_REC_OFF + 2,
1007                                        niocount * sizeof(*remote_nb),
1008                                        lustre_swab_niobuf_remote);
1009         if (remote_nb == NULL) {
1010                 CERROR("Missing/short niobuf\n");
1011                 GOTO(out, rc = -EFAULT);
1012         }
1013         if (swab) {                             /* swab the remaining niobufs */
1014                 for (i = 1; i < niocount; i++)
1015                         lustre_swab_niobuf_remote (&remote_nb[i]);
1016         }
1017
1018         if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
1019                 capa = lustre_unpack_capa(req->rq_reqmsg, REQ_REC_OFF + 3);
1020
1021         size[REPLY_REC_OFF + 1] = niocount * sizeof(*rcs);
1022         rc = lustre_pack_reply(req, 3, size, NULL);
1023         if (rc != 0)
1024                 GOTO(out, rc);
1025         rcs = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1,
1026                              niocount * sizeof(*rcs));
1027
1028         /*
1029          * Per-thread array of struct niobuf_{local,remote}'s was allocated by
1030          * ost_thread_init().
1031          */
1032         local_nb = ost_tls(req)->local;
1033         pp_rnb   = ost_tls(req)->remote;
1034
1035         /* FIXME all niobuf splitting should be done in obdfilter if needed */
1036         /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
1037         npages = get_per_page_niobufs(ioo, objcount,remote_nb,niocount,&pp_rnb);
1038         if (npages < 0)
1039                 GOTO(out, rc = npages);
1040
1041         LASSERT(npages <= OST_THREAD_POOL_SIZE);
1042
1043         ost_nio_pages_get(req, local_nb, npages);
1044
1045         desc = ptlrpc_prep_bulk_exp(req, npages,
1046                                      BULK_GET_SINK, OST_BULK_PORTAL);
1047         if (desc == NULL)
1048                 GOTO(out, rc = -ENOMEM);
1049
1050         rc = ost_brw_lock_get(LCK_PW, exp, ioo, pp_rnb, &lockh);
1051         if (rc != 0)
1052                 GOTO(out_bulk, rc);
1053
1054         /* 
1055          * If getting the lock took more time than
1056          * client was willing to wait, drop it. b=11330
1057          */
1058         if (cfs_time_current_sec() > req->rq_arrival_time.tv_sec + obd_timeout || 
1059             OBD_FAIL_CHECK(OBD_FAIL_OST_DROP_REQ)) {
1060                 no_reply = 1;
1061                 CERROR("Dropping timed-out write from %s because locking"
1062                        "object "LPX64" took %ld seconds.\n",
1063                        libcfs_id2str(req->rq_peer), ioo->ioo_id,
1064                        cfs_time_current_sec() - req->rq_arrival_time.tv_sec);
1065                 GOTO(out_lock, rc = -ETIMEDOUT);
1066         }
1067
1068         ost_prolong_locks(exp, ioo, pp_rnb, LCK_PW);
1069
1070         /* obd_preprw clobbers oa->valid, so save what we need */
1071         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1072                 client_cksum = body->oa.o_cksum;
1073                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1074                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1075         }
1076         
1077         /* Because we already sync grant info with client when reconnect,
1078          * grant info will be cleared for resent req, then fed_grant and 
1079          * total_grant will not be modified in following preprw_write */ 
1080         if (lustre_msg_get_flags(req->rq_reqmsg) & (MSG_RESENT | MSG_REPLAY)) {
1081                 DEBUG_REQ(D_CACHE, req, "clear resent/replay req grant info");
1082                 body->oa.o_valid &= ~OBD_MD_FLGRANT;
1083         }
1084
1085         rc = obd_preprw(OBD_BRW_WRITE, exp, &body->oa, objcount,
1086                         ioo, npages, pp_rnb, local_nb, oti, capa);
1087         if (rc != 0)
1088                 GOTO(out_lock, rc);
1089
1090         /* NB Having prepped, we must commit... */
1091
1092         for (i = 0; i < npages; i++)
1093                 ptlrpc_prep_bulk_page(desc, local_nb[i].page,
1094                                       pp_rnb[i].offset & ~CFS_PAGE_MASK,
1095                                       pp_rnb[i].len);
1096
1097         /* Check if client was evicted while we were doing i/o before touching
1098            network */
1099         if (desc->bd_export->exp_failed)
1100                 rc = -ENOTCONN;
1101         else
1102                 rc = ptlrpc_start_bulk_transfer (desc);
1103         if (rc == 0) {
1104                 lwi = LWI_TIMEOUT_INTERVAL(obd_timeout * HZ / 2, HZ,
1105                                            ost_bulk_timeout, desc);
1106                 rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc) ||
1107                                   desc->bd_export->exp_failed, &lwi);
1108                 LASSERT(rc == 0 || rc == -ETIMEDOUT);
1109                 if (rc == -ETIMEDOUT) {
1110                         DEBUG_REQ(D_ERROR, req, "timeout on bulk GET");
1111                         ptlrpc_abort_bulk(desc);
1112                 } else if (desc->bd_export->exp_failed) {
1113                         DEBUG_REQ(D_ERROR, req, "Eviction on bulk GET");
1114                         rc = -ENOTCONN;
1115                         ptlrpc_abort_bulk(desc);
1116                 } else if (!desc->bd_success ||
1117                            desc->bd_nob_transferred != desc->bd_nob) {
1118                         DEBUG_REQ(D_ERROR, req, "%s bulk GET %d(%d)",
1119                                   desc->bd_success ?
1120                                   "truncated" : "network error on",
1121                                   desc->bd_nob_transferred, desc->bd_nob);
1122                         /* XXX should this be a different errno? */
1123                         rc = -ETIMEDOUT;
1124                 }
1125         } else {
1126                 DEBUG_REQ(D_ERROR, req, "ptlrpc_bulk_get failed: rc %d", rc);
1127         }
1128         no_reply = rc != 0;
1129
1130         if (rc == 0)
1131                 sptlrpc_svc_unwrap_bulk(req, desc);
1132
1133         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
1134                                  sizeof(*repbody));
1135         memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
1136
1137         if (unlikely(client_cksum != 0 && rc == 0)) {
1138                 static int cksum_counter;
1139                 repbody->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1140                 repbody->oa.o_flags &= ~OBD_FL_CKSUM_ALL;
1141                 repbody->oa.o_flags |= cksum_type_pack(cksum_type);
1142                 server_cksum = ost_checksum_bulk(desc, OST_WRITE, cksum_type);
1143                 repbody->oa.o_cksum = server_cksum;
1144                 cksum_counter++;
1145                 if (unlikely(client_cksum != server_cksum)) {
1146                         CERROR("client csum %x, server csum %x\n",
1147                                client_cksum, server_cksum);
1148                         cksum_counter = 0;
1149                 } else if ((cksum_counter & (-cksum_counter)) == cksum_counter){
1150                         CDEBUG(D_INFO, "Checksum %u from %s OK: %x\n",
1151                                cksum_counter, libcfs_id2str(req->rq_peer),
1152                                server_cksum);
1153                 }
1154         }
1155
1156         /* Must commit after prep above in all cases */
1157         rc = obd_commitrw(OBD_BRW_WRITE, exp, &repbody->oa,
1158                            objcount, ioo, npages, local_nb, oti, rc);
1159
1160         if (unlikely(client_cksum != server_cksum && rc == 0)) {
1161                 int  new_cksum = ost_checksum_bulk(desc, OST_WRITE, cksum_type);
1162                 char *msg;
1163                 char *via;
1164                 char *router;
1165
1166                 if (new_cksum == server_cksum)
1167                         msg = "changed in transit before arrival at OST";
1168                 else if (new_cksum == client_cksum)
1169                         msg = "initial checksum before message complete";
1170                 else
1171                         msg = "changed in transit AND after initial checksum";
1172
1173                 if (req->rq_peer.nid == desc->bd_sender) {
1174                         via = router = "";
1175                 } else {
1176                         via = " via ";
1177                         router = libcfs_nid2str(desc->bd_sender);
1178                 }
1179                 
1180                 LCONSOLE_ERROR_MSG(0x168, "%s: BAD WRITE CHECKSUM: %s from "
1181                                    "%s%s%s inum "LPU64"/"LPU64" object "
1182                                    LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1183                                    exp->exp_obd->obd_name, msg,
1184                                    libcfs_id2str(req->rq_peer),
1185                                    via, router,
1186                                    body->oa.o_valid & OBD_MD_FLFID ?
1187                                                 body->oa.o_fid : (__u64)0,
1188                                    body->oa.o_valid & OBD_MD_FLFID ?
1189                                                 body->oa.o_generation :(__u64)0,
1190                                    body->oa.o_id,
1191                                    body->oa.o_valid & OBD_MD_FLGROUP ?
1192                                                 body->oa.o_gr : (__u64)0,
1193                                    pp_rnb[0].offset,
1194                                    pp_rnb[npages-1].offset+pp_rnb[npages-1].len
1195                                    - 1 );
1196                 CERROR("client csum %x, original server csum %x, "
1197                        "server csum now %x\n",
1198                        client_cksum, server_cksum, new_cksum);
1199         }
1200
1201         ost_nio_pages_put(req, local_nb, npages);
1202
1203         if (rc == 0) {
1204                 /* set per-requested niobuf return codes */
1205                 for (i = j = 0; i < niocount; i++) {
1206                         int nob = remote_nb[i].len;
1207
1208                         rcs[i] = 0;
1209                         do {
1210                                 LASSERT(j < npages);
1211                                 if (local_nb[j].rc < 0)
1212                                         rcs[i] = local_nb[j].rc;
1213                                 nob -= pp_rnb[j].len;
1214                                 j++;
1215                         } while (nob > 0);
1216                         LASSERT(nob == 0);
1217                 }
1218                 LASSERT(j == npages);
1219         }
1220
1221 out_lock:
1222         ost_brw_lock_put(LCK_PW, ioo, pp_rnb, &lockh);
1223 out_bulk:
1224         ptlrpc_free_bulk(desc);
1225 out:
1226         if (rc == 0) {
1227                 oti_to_request(oti, req);
1228                 target_committed_to_req(req);
1229                 rc = ptlrpc_reply(req);
1230         } else if (!no_reply) {
1231                 /* Only reply if there was no comms problem with bulk */
1232                 target_committed_to_req(req);
1233                 req->rq_status = rc;
1234                 ptlrpc_error(req);
1235         } else {
1236                 if (req->rq_reply_state != NULL) {
1237                         /* reply out callback would free */
1238                         ptlrpc_rs_decref(req->rq_reply_state);
1239                         req->rq_reply_state = NULL;
1240                 }
1241                 CWARN("%s: ignoring bulk IO comm error with %s@%s id %s - "
1242                       "client will retry\n",
1243                       exp->exp_obd->obd_name,
1244                       exp->exp_client_uuid.uuid,
1245                       exp->exp_connection->c_remote_uuid.uuid,
1246                       libcfs_id2str(req->rq_peer));
1247         }
1248         RETURN(rc);
1249 }
1250
1251 static int ost_set_info(struct obd_export *exp, struct ptlrpc_request *req)
1252 {
1253         char *key, *val = NULL;
1254         int keylen, vallen, rc = 0;
1255         ENTRY;
1256
1257         key = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, 1);
1258         if (key == NULL) {
1259                 DEBUG_REQ(D_HA, req, "no set_info key");
1260                 RETURN(-EFAULT);
1261         }
1262         keylen = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF);
1263
1264         rc = lustre_pack_reply(req, 1, NULL, NULL);
1265         if (rc)
1266                 RETURN(rc);
1267
1268         vallen = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1);
1269         if (vallen)
1270                 val = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, 0);
1271
1272         if (KEY_IS("evict_by_nid")) {
1273                 if (val && vallen)
1274                         obd_export_evict_by_nid(exp->exp_obd, val);
1275
1276                 GOTO(out, rc = 0);
1277         }
1278
1279         rc = obd_set_info_async(exp, keylen, key, vallen, val, NULL);
1280 out:
1281         lustre_msg_set_status(req->rq_repmsg, 0);
1282         RETURN(rc);
1283 }
1284
1285 static int ost_get_info(struct obd_export *exp, struct ptlrpc_request *req)
1286 {
1287         void *key, *reply;
1288         int keylen, replylen, rc = 0;
1289         struct req_capsule *pill = &req->rq_pill;
1290         ENTRY;
1291
1292         req_capsule_set(&req->rq_pill, &RQF_OST_GET_INFO_GENERIC);
1293
1294         /* this common part for get_info rpc */
1295         key = req_capsule_client_get(pill, &RMF_SETINFO_KEY);
1296         if (key == NULL) {
1297                 DEBUG_REQ(D_HA, req, "no get_info key");
1298                 RETURN(-EFAULT);
1299         }
1300         keylen = req_capsule_get_size(pill, &RMF_SETINFO_KEY, RCL_CLIENT);
1301
1302         rc = obd_get_info(exp, keylen, key, &replylen, NULL);
1303         if (rc)
1304                 RETURN(rc);
1305
1306         req_capsule_set_size(pill, &RMF_GENERIC_DATA,
1307                              RCL_SERVER, replylen);
1308
1309         rc = req_capsule_server_pack(pill);
1310         if (rc)
1311                 RETURN(rc);
1312
1313         reply = req_capsule_server_get(pill, &RMF_GENERIC_DATA);
1314         if (reply == NULL)
1315                 RETURN(-ENOMEM);
1316
1317         /* call again to fill in the reply buffer */
1318         rc = obd_get_info(exp, keylen, key, &replylen, reply);
1319
1320         lustre_msg_set_status(req->rq_repmsg, 0);
1321         RETURN(rc);
1322 }
1323
1324 static int ost_handle_quotactl(struct ptlrpc_request *req)
1325 {
1326         struct obd_quotactl *oqctl, *repoqc;
1327         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*repoqc) };
1328         ENTRY;
1329
1330         oqctl = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*oqctl),
1331                                    lustre_swab_obd_quotactl);
1332         if (oqctl == NULL)
1333                 GOTO(out, rc = -EPROTO);
1334
1335         rc = lustre_pack_reply(req, 2, size, NULL);
1336         if (rc)
1337                 GOTO(out, rc);
1338
1339         repoqc = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*repoqc));
1340
1341         req->rq_status = obd_quotactl(req->rq_export, oqctl);
1342         *repoqc = *oqctl;
1343 out:
1344         RETURN(rc);
1345 }
1346
1347 static int ost_handle_quotacheck(struct ptlrpc_request *req)
1348 {
1349         struct obd_quotactl *oqctl;
1350         int rc;
1351         ENTRY;
1352
1353         oqctl = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
1354         if (oqctl == NULL)
1355                 RETURN(-EPROTO);
1356
1357         rc = req_capsule_server_pack(&req->rq_pill);
1358         if (rc) {
1359                 CERROR("ost: out of memory while packing quotacheck reply\n");
1360                 RETURN(-ENOMEM);
1361         }
1362
1363         req->rq_status = obd_quotacheck(req->rq_export, oqctl);
1364         RETURN(0);
1365 }
1366
1367 static int ost_llog_handle_connect(struct obd_export *exp,
1368                                    struct ptlrpc_request *req)
1369 {
1370         struct llogd_conn_body *body;
1371         int rc;
1372         ENTRY;
1373
1374         body = lustre_msg_buf(req->rq_reqmsg, 1, sizeof(*body));
1375         rc = obd_llog_connect(exp, body);
1376         RETURN(rc);
1377 }
1378
1379 static int filter_export_check_flavor(struct filter_obd *filter,
1380                                       struct obd_export *exp,
1381                                       struct ptlrpc_request *req)
1382 {
1383         int     rc = 0;
1384
1385         /* FIXME
1386          * this should be done in filter_connect()/filter_reconnect(), but
1387          * we can't obtain information like NID, which stored in incoming
1388          * request, thus can't decide what flavor to use. so we do it here.
1389          *
1390          * This hack should be removed after the OST stack be rewritten, just
1391          * like what we are doing in mdt_obd_connect()/mdt_obd_reconnect().
1392          */
1393         if (exp->exp_flvr.sf_rpc != SPTLRPC_FLVR_INVALID)
1394                 return 0;
1395
1396         CDEBUG(D_SEC, "from %s\n", sptlrpc_part2name(req->rq_sp_from));
1397         spin_lock(&exp->exp_lock);
1398         exp->exp_sp_peer = req->rq_sp_from;
1399
1400         read_lock(&filter->fo_sptlrpc_lock);
1401         sptlrpc_rule_set_choose(&filter->fo_sptlrpc_rset, exp->exp_sp_peer,
1402                                 req->rq_peer.nid, &exp->exp_flvr);
1403         read_unlock(&filter->fo_sptlrpc_lock);
1404
1405         if (exp->exp_flvr.sf_rpc != req->rq_flvr.sf_rpc) {
1406                 CERROR("invalid rpc flavor %x, expect %x, from %s\n",
1407                        req->rq_flvr.sf_rpc, exp->exp_flvr.sf_rpc,
1408                        libcfs_nid2str(req->rq_peer.nid));
1409                 exp->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1410                 rc = -EACCES;
1411         }
1412
1413         spin_unlock(&exp->exp_lock);
1414
1415         return rc;
1416 }
1417
1418 static int ost_filter_recovery_request(struct ptlrpc_request *req,
1419                                        struct obd_device *obd, int *process)
1420 {
1421         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1422         case OST_CONNECT: /* This will never get here, but for completeness. */
1423         case OST_DISCONNECT:
1424                *process = 1;
1425                RETURN(0);
1426
1427         case OBD_PING:
1428         case OST_CREATE:
1429         case OST_DESTROY:
1430         case OST_PUNCH:
1431         case OST_SETATTR:
1432         case OST_SYNC:
1433         case OST_WRITE:
1434         case OBD_LOG_CANCEL:
1435         case LDLM_ENQUEUE:
1436                 *process = target_queue_recovery_request(req, obd);
1437                 RETURN(0);
1438
1439         default:
1440                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
1441                 *process = -EAGAIN;
1442                 RETURN(0);
1443         }
1444 }
1445
1446 int ost_msg_check_version(struct lustre_msg *msg)
1447 {
1448         int rc;
1449
1450         switch(lustre_msg_get_opc(msg)) {
1451         case OST_CONNECT:
1452         case OST_DISCONNECT:
1453         case OBD_PING:
1454         case SEC_CTX_INIT:
1455         case SEC_CTX_INIT_CONT:
1456         case SEC_CTX_FINI:
1457                 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
1458                 if (rc)
1459                         CERROR("bad opc %u version %08x, expecting %08x\n",
1460                                lustre_msg_get_opc(msg),
1461                                lustre_msg_get_version(msg),
1462                                LUSTRE_OBD_VERSION);
1463                 break;
1464         case OST_CREATE:
1465         case OST_DESTROY:
1466         case OST_GETATTR:
1467         case OST_SETATTR:
1468         case OST_WRITE:
1469         case OST_READ:
1470         case OST_PUNCH:
1471         case OST_STATFS:
1472         case OST_SYNC:
1473         case OST_SET_INFO:
1474         case OST_GET_INFO:
1475         case OST_QUOTACHECK:
1476         case OST_QUOTACTL:
1477                 rc = lustre_msg_check_version(msg, LUSTRE_OST_VERSION);
1478                 if (rc)
1479                         CERROR("bad opc %u version %08x, expecting %08x\n",
1480                                lustre_msg_get_opc(msg),
1481                                lustre_msg_get_version(msg),
1482                                LUSTRE_OST_VERSION);
1483                 break;
1484         case LDLM_ENQUEUE:
1485         case LDLM_CONVERT:
1486         case LDLM_CANCEL:
1487         case LDLM_BL_CALLBACK:
1488         case LDLM_CP_CALLBACK:
1489                 rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
1490                 if (rc)
1491                         CERROR("bad opc %u version %08x, expecting %08x\n",
1492                                lustre_msg_get_opc(msg),
1493                                lustre_msg_get_version(msg),
1494                                LUSTRE_DLM_VERSION);
1495                 break;
1496         case LLOG_ORIGIN_CONNECT:
1497         case OBD_LOG_CANCEL:
1498                 rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
1499                 if (rc)
1500                         CERROR("bad opc %u version %08x, expecting %08x\n",
1501                                lustre_msg_get_opc(msg),
1502                                lustre_msg_get_version(msg),
1503                                LUSTRE_LOG_VERSION);
1504                 break;
1505         default:
1506                 CERROR("Unexpected opcode %d\n", lustre_msg_get_opc(msg));
1507                 rc = -ENOTSUPP;
1508         }
1509         return rc;
1510 }
1511
1512 /* TODO: handle requests in a similar way as MDT: see mdt_handle_common()
 *
 * ost_handle() - request handler that ost_setup() passes to
 * ptlrpc_init_svc() for the OST request, create and I/O services.
 *
 * Processing order, as visible in this function:
 *  - SEC_CTX_* opcodes skip every check and jump to the common reply path
 *    with rc = 0.
 *  - Any opcode other than OST_CONNECT needs req->rq_export; without it the
 *    request is answered with -ENOTCONN.
 *  - While obd->obd_recovering is set, ost_filter_recovery_request()
 *    decides whether the request is processed now.
 *  - ost_msg_check_version() validates the message version.
 *  - The opcode is dispatched to its handler.
 *
 * OST_WRITE, OST_READ, LLOG_ORIGIN_CONNECT, OBD_LOG_CANCEL and unexpected
 * opcodes return straight from the switch.  The remaining opcodes are
 * answered through target_send_reply() at the end of the function unless an
 * OBD_FAIL_CHECK() exit returned first.
 *
 * Returns 0 after target_send_reply() has been called.  Early exits return
 * the rc of the step that ended processing (0 for the OBD_FAIL_CHECK()
 * exits).
 */
1513 int ost_handle(struct ptlrpc_request *req)
1514 {
1515         struct obd_trans_info trans_info = { 0, };
1516         struct obd_trans_info *oti = &trans_info;
             /*
              * 'fail' is handed to target_send_reply(); only LDLM_ENQUEUE
              * changes it.  'should_process' has no initializer and is only
              * read after its address was given to
              * ost_filter_recovery_request() -- presumably that call always
              * fills it in when it returns 0; TODO confirm.
              */
1517         int should_process, fail = OBD_FAIL_OST_ALL_REPLY_NET, rc = 0;
1518         struct obd_device *obd = NULL;
1519         ENTRY;
1520
1521         LASSERT(current->journal_info == NULL);
1522
1523         /* primordial rpcs don't affect server recovery */
1524         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1525         case SEC_CTX_INIT:
1526         case SEC_CTX_INIT_CONT:
1527         case SEC_CTX_FINI:
1528                 GOTO(out, rc = 0);
1529         }
1530
1531         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
1532
1533         /* XXX identical to MDS */
1534         if (lustre_msg_get_opc(req->rq_reqmsg) != OST_CONNECT) {
1535                 int recovering;
1536
1537                 if (req->rq_export == NULL) {
1538                         CDEBUG(D_HA,"operation %d on unconnected OST from %s\n",
1539                                lustre_msg_get_opc(req->rq_reqmsg),
1540                                libcfs_id2str(req->rq_peer));
1541                         req->rq_status = -ENOTCONN;
1542                         GOTO(out, rc = -ENOTCONN);
1543                 }
1544
1545                 obd = req->rq_export->exp_obd;
1546
1547                 /* Check for aborted recovery. */
1548                 spin_lock_bh(&obd->obd_processing_task_lock);
1549                 recovering = obd->obd_recovering;
1550                 spin_unlock_bh(&obd->obd_processing_task_lock);
1551                 if (recovering) {
1552                         rc = ost_filter_recovery_request(req, obd,
1553                                                          &should_process);
                             /*
                              * An error, or should_process == 0, ends
                              * handling here without a reply from this
                              * function.  A negative should_process is sent
                              * back to the client as the request status.
                              */
1554                         if (rc || !should_process)
1555                                 RETURN(rc);
1556                         else if (should_process < 0) {
1557                                 req->rq_status = should_process;
1558                                 rc = ptlrpc_error(req);
1559                                 RETURN(rc);
1560                         }
1561                 }
1562         }
1563
1564         oti_init(oti, req);
1565         
             /* A version mismatch returns the error without sending a reply. */
1566         rc = ost_msg_check_version(req->rq_reqmsg);
1567         if (rc)
1568                 RETURN(rc);
1569
             /*
              * Opcode dispatch.  The "if (OBD_FAIL_CHECK(...)) RETURN(0)"
              * exits below leave without building a reply in this function
              * (fault-injection hooks, judging by the OBD_FAIL_ names).
              */
1570         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1571         case OST_CONNECT: {
1572                 CDEBUG(D_INODE, "connect\n");
1573                 req_capsule_set(&req->rq_pill, &RQF_OST_CONNECT);
1574                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_CONNECT_NET))
1575                         RETURN(0);
1576                 rc = target_handle_connect(req);
1577                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_CONNECT_NET2))
1578                         RETURN(0);
1579                 if (!rc) {
                             /*
                              * req->rq_export is read only after a
                              * successful connect; presumably
                              * target_handle_connect() sets it -- TODO
                              * confirm.
                              */
1580                         struct obd_export *exp = req->rq_export;
1581
1582                         obd = exp->exp_obd;
1583
1584                         rc = filter_export_check_flavor(&obd->u.filter,
1585                                                         exp, req);
1586                 }
1587                 break;
1588         }
1589         case OST_DISCONNECT:
1590                 CDEBUG(D_INODE, "disconnect\n");
1591                 req_capsule_set(&req->rq_pill, &RQF_OST_DISCONNECT);
1592                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_DISCONNECT_NET))
1593                         RETURN(0);
1594                 rc = target_handle_disconnect(req);
1595                 break;
1596         case OST_CREATE:
1597                 CDEBUG(D_INODE, "create\n");
1598                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_CREATE_NET))
1599                         RETURN(0);
1600                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOSPC))
1601                         GOTO(out, rc = -ENOSPC);
1602                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
1603                         GOTO(out, rc = -EROFS);
1604                 rc = ost_create(req->rq_export, req, oti);
1605                 break;
1606         case OST_DESTROY:
1607                 CDEBUG(D_INODE, "destroy\n");
1608                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_DESTROY_NET))
1609                         RETURN(0);
1610                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
1611                         GOTO(out, rc = -EROFS);
1612                 rc = ost_destroy(req->rq_export, req, oti);
1613                 break;
1614         case OST_GETATTR:
1615                 CDEBUG(D_INODE, "getattr\n");
1616                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_GETATTR_NET))
1617                         RETURN(0);
1618                 rc = ost_getattr(req->rq_export, req);
1619                 break;
1620         case OST_SETATTR:
1621                 CDEBUG(D_INODE, "setattr\n");
1622                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_SETATTR_NET))
1623                         RETURN(0);
1624                 rc = ost_setattr(req->rq_export, req, oti);
1625                 break;
1626         case OST_WRITE:
1627                 CDEBUG(D_INODE, "write\n");
1628                 /* req->rq_request_portal would be nice, if it was set */
                     /* Writes are refused with -EPROTO unless they arrived
                      * on the service bound to OST_IO_PORTAL. */
1629                 if (req->rq_rqbd->rqbd_service->srv_req_portal !=OST_IO_PORTAL){
1630                         CERROR("%s: deny write request from %s to portal %u\n",
1631                                req->rq_export->exp_obd->obd_name,
1632                                obd_export_nid2str(req->rq_export),
1633                                req->rq_rqbd->rqbd_service->srv_req_portal);
1634                         GOTO(out, rc = -EPROTO);
1635                 }
1636                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_NET))
1637                         RETURN(0);
1638                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOSPC))
1639                         GOTO(out, rc = -ENOSPC);
1640                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
1641                         GOTO(out, rc = -EROFS);
1642                 rc = ost_brw_write(req, oti);
1643                 LASSERT(current->journal_info == NULL);
1644                 /* ost_brw_write sends its own replies */
1645                 RETURN(rc);
1646         case OST_READ:
1647                 CDEBUG(D_INODE, "read\n");
1648                 /* req->rq_request_portal would be nice, if it was set */
                     /* Same portal restriction as for OST_WRITE. */
1649                 if (req->rq_rqbd->rqbd_service->srv_req_portal !=OST_IO_PORTAL){
1650                         CERROR("%s: deny read request from %s to portal %u\n",
1651                                req->rq_export->exp_obd->obd_name,
1652                                obd_export_nid2str(req->rq_export),
1653                                req->rq_rqbd->rqbd_service->srv_req_portal);
1654                         GOTO(out, rc = -EPROTO);
1655                 }
1656                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_NET))
1657                         RETURN(0);
1658                 rc = ost_brw_read(req, oti);
1659                 LASSERT(current->journal_info == NULL);
1660                 /* ost_brw_read sends its own replies */
1661                 RETURN(rc);
1662         case OST_PUNCH:
1663                 CDEBUG(D_INODE, "punch\n");
1664                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_PUNCH_NET))
1665                         RETURN(0);
1666                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
1667                         GOTO(out, rc = -EROFS);
1668                 rc = ost_punch(req->rq_export, req, oti);
1669                 break;
1670         case OST_STATFS:
1671                 CDEBUG(D_INODE, "statfs\n");
1672                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_STATFS_NET))
1673                         RETURN(0);
1674                 rc = ost_statfs(req);
1675                 break;
1676         case OST_SYNC:
1677                 CDEBUG(D_INODE, "sync\n");
1678                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_SYNC_NET))
1679                         RETURN(0);
1680                 rc = ost_sync(req->rq_export, req);
1681                 break;
1682         case OST_SET_INFO:
1683                 DEBUG_REQ(D_INODE, req, "set_info");
1684                 req_capsule_set(&req->rq_pill, &RQF_OST_SET_INFO);
1685                 rc = ost_set_info(req->rq_export, req);
1686                 break;
1687         case OST_GET_INFO:
1688                 DEBUG_REQ(D_INODE, req, "get_info");
1689                 rc = ost_get_info(req->rq_export, req);
1690                 break;
1691         case OST_QUOTACHECK:
1692                 CDEBUG(D_INODE, "quotacheck\n");
1693                 req_capsule_set(&req->rq_pill, &RQF_OST_QUOTACHECK);
1694                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_QUOTACHECK_NET))
1695                         RETURN(0);
1696                 rc = ost_handle_quotacheck(req);
1697                 break;
1698         case OST_QUOTACTL:
1699                 CDEBUG(D_INODE, "quotactl\n");
1700                 req_capsule_set(&req->rq_pill, &RQF_OST_QUOTACTL);
1701                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_QUOTACTL_NET))
1702                         RETURN(0);
1703                 rc = ost_handle_quotactl(req);
1704                 break;
1705         case OBD_PING:
1706                 DEBUG_REQ(D_INODE, req, "ping");
1707                 req_capsule_set(&req->rq_pill, &RQF_OBD_PING);
1708                 rc = target_handle_ping(req);
1709                 break;
1710         /* FIXME - just reply status */
             /*
              * The two llog opcodes store the handler result in
              * req->rq_status, pack the reply and send it with
              * ptlrpc_reply() themselves; they never reach the common
              * reply path.
              */
1711         case LLOG_ORIGIN_CONNECT:
1712                 DEBUG_REQ(D_INODE, req, "log connect");
1713                 req_capsule_set(&req->rq_pill, &RQF_LLOG_ORIGIN_CONNECT);
1714                 rc = ost_llog_handle_connect(req->rq_export, req);
1715                 req->rq_status = rc;
1716                 rc = req_capsule_server_pack(&req->rq_pill);
1717                 if (rc)
1718                         RETURN(rc);
1719                 RETURN(ptlrpc_reply(req));
1720         case OBD_LOG_CANCEL:
1721                 CDEBUG(D_INODE, "log cancel\n");
1722                 req_capsule_set(&req->rq_pill, &RQF_LOG_CANCEL);
1723                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_NET))
1724                         RETURN(0);
1725                 rc = llog_origin_handle_cancel(req);
1726                 req->rq_status = rc;
1727                 rc = req_capsule_server_pack(&req->rq_pill);
1728                 if (rc)
1729                         RETURN(rc);
1730                 RETURN(ptlrpc_reply(req));
1731         case LDLM_ENQUEUE:
1732                 CDEBUG(D_INODE, "enqueue\n");
1733                 req_capsule_set(&req->rq_pill, &RQF_LDLM_ENQUEUE);
1734                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE))
1735                         RETURN(0);
1736                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
1737                                          ldlm_server_blocking_ast,
1738                                          ldlm_server_glimpse_ast);
                     /* Enqueue replies use their own failure id. */
1739                 fail = OBD_FAIL_OST_LDLM_REPLY_NET;
1740                 break;
1741         case LDLM_CONVERT:
1742                 CDEBUG(D_INODE, "convert\n");
1743                 req_capsule_set(&req->rq_pill, &RQF_LDLM_CONVERT);
1744                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CONVERT))
1745                         RETURN(0);
1746                 rc = ldlm_handle_convert(req);
1747                 break;
1748         case LDLM_CANCEL:
1749                 CDEBUG(D_INODE, "cancel\n");
1750                 req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL);
1751                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL))
1752                         RETURN(0);
1753                 rc = ldlm_handle_cancel(req);
1754                 break;
1755         case LDLM_BL_CALLBACK:
1756         case LDLM_CP_CALLBACK:
1757                 CDEBUG(D_INODE, "callback\n");
1758                 CERROR("callbacks should not happen on OST\n");
1759                 /* fall through */
1760         default:
1761                 CERROR("Unexpected opcode %d\n",
1762                        lustre_msg_get_opc(req->rq_reqmsg));
1763                 req->rq_status = -ENOTSUPP;
1764                 rc = ptlrpc_error(req);
1765                 RETURN(rc);
1766         }
1767
1768         LASSERT(current->journal_info == NULL);
1769
             /* Only the fall-out-of-switch path runs EXIT; GOTO(out, ...)
              * jumps past it. */
1770         EXIT;
1771         /* If we're DISCONNECTing, the export_data is already freed */
1772         if (!rc && lustre_msg_get_opc(req->rq_reqmsg) != OST_DISCONNECT)
1773                 target_committed_to_req(req);
1774
             /*
              * Common reply path.  It is also reached by the SEC_CTX_*
              * shortcut, i.e. before oti_init(); in that case 'oti' still
              * points at the zero-initialized trans_info.
              */
1775 out:
1776         if (!rc)
1777                 oti_to_request(oti, req);
1778
1779         target_send_reply(req, rc, fail);
1780         return 0;
1781 }
1782 EXPORT_SYMBOL(ost_handle);
1783 /*
1784  * free per-thread pool created by ost_thread_init().
1785  */
1786 static void ost_thread_done(struct ptlrpc_thread *thread)
1787 {
1788         int i;
1789         struct ost_thread_local_cache *tls; /* TLS stands for Thread-Local
1790                                              * Storage */
1791
1792         ENTRY;
1793
1794         LASSERT(thread != NULL);
1795
1796         /*
1797          * be prepared to handle partially-initialized pools (because this is
1798          * called from ost_thread_init() for cleanup.
1799          */
1800         tls = thread->t_data;
1801         if (tls != NULL) {
1802                 for (i = 0; i < OST_THREAD_POOL_SIZE; ++ i) {
1803                         if (tls->page[i] != NULL)
1804                                 OBD_PAGE_FREE(tls->page[i]);
1805                 }
1806                 OBD_FREE_PTR(tls);
1807                 thread->t_data = NULL;
1808         }
1809         EXIT;
1810 }
1811
1812 /*
1813  * initialize per-thread page pool (bug 5137).
1814  */
1815 static int ost_thread_init(struct ptlrpc_thread *thread)
1816 {
1817         int result;
1818         int i;
1819         struct ost_thread_local_cache *tls;
1820
1821         ENTRY;
1822
1823         LASSERT(thread != NULL);
1824         LASSERT(thread->t_data == NULL);
1825         LASSERTF(thread->t_id <= OSS_THREADS_MAX, "%u\n", thread->t_id);
1826
1827         OBD_ALLOC_PTR(tls);
1828         if (tls != NULL) {
1829                 result = 0;
1830                 thread->t_data = tls;
1831                 /*
1832                  * populate pool
1833                  */
1834                 for (i = 0; i < OST_THREAD_POOL_SIZE; ++ i) {
1835                         OBD_PAGE_ALLOC(tls->page[i], OST_THREAD_POOL_GFP);
1836                         if (tls->page[i] == NULL) {
1837                                 ost_thread_done(thread);
1838                                 result = -ENOMEM;
1839                                 break;
1840                         }
1841                 }
1842         } else
1843                 result = -ENOMEM;
1844         RETURN(result);
1845 }
1846
1847 #define OST_WATCHDOG_TIMEOUT (obd_timeout * 1000)
1848
1849 /* Sigh - really, this is an OSS, the _server_, not the _target_ */
1850 static int ost_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
1851 {
1852         struct ost_obd *ost = &obd->u.ost;
1853         struct lprocfs_static_vars lvars;
1854         int oss_min_threads;
1855         int oss_max_threads;
1856         int oss_min_create_threads;
1857         int oss_max_create_threads;
1858         int rc;
1859         ENTRY;
1860
1861         rc = cleanup_group_info();
1862         if (rc)
1863                 RETURN(rc);
1864
1865         lprocfs_ost_init_vars(&lvars);
1866         lprocfs_obd_setup(obd, lvars.obd_vars);
1867
1868         sema_init(&ost->ost_health_sem, 1);
1869
1870         if (oss_num_threads) {
1871                 /* If oss_num_threads is set, it is the min and the max. */
1872                 if (oss_num_threads > OSS_THREADS_MAX) 
1873                         oss_num_threads = OSS_THREADS_MAX;
1874                 if (oss_num_threads < OSS_THREADS_MIN)
1875                         oss_num_threads = OSS_THREADS_MIN;
1876                 oss_max_threads = oss_min_threads = oss_num_threads;
1877         } else {
1878                 /* Base min threads on memory and cpus */
1879                 oss_min_threads = num_possible_cpus() * num_physpages >> 
1880                         (27 - CFS_PAGE_SHIFT);
1881                 if (oss_min_threads < OSS_THREADS_MIN)
1882                         oss_min_threads = OSS_THREADS_MIN;
1883                 /* Insure a 4x range for dynamic threads */
1884                 if (oss_min_threads > OSS_THREADS_MAX / 4) 
1885                         oss_min_threads = OSS_THREADS_MAX / 4;
1886                 oss_max_threads = min(OSS_THREADS_MAX, oss_min_threads * 4);
1887         }
1888
1889         ost->ost_service =
1890                 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
1891                                 OST_MAXREPSIZE, OST_REQUEST_PORTAL,
1892                                 OSC_REPLY_PORTAL,
1893                                 OST_WATCHDOG_TIMEOUT, ost_handle,
1894                                 LUSTRE_OSS_NAME, obd->obd_proc_entry,
1895                                 ost_print_req, oss_min_threads,
1896                                 oss_max_threads, "ll_ost",
1897                                 LCT_DT_THREAD);
1898         if (ost->ost_service == NULL) {
1899                 CERROR("failed to start service\n");
1900                 GOTO(out_lprocfs, rc = -ENOMEM);
1901         }
1902
1903         rc = ptlrpc_start_threads(obd, ost->ost_service);
1904         if (rc)
1905                 GOTO(out_service, rc = -EINVAL);
1906
1907         if (oss_num_create_threads) {
1908                 if (oss_num_create_threads > OSS_MAX_CREATE_THREADS)
1909                         oss_num_create_threads = OSS_MAX_CREATE_THREADS;
1910                 if (oss_num_create_threads < OSS_DEF_CREATE_THREADS)
1911                         oss_num_create_threads = OSS_DEF_CREATE_THREADS;
1912                 oss_min_create_threads = oss_max_create_threads =
1913                         oss_num_create_threads;
1914         } else {
1915                 oss_min_create_threads = OSS_DEF_CREATE_THREADS;
1916                 oss_max_create_threads = OSS_MAX_CREATE_THREADS;
1917         }
1918
1919         ost->ost_create_service =
1920                 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
1921                                 OST_MAXREPSIZE, OST_CREATE_PORTAL,
1922                                 OSC_REPLY_PORTAL,
1923                                 OST_WATCHDOG_TIMEOUT, ost_handle, "ost_create",
1924                                 obd->obd_proc_entry, ost_print_req,
1925                                 oss_min_create_threads,
1926                                 oss_max_create_threads,
1927                                 "ll_ost_creat", LCT_DT_THREAD);
1928         if (ost->ost_create_service == NULL) {
1929                 CERROR("failed to start OST create service\n");
1930                 GOTO(out_service, rc = -ENOMEM);
1931         }
1932
1933         rc = ptlrpc_start_threads(obd, ost->ost_create_service);
1934         if (rc)
1935                 GOTO(out_create, rc = -EINVAL);
1936
1937         ost->ost_io_service =
1938                 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
1939                                 OST_MAXREPSIZE, OST_IO_PORTAL,
1940                                 OSC_REPLY_PORTAL,
1941                                 OST_WATCHDOG_TIMEOUT, ost_handle, "ost_io",
1942                                 obd->obd_proc_entry, ost_print_req,
1943                                 oss_min_threads, oss_max_threads,
1944                                 "ll_ost_io", LCT_DT_THREAD);
1945         if (ost->ost_io_service == NULL) {
1946                 CERROR("failed to start OST I/O service\n");
1947                 GOTO(out_create, rc = -ENOMEM);
1948         }
1949
1950         ost->ost_io_service->srv_init = ost_thread_init;
1951         ost->ost_io_service->srv_done = ost_thread_done;
1952         ost->ost_io_service->srv_cpu_affinity = 1;
1953         rc = ptlrpc_start_threads(obd, ost->ost_io_service);
1954         if (rc)
1955                 GOTO(out_io, rc = -EINVAL);
1956
1957         ping_evictor_start();
1958
1959         RETURN(0);
1960
1961 out_io:
1962         ptlrpc_unregister_service(ost->ost_io_service);
1963         ost->ost_io_service = NULL;
1964 out_create:
1965         ptlrpc_unregister_service(ost->ost_create_service);
1966         ost->ost_create_service = NULL;
1967 out_service:
1968         ptlrpc_unregister_service(ost->ost_service);
1969         ost->ost_service = NULL;
1970 out_lprocfs:
1971         lprocfs_obd_cleanup(obd);
1972         RETURN(rc);
1973 }
1974
1975 static int ost_cleanup(struct obd_device *obd)
1976 {
1977         struct ost_obd *ost = &obd->u.ost;
1978         int err = 0;
1979         ENTRY;
1980
1981         ping_evictor_stop();
1982
1983         spin_lock_bh(&obd->obd_processing_task_lock);
1984         if (obd->obd_recovering) {
1985                 target_cancel_recovery_timer(obd);
1986                 obd->obd_recovering = 0;
1987         }
1988         spin_unlock_bh(&obd->obd_processing_task_lock);
1989
1990         down(&ost->ost_health_sem);
1991         ptlrpc_unregister_service(ost->ost_service);
1992         ptlrpc_unregister_service(ost->ost_create_service);
1993         ptlrpc_unregister_service(ost->ost_io_service);
1994         ost->ost_service = NULL;
1995         ost->ost_create_service = NULL;
1996         up(&ost->ost_health_sem);
1997
1998         lprocfs_obd_cleanup(obd);
1999
2000         RETURN(err);
2001 }
2002
2003 static int ost_health_check(struct obd_device *obd)
2004 {
2005         struct ost_obd *ost = &obd->u.ost;
2006         int rc = 0;
2007
2008         down(&ost->ost_health_sem);
2009         rc |= ptlrpc_service_health_check(ost->ost_service);
2010         rc |= ptlrpc_service_health_check(ost->ost_create_service);
2011         rc |= ptlrpc_service_health_check(ost->ost_io_service);
2012         up(&ost->ost_health_sem);
2013
2014         /*
2015          * health_check to return 0 on healthy
2016          * and 1 on unhealthy.
2017          */
2018         if( rc != 0)
2019                 rc = 1;
2020
2021         return rc;
2022 }
2023
2024 struct ost_thread_local_cache *ost_tls(struct ptlrpc_request *r)
2025 {
2026         return (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
2027 }
2028
2029 /* use obd ops to offer management infrastructure */
2030 static struct obd_ops ost_obd_ops = {
2031         .o_owner        = THIS_MODULE,
2032         .o_setup        = ost_setup,
2033         .o_cleanup      = ost_cleanup,
2034         .o_health_check = ost_health_check,
2035 };
2036
2037
2038 static int __init ost_init(void)
2039 {
2040         struct lprocfs_static_vars lvars;
2041         int rc;
2042         ENTRY;
2043
2044         lprocfs_ost_init_vars(&lvars);
2045         rc = class_register_type(&ost_obd_ops, NULL, lvars.module_vars,
2046                                  LUSTRE_OSS_NAME, NULL);
2047
2048         if (ost_num_threads != 0 && oss_num_threads == 0) {
2049                 LCONSOLE_INFO("ost_num_threads module parameter is deprecated, "
2050                               "use oss_num_threads instead or unset both for "
2051                               "dynamic thread startup\n");
2052                 oss_num_threads = ost_num_threads;
2053         }
2054
2055         RETURN(rc);
2056 }
2057
2058 static void /*__exit*/ ost_exit(void)
2059 {
2060         class_unregister_type(LUSTRE_OSS_NAME);
2061 }
2062
/* Kernel module metadata. */
2063 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2064 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
2065 MODULE_LICENSE("GPL");
2066
/* ost_init() is the module's init hook, ost_exit() its exit hook. */
2067 module_init(ost_init);
2068 module_exit(ost_exit);