Whamcloud - gitweb
- fixes in split about using correct byte order;
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author: Peter J. Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *
8  *   This file is part of the Lustre file system, http://www.lustre.org
9  *   Lustre is a trademark of Cluster File Systems, Inc.
10  *
11  *   You may have signed or agreed to another license before downloading
12  *   this software.  If so, you are bound by the terms and conditions
13  *   of that agreement, and the following does not apply to you.  See the
14  *   LICENSE file included with this distribution for more information.
15  *
16  *   If you did not agree to a different license, then this copy of Lustre
17  *   is open source software; you can redistribute it and/or modify it
18  *   under the terms of version 2 of the GNU General Public License as
19  *   published by the Free Software Foundation.
20  *
21  *   In either case, Lustre is distributed in the hope that it will be
22  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
23  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
24  *   license text for more details.
25  *
26  *  Storage Target Handling functions
27  *  Lustre Object Server Module (OST)
28  *
29  *  This server is single threaded at present (but can easily be multi
30  *  threaded). For testing and management it is treated as an
31  *  obd_device, although it does not export a full OBD method table
32  *  (the requests are coming in over the wire, so object target
33  *  modules do not have a full method table.)
34  */
35
36 #ifndef EXPORT_SYMTAB
37 # define EXPORT_SYMTAB
38 #endif
39 #define DEBUG_SUBSYSTEM S_OST
40
41 #include <linux/module.h>
42 #include <obd_ost.h>
43 #include <lustre_net.h>
44 #include <lustre_dlm.h>
45 #include <lustre_export.h>
46 #include <lustre_debug.h>
47 #include <linux/init.h>
48 #include <lprocfs_status.h>
49 #include <lustre_commit_confd.h>
50 #include <libcfs/list.h>
51 #include <lustre_quota.h>
52 #include "ost_internal.h"
53
54 static int ost_num_threads;
55 CFS_MODULE_PARM(ost_num_threads, "i", int, 0444,
56                 "number of OST service threads to start");
57
58 void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
59 {
60         struct oti_req_ack_lock *ack_lock;
61         int i;
62
63         if (oti == NULL)
64                 return;
65
66         if (req->rq_repmsg)
67                 lustre_msg_set_transno(req->rq_repmsg, oti->oti_transno);
68         req->rq_transno = oti->oti_transno;
69
70         /* XXX 4 == entries in oti_ack_locks??? */
71         for (ack_lock = oti->oti_ack_locks, i = 0; i < 4; i++, ack_lock++) {
72                 if (!ack_lock->mode)
73                         break;
74                 /* XXX not even calling target_send_reply in some cases... */
75                 ptlrpc_save_lock (req, &ack_lock->lock, ack_lock->mode);
76         }
77 }
78
79 static int ost_destroy(struct obd_export *exp, struct ptlrpc_request *req,
80                        struct obd_trans_info *oti)
81 {
82         struct ost_body *body, *repbody;
83         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
84         ENTRY;
85
86         body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
87                                   lustre_swab_ost_body);
88         if (body == NULL)
89                 RETURN(-EFAULT);
90
91         rc = lustre_pack_reply(req, 2, size, NULL);
92         if (rc)
93                 RETURN(rc);
94
95         if (body->oa.o_valid & OBD_MD_FLCOOKIE)
96                 oti->oti_logcookies = obdo_logcookie(&body->oa);
97         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
98                                  sizeof(*repbody));
99         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
100         req->rq_status = obd_destroy(exp, &body->oa, NULL, oti, NULL);
101         RETURN(0);
102 }
103
104 static int ost_getattr(struct obd_export *exp, struct ptlrpc_request *req)
105 {
106         struct ost_body *body, *repbody;
107         struct obd_info oinfo = { { { 0 } } };
108         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
109         ENTRY;
110
111         body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
112                                   lustre_swab_ost_body);
113         if (body == NULL)
114                 RETURN(-EFAULT);
115
116         rc = lustre_pack_reply(req, 2, size, NULL);
117         if (rc)
118                 RETURN(rc);
119
120         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
121                                  sizeof(*repbody));
122         repbody->oa = body->oa;
123
124         oinfo.oi_oa = &repbody->oa;
125         if (oinfo.oi_oa->o_valid & OBD_MD_FLOSSCAPA)
126                 oinfo.oi_capa = lustre_unpack_capa(req->rq_reqmsg,
127                                                    REQ_REC_OFF + 1);
128         req->rq_status = obd_getattr(exp, &oinfo);
129         RETURN(0);
130 }
131
132 static int ost_statfs(struct ptlrpc_request *req)
133 {
134         struct obd_statfs *osfs;
135         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
136         ENTRY;
137
138         rc = lustre_pack_reply(req, 2, size, NULL);
139         if (rc)
140                 RETURN(rc);
141
142         osfs = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*osfs));
143
144         req->rq_status = obd_statfs(req->rq_export->exp_obd, osfs,
145                                     cfs_time_current_64() - HZ);
146         if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_ENOSPC))
147                 osfs->os_bfree = osfs->os_bavail = 64;
148         if (req->rq_status != 0)
149                 CERROR("ost: statfs failed: rc %d\n", req->rq_status);
150
151         RETURN(0);
152 }
153
154 static int ost_create(struct obd_export *exp, struct ptlrpc_request *req,
155                       struct obd_trans_info *oti)
156 {
157         struct ost_body *body, *repbody;
158         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*repbody) };
159         ENTRY;
160
161         body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
162                                   lustre_swab_ost_body);
163         if (body == NULL)
164                 RETURN(-EFAULT);
165
166         rc = lustre_pack_reply(req, 2, size, NULL);
167         if (rc)
168                 RETURN(rc);
169
170         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
171                                  sizeof(*repbody));
172         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
173         oti->oti_logcookies = obdo_logcookie(&repbody->oa);
174         req->rq_status = obd_create(exp, &repbody->oa, NULL, oti);
175         //obd_log_cancel(conn, NULL, 1, oti->oti_logcookies, 0);
176         RETURN(0);
177 }
178
179 /*
180  * Helper function for ost_punch(): if asked by client, acquire [size, EOF]
181  * lock on the file being truncated.
182  */
183 static int ost_punch_lock_get(struct obd_export *exp, struct obdo *oa,
184                               struct lustre_handle *lh)
185 {
186         int flags;
187         struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0} };
188         ldlm_policy_data_t policy;
189         __u64 start;
190         __u64 finis;
191
192         ENTRY;
193
194         LASSERT(!lustre_handle_is_used(lh));
195
196         if (!(oa->o_valid & OBD_MD_FLFLAGS) ||
197             !(oa->o_flags & OBD_FL_TRUNCLOCK))
198                 RETURN(0);
199
200         CDEBUG(D_INODE, "OST-side truncate lock.\n");
201
202         start = oa->o_size;
203         finis = start + oa->o_blocks;
204
205         /*
206          * standard truncate optimization: if file body is completely
207          * destroyed, don't send data back to the server.
208          */
209         flags = (start == 0) ? LDLM_AST_DISCARD_DATA : 0;
210
211         policy.l_extent.start = start & CFS_PAGE_MASK;
212
213         /*
214          * If ->o_blocks is EOF it means "lock till the end of the
215          * file". Otherwise, it's size of a hole being punched (in bytes)
216          */
217         if (oa->o_blocks == OBD_OBJECT_EOF || finis < start)
218                 policy.l_extent.end = OBD_OBJECT_EOF;
219         else
220                 policy.l_extent.end = finis | ~CFS_PAGE_MASK;
221
222         RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
223                                       LDLM_EXTENT, &policy, LCK_PW, &flags,
224                                       ldlm_blocking_ast, ldlm_completion_ast,
225                                       ldlm_glimpse_ast, NULL, 0, NULL, lh));
226 }
227
228 /*
229  * Helper function for ost_punch(): release lock acquired by
230  * ost_punch_lock_get(), if any.
231  */
232 static void ost_punch_lock_put(struct obd_export *exp, struct obdo *oa,
233                                struct lustre_handle *lh)
234 {
235         ENTRY;
236         if (lustre_handle_is_used(lh))
237                 ldlm_lock_decref(lh, LCK_PW);
238         EXIT;
239 }
240
241 static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req,
242                      struct obd_trans_info *oti)
243 {
244         struct obd_info oinfo = { { { 0 } } };
245         struct ost_body *body, *repbody;
246         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*repbody) };
247         struct lustre_handle lh = {0,};
248         ENTRY;
249
250         /* check that we do support OBD_CONNECT_TRUNCLOCK. */
251         CLASSERT(OST_CONNECT_SUPPORTED & OBD_CONNECT_TRUNCLOCK);
252
253         body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
254                                   lustre_swab_ost_body);
255         if (body == NULL)
256                 RETURN(-EFAULT);
257
258         oinfo.oi_oa = &body->oa;
259         oinfo.oi_policy.l_extent.start = oinfo.oi_oa->o_size;
260         oinfo.oi_policy.l_extent.end = oinfo.oi_oa->o_blocks;
261
262         if ((oinfo.oi_oa->o_valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) !=
263             (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
264                 RETURN(-EINVAL);
265
266         rc = lustre_pack_reply(req, 2, size, NULL);
267         if (rc)
268                 RETURN(rc);
269
270         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
271                                  sizeof(*repbody));
272         repbody->oa = *oinfo.oi_oa;
273         rc = ost_punch_lock_get(exp, oinfo.oi_oa, &lh);
274         if (rc == 0) {
275                 if (oinfo.oi_oa->o_valid & OBD_MD_FLFLAGS &&
276                     oinfo.oi_oa->o_flags == OBD_FL_TRUNCLOCK)
277                         /*
278                          * If OBD_FL_TRUNCLOCK is the only bit set in
279                          * ->o_flags, clear OBD_MD_FLFLAGS to avoid falling
280                          * through filter_setattr() to filter_iocontrol().
281                          */
282                         oinfo.oi_oa->o_valid &= ~OBD_MD_FLFLAGS;
283
284                 if (oinfo.oi_oa->o_valid & OBD_MD_FLOSSCAPA)
285                         oinfo.oi_capa = lustre_unpack_capa(req->rq_reqmsg,
286                                                            REQ_REC_OFF + 1);
287                 req->rq_status = obd_punch(exp, &oinfo, oti, NULL);
288                 ost_punch_lock_put(exp, oinfo.oi_oa, &lh);
289         }
290         RETURN(rc);
291 }
292
293 static int ost_sync(struct obd_export *exp, struct ptlrpc_request *req)
294 {
295         struct ost_body *body, *repbody;
296         struct lustre_capa *capa = NULL;
297         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*repbody) };
298         ENTRY;
299
300         body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
301                                   lustre_swab_ost_body);
302         if (body == NULL)
303                 RETURN(-EFAULT);
304
305         if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
306                 capa = lustre_unpack_capa(req->rq_reqmsg, REQ_REC_OFF + 1);
307
308         rc = lustre_pack_reply(req, 2, size, NULL);
309         if (rc)
310                 RETURN(rc);
311
312         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
313                                  sizeof(*repbody));
314         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
315         req->rq_status = obd_sync(exp, &repbody->oa, NULL, repbody->oa.o_size,
316                                   repbody->oa.o_blocks, capa);
317         RETURN(0);
318 }
319
320 static int ost_setattr(struct obd_export *exp, struct ptlrpc_request *req,
321                        struct obd_trans_info *oti)
322 {
323         struct ost_body *body, *repbody;
324         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*repbody) };
325         struct obd_info oinfo = { { { 0 } } };
326         ENTRY;
327
328         body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
329                                   lustre_swab_ost_body);
330         if (body == NULL)
331                 RETURN(-EFAULT);
332
333         rc = lustre_pack_reply(req, 2, size, NULL);
334         if (rc)
335                 RETURN(rc);
336
337         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
338                                  sizeof(*repbody));
339         repbody->oa = body->oa;
340
341         oinfo.oi_oa = &repbody->oa;
342         if (oinfo.oi_oa->o_valid & OBD_MD_FLOSSCAPA)
343                 oinfo.oi_capa = lustre_unpack_capa(req->rq_reqmsg,
344                                                    REQ_REC_OFF + 1);
345         req->rq_status = obd_setattr(exp, &oinfo, oti);
346         RETURN(0);
347 }
348
349 static int ost_bulk_timeout(void *data)
350 {
351         ENTRY;
352         /* We don't fail the connection here, because having the export
353          * killed makes the (vital) call to commitrw very sad.
354          */
355         RETURN(1);
356 }
357
358 static int get_per_page_niobufs(struct obd_ioobj *ioo, int nioo,
359                                 struct niobuf_remote *rnb, int nrnb,
360                                 struct niobuf_remote **pp_rnbp)
361 {
362         /* Copy a remote niobuf, splitting it into page-sized chunks
363          * and setting ioo[i].ioo_bufcnt accordingly */
364         struct niobuf_remote *pp_rnb;
365         int   i;
366         int   j;
367         int   page;
368         int   rnbidx = 0;
369         int   npages = 0;
370
371         /*
372          * array of sufficient size already preallocated by caller
373          */
374         LASSERT(pp_rnbp != NULL);
375         LASSERT(*pp_rnbp != NULL);
376
377         /* first count and check the number of pages required */
378         for (i = 0; i < nioo; i++)
379                 for (j = 0; j < ioo->ioo_bufcnt; j++, rnbidx++) {
380                         obd_off offset = rnb[rnbidx].offset;
381                         obd_off p0 = offset >> PAGE_SHIFT;
382                         obd_off pn = (offset + rnb[rnbidx].len - 1)>>PAGE_SHIFT;
383
384                         LASSERT(rnbidx < nrnb);
385
386                         npages += (pn + 1 - p0);
387
388                         if (rnb[rnbidx].len == 0) {
389                                 CERROR("zero len BRW: obj %d objid "LPX64
390                                        " buf %u\n", i, ioo[i].ioo_id, j);
391                                 return -EINVAL;
392                         }
393                         if (j > 0 &&
394                             rnb[rnbidx].offset <= rnb[rnbidx-1].offset) {
395                                 CERROR("unordered BRW: obj %d objid "LPX64
396                                        " buf %u offset "LPX64" <= "LPX64"\n",
397                                        i, ioo[i].ioo_id, j, rnb[rnbidx].offset,
398                                        rnb[rnbidx].offset);
399                                 return -EINVAL;
400                         }
401                 }
402
403         LASSERT(rnbidx == nrnb);
404
405         if (npages == nrnb) {       /* all niobufs are for single pages */
406                 *pp_rnbp = rnb;
407                 return npages;
408         }
409
410         pp_rnb = *pp_rnbp;
411
412         /* now do the actual split */
413         page = rnbidx = 0;
414         for (i = 0; i < nioo; i++) {
415                 int  obj_pages = 0;
416
417                 for (j = 0; j < ioo[i].ioo_bufcnt; j++, rnbidx++) {
418                         obd_off off = rnb[rnbidx].offset;
419                         int     nob = rnb[rnbidx].len;
420
421                         LASSERT(rnbidx < nrnb);
422                         do {
423                                 obd_off  poff = off & (PAGE_SIZE - 1);
424                                 int      pnob = (poff + nob > PAGE_SIZE) ?
425                                                 PAGE_SIZE - poff : nob;
426
427                                 LASSERT(page < npages);
428                                 pp_rnb[page].len = pnob;
429                                 pp_rnb[page].offset = off;
430                                 pp_rnb[page].flags = rnb[rnbidx].flags;
431
432                                 CDEBUG(0, "   obj %d id "LPX64
433                                        "page %d(%d) "LPX64" for %d, flg %x\n",
434                                        i, ioo[i].ioo_id, obj_pages, page,
435                                        pp_rnb[page].offset, pp_rnb[page].len,
436                                        pp_rnb[page].flags);
437                                 page++;
438                                 obj_pages++;
439
440                                 off += pnob;
441                                 nob -= pnob;
442                         } while (nob > 0);
443                         LASSERT(nob == 0);
444                 }
445                 ioo[i].ioo_bufcnt = obj_pages;
446         }
447         LASSERT(page == npages);
448
449         return npages;
450 }
451
452 static __u32 ost_checksum_bulk(struct ptlrpc_bulk_desc *desc)
453 {
454         __u32 cksum = ~0;
455         int i;
456
457         for (i = 0; i < desc->bd_iov_count; i++) {
458                 struct page *page = desc->bd_iov[i].kiov_page;
459                 int off = desc->bd_iov[i].kiov_offset & ~PAGE_MASK;
460                 char *ptr = kmap(page) + off;
461                 int len = desc->bd_iov[i].kiov_len;
462
463                 cksum = crc32_le(cksum, ptr, len);
464         }
465
466         return cksum;
467 }
468
469 /*
470  * populate @nio by @nrpages pages from per-thread page pool
471  */
472 static void ost_nio_pages_get(struct ptlrpc_request *req,
473                               struct niobuf_local *nio, int nrpages)
474 {
475         int i;
476         struct ost_thread_local_cache *tls;
477
478         ENTRY;
479
480         LASSERT(nrpages <= OST_THREAD_POOL_SIZE);
481         LASSERT(req != NULL);
482         LASSERT(req->rq_svc_thread != NULL);
483
484         tls = ost_tls(req);
485         LASSERT(tls != NULL);
486
487         memset(nio, 0, nrpages * sizeof *nio);
488         for (i = 0; i < nrpages; ++ i) {
489                 struct page *page;
490
491                 page = tls->page[i];
492                 LASSERT(page != NULL);
493                 POISON_PAGE(page, 0xf1);
494                 nio[i].page = page;
495                 LL_CDEBUG_PAGE(D_INFO, page, "%d\n", i);
496         }
497         EXIT;
498 }
499
500 /*
501  * Dual for ost_nio_pages_get(). Poison pages in pool for debugging
502  */
503 static void ost_nio_pages_put(struct ptlrpc_request *req,
504                               struct niobuf_local *nio, int nrpages)
505 {
506         int i;
507
508         ENTRY;
509
510         LASSERT(nrpages <= OST_THREAD_POOL_SIZE);
511
512         for (i = 0; i < nrpages; ++ i)
513                 POISON_PAGE(nio[i].page, 0xf2);
514         EXIT;
515 }
516
517 static int ost_brw_lock_get(int mode, struct obd_export *exp,
518                             struct obd_ioobj *obj, struct niobuf_remote *nb,
519                             struct lustre_handle *lh)
520 {
521         int flags                 = 0;
522         int nrbufs                = obj->ioo_bufcnt;
523         struct ldlm_res_id res_id = { .name = { obj->ioo_id, 0,
524                                                 obj->ioo_gr, 0} };
525         ldlm_policy_data_t policy;
526         int i;
527
528         ENTRY;
529
530         LASSERT(mode == LCK_PR || mode == LCK_PW);
531         LASSERT(!lustre_handle_is_used(lh));
532
533         if (nrbufs == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
534                 RETURN(0);
535
536         /* EXPENSIVE ASSERTION */
537         for (i = 1; i < nrbufs; i ++)
538                 LASSERT((nb[0].flags & OBD_BRW_SRVLOCK) ==
539                         (nb[i].flags & OBD_BRW_SRVLOCK));
540
541         policy.l_extent.start = nb[0].offset & CFS_PAGE_MASK;
542         policy.l_extent.end   = (nb[nrbufs - 1].offset +
543                                  nb[nrbufs - 1].len - 1) | ~CFS_PAGE_MASK;
544
545         RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
546                                       LDLM_EXTENT, &policy, mode, &flags,
547                                       ldlm_blocking_ast, ldlm_completion_ast,
548                                       ldlm_glimpse_ast, NULL, 0, NULL, lh));
549 }
550
551 static void ost_brw_lock_put(int mode,
552                              struct obd_ioobj *obj, struct niobuf_remote *niob,
553                              struct lustre_handle *lh)
554 {
555         ENTRY;
556         LASSERT(mode == LCK_PR || mode == LCK_PW);
557         LASSERT((obj->ioo_bufcnt > 0 && (niob[0].flags & OBD_BRW_SRVLOCK)) ==
558                 lustre_handle_is_used(lh));
559         if (lustre_handle_is_used(lh))
560                 ldlm_lock_decref(lh, mode);
561         EXIT;
562 }
563
564 struct ost_prolong_data {
565         struct obd_export *opd_exp;
566         ldlm_policy_data_t opd_policy;
567         ldlm_mode_t opd_mode;
568 };
569
570 static int ost_prolong_locks_iter(struct ldlm_lock *lock, void *data)
571 {
572         struct ost_prolong_data *opd = data;
573
574         LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
575
576         if (lock->l_req_mode != lock->l_granted_mode) {
577                 /* scan granted locks only */
578                 return LDLM_ITER_STOP;
579         }
580
581         if (lock->l_export != opd->opd_exp) {
582                 /* prolong locks only for given client */
583                 return LDLM_ITER_CONTINUE;
584         }
585
586         if (!(lock->l_granted_mode & opd->opd_mode)) {
587                 /* we aren't interesting in all type of locks */
588                 return LDLM_ITER_CONTINUE;
589         }
590
591         if (lock->l_policy_data.l_extent.end < opd->opd_policy.l_extent.start ||
592             lock->l_policy_data.l_extent.start > opd->opd_policy.l_extent.end) {
593                 /* the request doesn't cross the lock, skip it */
594                 return LDLM_ITER_CONTINUE;
595         }
596
597         if (!(lock->l_flags & LDLM_FL_AST_SENT)) {
598                 /* ignore locks not being cancelled */
599                 return LDLM_ITER_CONTINUE;
600         }
601
602         /* OK. this is a possible lock the user holds doing I/O
603          * let's refresh eviction timer for it */
604         ldlm_refresh_waiting_lock(lock);
605
606         return LDLM_ITER_CONTINUE;
607 }
608
609 static void ost_prolong_locks(struct obd_export *exp, struct obd_ioobj *obj,
610                               struct niobuf_remote *nb, ldlm_mode_t mode)
611 {
612         struct ldlm_res_id res_id = { .name = { obj->ioo_id, 0,
613                                                 obj->ioo_gr, 0} };
614         int nrbufs = obj->ioo_bufcnt;
615         struct ost_prolong_data opd;
616
617         ENTRY;
618
619         opd.opd_mode = mode;
620         opd.opd_exp = exp;
621         opd.opd_policy.l_extent.start = nb[0].offset & CFS_PAGE_MASK;
622         opd.opd_policy.l_extent.end = (nb[nrbufs - 1].offset +
623                                        nb[nrbufs - 1].len - 1) | ~CFS_PAGE_MASK;
624
625         CDEBUG(D_DLMTRACE,"refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
626                res_id.name[0], res_id.name[1], opd.opd_policy.l_extent.start,
627                opd.opd_policy.l_extent.end);
628         ldlm_resource_iterate(exp->exp_obd->obd_namespace, &res_id,
629                               ost_prolong_locks_iter, &opd);
630 }
631
632 static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
633 {
634         struct ptlrpc_bulk_desc *desc;
635         struct niobuf_remote *remote_nb;
636         struct niobuf_remote *pp_rnb = NULL;
637         struct niobuf_local *local_nb;
638         struct obd_ioobj *ioo;
639         struct ost_body *body, *repbody;
640         struct lustre_capa *capa = NULL;
641         struct l_wait_info lwi;
642         struct lustre_handle lockh = { 0 };
643         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
644         int comms_error = 0, niocount, npages, nob = 0, rc, i, do_checksum;
645         ENTRY;
646
647         req->rq_bulk_read = 1;
648
649         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
650                 GOTO(out, rc = -EIO);
651
652         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK | OBD_FAIL_ONCE,
653                          (obd_timeout + 1) / 4);
654
655         body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
656                                   lustre_swab_ost_body);
657         if (body == NULL) {
658                 CERROR("Missing/short ost_body\n");
659                 GOTO(out, rc = -EFAULT);
660         }
661
662         ioo = lustre_swab_reqbuf(req, REQ_REC_OFF + 1, sizeof(*ioo),
663                                  lustre_swab_obd_ioobj);
664         if (ioo == NULL) {
665                 CERROR("Missing/short ioobj\n");
666                 GOTO(out, rc = -EFAULT);
667         }
668
669         niocount = ioo->ioo_bufcnt;
670         if (niocount > PTLRPC_MAX_BRW_PAGES) {
671                 DEBUG_REQ(D_ERROR, req, "bulk has too many pages (%d)\n",
672                           niocount);
673                 GOTO(out, rc = -EFAULT);
674         }
675
676         remote_nb = lustre_swab_reqbuf(req, REQ_REC_OFF + 2,
677                                        niocount * sizeof(*remote_nb),
678                                        lustre_swab_niobuf_remote);
679         if (remote_nb == NULL) {
680                 CERROR("Missing/short niobuf\n");
681                 GOTO(out, rc = -EFAULT);
682         }
683         if (lustre_msg_swabbed(req->rq_reqmsg)) { /* swab remaining niobufs */
684                 for (i = 1; i < niocount; i++)
685                         lustre_swab_niobuf_remote (&remote_nb[i]);
686         }
687
688         if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
689                 capa = lustre_unpack_capa(req->rq_reqmsg, REQ_REC_OFF + 3);
690
691         rc = lustre_pack_reply(req, 2, size, NULL);
692         if (rc)
693                 GOTO(out, rc);
694
695         /*
696          * Per-thread array of struct niobuf_{local,remote}'s was allocated by
697          * ost_thread_init().
698          */
699         local_nb = ost_tls(req)->local;
700         pp_rnb   = ost_tls(req)->remote;
701
702         /* FIXME all niobuf splitting should be done in obdfilter if needed */
703         /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
704         npages = get_per_page_niobufs(ioo, 1, remote_nb, niocount, &pp_rnb);
705         if (npages < 0)
706                 GOTO(out, rc = npages);
707
708         LASSERT(npages <= OST_THREAD_POOL_SIZE);
709
710         ost_nio_pages_get(req, local_nb, npages);
711
712         desc = ptlrpc_prep_bulk_exp(req, npages,
713                                      BULK_PUT_SOURCE, OST_BULK_PORTAL);
714         if (desc == NULL)
715                 GOTO(out, rc = -ENOMEM);
716
717         rc = ost_brw_lock_get(LCK_PR, req->rq_export, ioo, pp_rnb, &lockh);
718         if (rc != 0)
719                 GOTO(out_bulk, rc);
720
721         rc = obd_preprw(OBD_BRW_READ, req->rq_export, &body->oa, 1,
722                         ioo, npages, pp_rnb, local_nb, oti, capa);
723         if (rc != 0)
724                 GOTO(out_lock, rc);
725
726         ost_prolong_locks(req->rq_export, ioo, pp_rnb, LCK_PW | LCK_PR);
727
728         /* We're finishing using body->oa as an input variable */
729         do_checksum = (body->oa.o_valid & OBD_MD_FLCKSUM);
730         body->oa.o_valid = 0;
731
732         nob = 0;
733         for (i = 0; i < npages; i++) {
734                 int page_rc = local_nb[i].rc;
735
736                 if (page_rc < 0) {              /* error */
737                         rc = page_rc;
738                         break;
739                 }
740
741                 LASSERTF(page_rc <= pp_rnb[i].len, "page_rc (%d) > "
742                          "pp_rnb[%d].len (%d)\n", page_rc, i, pp_rnb[i].len);
743                 nob += page_rc;
744                 if (page_rc != 0) {             /* some data! */
745                         LASSERT (local_nb[i].page != NULL);
746                         ptlrpc_prep_bulk_page(desc, local_nb[i].page,
747                                               pp_rnb[i].offset & (PAGE_SIZE-1),
748                                               page_rc);
749                 }
750
751                 if (page_rc != pp_rnb[i].len) { /* short read */
752                         /* All subsequent pages should be 0 */
753                         while(++i < npages)
754                                 LASSERT(local_nb[i].rc == 0);
755                         break;
756                 }
757         }
758
759         /* Check if client was evicted while we were doing i/o before touching
760            network */
761         if (rc == 0) {
762                 if (desc->bd_export->exp_failed)
763                         rc = -ENOTCONN;
764                 else {
765                         sptlrpc_svc_wrap_bulk(req, desc);
766
767                         rc = ptlrpc_start_bulk_transfer(desc);
768                 }
769
770                 if (rc == 0) {
771                         lwi = LWI_TIMEOUT_INTERVAL(obd_timeout * HZ / 4, HZ,
772                                                    ost_bulk_timeout, desc);
773                         rc = l_wait_event(desc->bd_waitq,
774                                           !ptlrpc_bulk_active(desc) ||
775                                           desc->bd_export->exp_failed, &lwi);
776                         LASSERT(rc == 0 || rc == -ETIMEDOUT);
777                         if (rc == -ETIMEDOUT) {
778                                 DEBUG_REQ(D_ERROR, req, "timeout on bulk PUT");
779                                 ptlrpc_abort_bulk(desc);
780                         } else if (desc->bd_export->exp_failed) {
781                                 DEBUG_REQ(D_ERROR, req, "Eviction on bulk PUT");
782                                 rc = -ENOTCONN;
783                                 ptlrpc_abort_bulk(desc);
784                         } else if (!desc->bd_success ||
785                                    desc->bd_nob_transferred != desc->bd_nob) {
786                                 DEBUG_REQ(D_ERROR, req, "%s bulk PUT %d(%d)",
787                                           desc->bd_success ?
788                                           "truncated" : "network error on",
789                                           desc->bd_nob_transferred,
790                                           desc->bd_nob);
791                                 /* XXX should this be a different errno? */
792                                 rc = -ETIMEDOUT;
793                         }
794                 } else {
795                         DEBUG_REQ(D_ERROR, req, "bulk PUT failed: rc %d\n", rc);
796                 }
797                 comms_error = rc != 0;
798         }
799
800         /* Must commit after prep above in all cases */
801         rc = obd_commitrw(OBD_BRW_READ, req->rq_export, &body->oa, 1,
802                           ioo, npages, local_nb, oti, rc);
803
804         ost_nio_pages_put(req, local_nb, npages);
805
806         if (rc == 0) {
807                 repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
808                                          sizeof(*repbody));
809                 memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
810
811                 if (unlikely(do_checksum)) {
812                         repbody->oa.o_cksum = ost_checksum_bulk(desc);
813                         repbody->oa.o_valid |= OBD_MD_FLCKSUM;
814                         CDEBUG(D_PAGE, "checksum at read origin: %x\n",
815                                repbody->oa.o_cksum);
816                 }
817         }
818
819  out_lock:
820         ost_brw_lock_put(LCK_PR, ioo, pp_rnb, &lockh);
821  out_bulk:
822         ptlrpc_free_bulk(desc);
823  out:
824         LASSERT(rc <= 0);
825         if (rc == 0) {
826                 req->rq_status = nob;
827                 target_committed_to_req(req);
828                 ptlrpc_reply(req);
829         } else if (!comms_error) {
830                 /* Only reply if there was no comms problem with bulk */
831                 target_committed_to_req(req);
832                 req->rq_status = rc;
833                 ptlrpc_error(req);
834         } else {
835                 if (req->rq_reply_state != NULL) {
836                         /* reply out callback would free */
837                         ptlrpc_rs_decref(req->rq_reply_state);
838                         req->rq_reply_state = NULL;
839                 }
840                 CWARN("%s: ignoring bulk IO comm error with %s@%s id %s - "
841                       "client will retry\n",
842                       req->rq_export->exp_obd->obd_name,
843                       req->rq_export->exp_client_uuid.uuid,
844                       req->rq_export->exp_connection->c_remote_uuid.uuid,
845                       libcfs_id2str(req->rq_peer));
846         }
847
848         RETURN(rc);
849 }
850
851 static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
852 {
853         struct ptlrpc_bulk_desc *desc;
854         struct niobuf_remote    *remote_nb;
855         struct niobuf_remote    *pp_rnb;
856         struct niobuf_local     *local_nb;
857         struct obd_ioobj        *ioo;
858         struct ost_body         *body, *repbody;
859         struct l_wait_info       lwi;
860         struct lustre_handle     lockh = {0};
861         struct lustre_capa      *capa = NULL;
862         __u32                   *rcs;
863         int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
864         int objcount, niocount, npages, comms_error = 0;
865         int rc, swab, i, j, do_checksum;
866         ENTRY;
867
868         req->rq_bulk_write = 1;
869
870         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
871                 GOTO(out, rc = -EIO);
872
873         /* pause before transaction has been started */
874         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK | OBD_FAIL_ONCE,
875                          (obd_timeout + 1) / 4);
876
877         swab = lustre_msg_swabbed(req->rq_reqmsg);
878         body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
879                                   lustre_swab_ost_body);
880         if (body == NULL) {
881                 CERROR("Missing/short ost_body\n");
882                 GOTO(out, rc = -EFAULT);
883         }
884
885         LASSERT_REQSWAB(req, REQ_REC_OFF + 1);
886         objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) /
887                    sizeof(*ioo);
888         if (objcount == 0) {
889                 CERROR("Missing/short ioobj\n");
890                 GOTO(out, rc = -EFAULT);
891         }
892         if (objcount > 1) {
893                 CERROR("too many ioobjs (%d)\n", objcount);
894                 GOTO(out, rc = -EFAULT);
895         }
896
897         ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1,
898                              objcount * sizeof(*ioo));
899         LASSERT (ioo != NULL);
900         for (niocount = i = 0; i < objcount; i++) {
901                 if (swab)
902                         lustre_swab_obd_ioobj(&ioo[i]);
903                 if (ioo[i].ioo_bufcnt == 0) {
904                         CERROR("ioo[%d] has zero bufcnt\n", i);
905                         GOTO(out, rc = -EFAULT);
906                 }
907                 niocount += ioo[i].ioo_bufcnt;
908         }
909
910         if (niocount > PTLRPC_MAX_BRW_PAGES) {
911                 DEBUG_REQ(D_ERROR, req, "bulk has too many pages (%d)\n",
912                           niocount);
913                 GOTO(out, rc = -EFAULT);
914         }
915
916         remote_nb = lustre_swab_reqbuf(req, REQ_REC_OFF + 2,
917                                        niocount * sizeof(*remote_nb),
918                                        lustre_swab_niobuf_remote);
919         if (remote_nb == NULL) {
920                 CERROR("Missing/short niobuf\n");
921                 GOTO(out, rc = -EFAULT);
922         }
923         if (swab) {                             /* swab the remaining niobufs */
924                 for (i = 1; i < niocount; i++)
925                         lustre_swab_niobuf_remote (&remote_nb[i]);
926         }
927
928         if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
929                 capa = lustre_unpack_capa(req->rq_reqmsg, REQ_REC_OFF + 3);
930
931         size[REPLY_REC_OFF + 1] = niocount * sizeof(*rcs);
932         rc = lustre_pack_reply(req, 3, size, NULL);
933         if (rc != 0)
934                 GOTO(out, rc);
935         rcs = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1,
936                              niocount * sizeof(*rcs));
937
938         /*
939          * Per-thread array of struct niobuf_{local,remote}'s was allocated by
940          * ost_thread_init().
941          */
942         local_nb = ost_tls(req)->local;
943         pp_rnb   = ost_tls(req)->remote;
944
945         /* FIXME all niobuf splitting should be done in obdfilter if needed */
946         /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
947         npages = get_per_page_niobufs(ioo, objcount,remote_nb,niocount,&pp_rnb);
948         if (npages < 0)
949                 GOTO(out, rc = npages);
950
951         LASSERT(npages <= OST_THREAD_POOL_SIZE);
952
953         ost_nio_pages_get(req, local_nb, npages);
954
955         desc = ptlrpc_prep_bulk_exp(req, npages,
956                                      BULK_GET_SINK, OST_BULK_PORTAL);
957         if (desc == NULL)
958                 GOTO(out, rc = -ENOMEM);
959
960         rc = ost_brw_lock_get(LCK_PW, req->rq_export, ioo, pp_rnb, &lockh);
961         if (rc != 0)
962                 GOTO(out_bulk, rc);
963
964         ost_prolong_locks(req->rq_export, ioo, pp_rnb, LCK_PW);
965
966         /* obd_preprw clobbers oa->valid, so save what we need */
967         do_checksum = (body->oa.o_valid & OBD_MD_FLCKSUM);
968
969         rc = obd_preprw(OBD_BRW_WRITE, req->rq_export, &body->oa, objcount,
970                         ioo, npages, pp_rnb, local_nb, oti, capa);
971         if (rc != 0)
972                 GOTO(out_lock, rc);
973
974         /* NB Having prepped, we must commit... */
975
976         for (i = 0; i < npages; i++)
977                 ptlrpc_prep_bulk_page(desc, local_nb[i].page,
978                                       pp_rnb[i].offset & (PAGE_SIZE - 1),
979                                       pp_rnb[i].len);
980
981         /* Check if client was evicted while we were doing i/o before touching
982            network */
983         if (desc->bd_export->exp_failed)
984                 rc = -ENOTCONN;
985         else
986                 rc = ptlrpc_start_bulk_transfer (desc);
987         if (rc == 0) {
988                 lwi = LWI_TIMEOUT_INTERVAL(obd_timeout * HZ / 4, HZ,
989                                            ost_bulk_timeout, desc);
990                 rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc) ||
991                                   desc->bd_export->exp_failed, &lwi);
992                 LASSERT(rc == 0 || rc == -ETIMEDOUT);
993                 if (rc == -ETIMEDOUT) {
994                         DEBUG_REQ(D_ERROR, req, "timeout on bulk GET");
995                         ptlrpc_abort_bulk(desc);
996                 } else if (desc->bd_export->exp_failed) {
997                         DEBUG_REQ(D_ERROR, req, "Eviction on bulk GET");
998                         rc = -ENOTCONN;
999                         ptlrpc_abort_bulk(desc);
1000                 } else if (!desc->bd_success ||
1001                            desc->bd_nob_transferred != desc->bd_nob) {
1002                         DEBUG_REQ(D_ERROR, req, "%s bulk GET %d(%d)",
1003                                   desc->bd_success ?
1004                                   "truncated" : "network error on",
1005                                   desc->bd_nob_transferred, desc->bd_nob);
1006                         /* XXX should this be a different errno? */
1007                         rc = -ETIMEDOUT;
1008                 }
1009         } else {
1010                 DEBUG_REQ(D_ERROR, req, "ptlrpc_bulk_get failed: rc %d\n", rc);
1011         }
1012         comms_error = rc != 0;
1013
1014         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
1015                                  sizeof(*repbody));
1016         memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
1017
1018         if (unlikely(do_checksum && rc == 0)) {
1019                 static int cksum_counter;
1020                 obd_count client_cksum = body->oa.o_cksum;
1021                 obd_count cksum = ost_checksum_bulk(desc);
1022
1023                 cksum_counter++;
1024                 if (client_cksum != cksum) {
1025                         CERROR("Bad checksum: client %x, server %x id %s\n",
1026                                client_cksum, cksum,
1027                                libcfs_id2str(req->rq_peer));
1028                         cksum_counter = 0;
1029                         repbody->oa.o_cksum = cksum;
1030                         repbody->oa.o_valid |= OBD_MD_FLCKSUM;
1031                 } else if ((cksum_counter & (-cksum_counter)) ==
1032                            cksum_counter) {
1033                         CWARN("Checksum %u from %s: %x OK\n", cksum_counter,
1034                               libcfs_id2str(req->rq_peer), cksum);
1035                 } else {
1036                         cksum_counter++;
1037                         if ((cksum_counter & (-cksum_counter)) == cksum_counter)
1038                                 CWARN("Checksum %u from %s: %x OK\n",
1039                                       cksum_counter,
1040                                       libcfs_id2str(req->rq_peer), cksum);
1041                 }
1042         }
1043
1044         sptlrpc_svc_unwrap_bulk(req, desc);
1045
1046         /* Must commit after prep above in all cases */
1047         rc = obd_commitrw(OBD_BRW_WRITE, req->rq_export, &repbody->oa,
1048                            objcount, ioo, npages, local_nb, oti, rc);
1049
1050         ost_nio_pages_put(req, local_nb, npages);
1051
1052         if (rc == 0) {
1053                 /* set per-requested niobuf return codes */
1054                 for (i = j = 0; i < niocount; i++) {
1055                         int nob = remote_nb[i].len;
1056
1057                         rcs[i] = 0;
1058                         do {
1059                                 LASSERT(j < npages);
1060                                 if (local_nb[j].rc < 0)
1061                                         rcs[i] = local_nb[j].rc;
1062                                 nob -= pp_rnb[j].len;
1063                                 j++;
1064                         } while (nob > 0);
1065                         LASSERT(nob == 0);
1066                 }
1067                 LASSERT(j == npages);
1068         }
1069
1070  out_lock:
1071         ost_brw_lock_put(LCK_PW, ioo, pp_rnb, &lockh);
1072  out_bulk:
1073         ptlrpc_free_bulk(desc);
1074  out:
1075         if (rc == 0) {
1076                 oti_to_request(oti, req);
1077                 target_committed_to_req(req);
1078                 rc = ptlrpc_reply(req);
1079         } else if (!comms_error) {
1080                 /* Only reply if there was no comms problem with bulk */
1081                 target_committed_to_req(req);
1082                 req->rq_status = rc;
1083                 ptlrpc_error(req);
1084         } else {
1085                 if (req->rq_reply_state != NULL) {
1086                         /* reply out callback would free */
1087                         ptlrpc_rs_decref(req->rq_reply_state);
1088                         req->rq_reply_state = NULL;
1089                 }
1090                 CWARN("%s: ignoring bulk IO comm error with %s@%s id %s - "
1091                       "client will retry\n",
1092                       req->rq_export->exp_obd->obd_name,
1093                       req->rq_export->exp_client_uuid.uuid,
1094                       req->rq_export->exp_connection->c_remote_uuid.uuid,
1095                       libcfs_id2str(req->rq_peer));
1096         }
1097         RETURN(rc);
1098 }
1099
1100 static int ost_set_info(struct obd_export *exp, struct ptlrpc_request *req)
1101 {
1102         char *key, *val = NULL;
1103         int keylen, vallen, rc = 0;
1104         ENTRY;
1105
1106         key = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, 1);
1107         if (key == NULL) {
1108                 DEBUG_REQ(D_HA, req, "no set_info key");
1109                 RETURN(-EFAULT);
1110         }
1111         keylen = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF);
1112
1113         rc = lustre_pack_reply(req, 1, NULL, NULL);
1114         if (rc)
1115                 RETURN(rc);
1116
1117         vallen = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1);
1118         if (vallen)
1119                 val = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, 0);
1120
1121         if (KEY_IS("evict_by_nid")) {
1122                 if (val && vallen)
1123                         obd_export_evict_by_nid(exp->exp_obd, val);
1124
1125                 GOTO(out, rc = 0);
1126         }
1127
1128         rc = obd_set_info_async(exp, keylen, key, vallen, val, NULL);
1129 out:
1130         lustre_msg_set_status(req->rq_repmsg, 0);
1131         RETURN(rc);
1132 }
1133
1134 static int ost_get_info(struct obd_export *exp, struct ptlrpc_request *req)
1135 {
1136         char *key;
1137         int keylen, rc = 0;
1138         int size[2] = { sizeof(struct ptlrpc_body), sizeof(obd_id) };
1139         obd_id *reply;
1140         ENTRY;
1141
1142         key = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, 1);
1143         if (key == NULL) {
1144                 DEBUG_REQ(D_HA, req, "no get_info key");
1145                 RETURN(-EFAULT);
1146         }
1147         keylen = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF);
1148
1149         if (keylen < strlen("last_id") || memcmp(key, "last_id", 7) != 0)
1150                 RETURN(-EPROTO);
1151
1152         rc = lustre_pack_reply(req, 2, size, NULL);
1153         if (rc)
1154                 RETURN(rc);
1155
1156         reply = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*reply));
1157         rc = obd_get_info(exp, keylen, key, size, reply);
1158         lustre_msg_set_status(req->rq_repmsg, 0);
1159         RETURN(rc);
1160 }
1161
1162 static int ost_handle_quotactl(struct ptlrpc_request *req)
1163 {
1164         struct obd_quotactl *oqctl, *repoqc;
1165         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*repoqc) };
1166         ENTRY;
1167
1168         oqctl = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*oqctl),
1169                                    lustre_swab_obd_quotactl);
1170         if (oqctl == NULL)
1171                 GOTO(out, rc = -EPROTO);
1172
1173         rc = lustre_pack_reply(req, 2, size, NULL);
1174         if (rc)
1175                 GOTO(out, rc);
1176
1177         repoqc = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*repoqc));
1178
1179         req->rq_status = obd_quotactl(req->rq_export, oqctl);
1180         *repoqc = *oqctl;
1181 out:
1182         RETURN(rc);
1183 }
1184
1185 static int ost_handle_quotacheck(struct ptlrpc_request *req)
1186 {
1187         struct obd_quotactl *oqctl;
1188         int rc;
1189         ENTRY;
1190
1191         oqctl = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*oqctl),
1192                                    lustre_swab_obd_quotactl);
1193         if (oqctl == NULL)
1194                 RETURN(-EPROTO);
1195
1196         rc = lustre_pack_reply(req, 1, NULL, NULL);
1197         if (rc) {
1198                 CERROR("ost: out of memory while packing quotacheck reply\n");
1199                 RETURN(-ENOMEM);
1200         }
1201
1202         req->rq_status = obd_quotacheck(req->rq_export, oqctl);
1203         RETURN(0);
1204 }
1205
1206 static int ost_llog_handle_connect(struct obd_export *exp,
1207                                                    struct ptlrpc_request *req)
1208 {
1209         struct llogd_conn_body *body;
1210         int rc;
1211         ENTRY;
1212
1213         body = lustre_msg_buf(req->rq_reqmsg, 1, sizeof(*body));
1214         rc = obd_llog_connect(exp, body);
1215         RETURN(rc);
1216 }
1217
1218
1219 static int ost_filter_recovery_request(struct ptlrpc_request *req,
1220                                        struct obd_device *obd, int *process)
1221 {
1222         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1223         case OST_CONNECT: /* This will never get here, but for completeness. */
1224         case OST_DISCONNECT:
1225                *process = 1;
1226                RETURN(0);
1227
1228         case OBD_PING:
1229         case OST_CREATE:
1230         case OST_DESTROY:
1231         case OST_PUNCH:
1232         case OST_SETATTR:
1233         case OST_SYNC:
1234         case OST_WRITE:
1235         case OBD_LOG_CANCEL:
1236         case LDLM_ENQUEUE:
1237                 *process = target_queue_recovery_request(req, obd);
1238                 RETURN(0);
1239
1240         default:
1241                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
1242                 *process = 0;
1243                 /* XXX what should we set rq_status to here? */
1244                 req->rq_status = -EAGAIN;
1245                 RETURN(ptlrpc_error(req));
1246         }
1247 }
1248
1249 int ost_msg_check_version(struct lustre_msg *msg)
1250 {
1251         int rc;
1252
1253         switch(lustre_msg_get_opc(msg)) {
1254         case OST_CONNECT:
1255         case OST_DISCONNECT:
1256         case OBD_PING:
1257         case SEC_CTX_INIT:
1258         case SEC_CTX_INIT_CONT:
1259         case SEC_CTX_FINI:
1260                 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
1261                 if (rc)
1262                         CERROR("bad opc %u version %08x, expecting %08x\n",
1263                                lustre_msg_get_opc(msg),
1264                                lustre_msg_get_version(msg),
1265                                LUSTRE_OBD_VERSION);
1266                 break;
1267         case OST_CREATE:
1268         case OST_DESTROY:
1269         case OST_GETATTR:
1270         case OST_SETATTR:
1271         case OST_WRITE:
1272         case OST_READ:
1273         case OST_PUNCH:
1274         case OST_STATFS:
1275         case OST_SYNC:
1276         case OST_SET_INFO:
1277         case OST_GET_INFO:
1278         case OST_QUOTACHECK:
1279         case OST_QUOTACTL:
1280                 rc = lustre_msg_check_version(msg, LUSTRE_OST_VERSION);
1281                 if (rc)
1282                         CERROR("bad opc %u version %08x, expecting %08x\n",
1283                                lustre_msg_get_opc(msg),
1284                                lustre_msg_get_version(msg),
1285                                LUSTRE_OST_VERSION);
1286                 break;
1287         case LDLM_ENQUEUE:
1288         case LDLM_CONVERT:
1289         case LDLM_CANCEL:
1290         case LDLM_BL_CALLBACK:
1291         case LDLM_CP_CALLBACK:
1292                 rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
1293                 if (rc)
1294                         CERROR("bad opc %u version %08x, expecting %08x\n",
1295                                lustre_msg_get_opc(msg),
1296                                lustre_msg_get_version(msg),
1297                                LUSTRE_DLM_VERSION);
1298                 break;
1299         case LLOG_ORIGIN_CONNECT:
1300         case OBD_LOG_CANCEL:
1301                 rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
1302                 if (rc)
1303                         CERROR("bad opc %u version %08x, expecting %08x\n",
1304                                lustre_msg_get_opc(msg),
1305                                lustre_msg_get_version(msg),
1306                                LUSTRE_LOG_VERSION);
1307                 break;
1308         default:
1309                 CERROR("Unexpected opcode %d\n", lustre_msg_get_opc(msg));
1310                 rc = -ENOTSUPP;
1311         }
1312         return rc;
1313 }
1314
1315 int ost_handle(struct ptlrpc_request *req)
1316 {
1317         struct obd_trans_info trans_info = { 0, };
1318         struct obd_trans_info *oti = &trans_info;
1319         int should_process, fail = OBD_FAIL_OST_ALL_REPLY_NET, rc = 0;
1320         struct obd_device *obd = NULL;
1321         ENTRY;
1322
1323         LASSERT(current->journal_info == NULL);
1324
1325         /* primordial rpcs don't affect server recovery */
1326         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1327         case SEC_CTX_INIT:
1328         case SEC_CTX_INIT_CONT:
1329         case SEC_CTX_FINI:
1330                 GOTO(out, rc = 0);
1331         }
1332
1333         /* XXX identical to MDS */
1334         if (lustre_msg_get_opc(req->rq_reqmsg) != OST_CONNECT) {
1335                 int recovering;
1336
1337                 if (req->rq_export == NULL) {
1338                         CDEBUG(D_HA,"operation %d on unconnected OST from %s\n",
1339                                lustre_msg_get_opc(req->rq_reqmsg),
1340                                libcfs_id2str(req->rq_peer));
1341                         req->rq_status = -ENOTCONN;
1342                         GOTO(out, rc = -ENOTCONN);
1343                 }
1344
1345                 obd = req->rq_export->exp_obd;
1346
1347                 /* Check for aborted recovery. */
1348                 spin_lock_bh(&obd->obd_processing_task_lock);
1349                 recovering = obd->obd_recovering;
1350                 spin_unlock_bh(&obd->obd_processing_task_lock);
1351                 if (recovering) {
1352                         rc = ost_filter_recovery_request(req, obd,
1353                                                          &should_process);
1354                         if (rc || !should_process)
1355                                 RETURN(rc);
1356                         else if (should_process < 0) {
1357                                 req->rq_status = should_process;
1358                                 rc = ptlrpc_error(req);
1359                                 RETURN(rc);
1360                         }
1361                 }
1362         }
1363
1364         oti_init(oti, req);
1365         rc = ost_msg_check_version(req->rq_reqmsg);
1366         if (rc)
1367                 RETURN(rc);
1368
1369         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1370         case OST_CONNECT: {
1371                 CDEBUG(D_INODE, "connect\n");
1372                 OBD_FAIL_RETURN(OBD_FAIL_OST_CONNECT_NET, 0);
1373                 rc = target_handle_connect(req);
1374                 if (!rc)
1375                         obd = req->rq_export->exp_obd;
1376                 break;
1377         }
1378         case OST_DISCONNECT:
1379                 CDEBUG(D_INODE, "disconnect\n");
1380                 OBD_FAIL_RETURN(OBD_FAIL_OST_DISCONNECT_NET, 0);
1381                 rc = target_handle_disconnect(req);
1382                 break;
1383         case OST_CREATE:
1384                 CDEBUG(D_INODE, "create\n");
1385                 OBD_FAIL_RETURN(OBD_FAIL_OST_CREATE_NET, 0);
1386                 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_ENOSPC))
1387                         GOTO(out, rc = -ENOSPC);
1388                 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_EROFS))
1389                         GOTO(out, rc = -EROFS);
1390                 rc = ost_create(req->rq_export, req, oti);
1391                 break;
1392         case OST_DESTROY:
1393                 CDEBUG(D_INODE, "destroy\n");
1394                 OBD_FAIL_RETURN(OBD_FAIL_OST_DESTROY_NET, 0);
1395                 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_EROFS))
1396                         GOTO(out, rc = -EROFS);
1397                 rc = ost_destroy(req->rq_export, req, oti);
1398                 break;
1399         case OST_GETATTR:
1400                 CDEBUG(D_INODE, "getattr\n");
1401                 OBD_FAIL_RETURN(OBD_FAIL_OST_GETATTR_NET, 0);
1402                 rc = ost_getattr(req->rq_export, req);
1403                 break;
1404         case OST_SETATTR:
1405                 CDEBUG(D_INODE, "setattr\n");
1406                 OBD_FAIL_RETURN(OBD_FAIL_OST_SETATTR_NET, 0);
1407                 rc = ost_setattr(req->rq_export, req, oti);
1408                 break;
1409         case OST_WRITE:
1410                 CDEBUG(D_INODE, "write\n");
1411                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
1412                 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_ENOSPC))
1413                         GOTO(out, rc = -ENOSPC);
1414                 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_EROFS))
1415                         GOTO(out, rc = -EROFS);
1416                 rc = ost_brw_write(req, oti);
1417                 LASSERT(current->journal_info == NULL);
1418                 /* ost_brw_write sends its own replies */
1419                 RETURN(rc);
1420         case OST_READ:
1421                 CDEBUG(D_INODE, "read\n");
1422                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
1423                 rc = ost_brw_read(req, oti);
1424                 LASSERT(current->journal_info == NULL);
1425                 /* ost_brw_read sends its own replies */
1426                 RETURN(rc);
1427         case OST_PUNCH:
1428                 CDEBUG(D_INODE, "punch\n");
1429                 OBD_FAIL_RETURN(OBD_FAIL_OST_PUNCH_NET, 0);
1430                 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_EROFS))
1431                         GOTO(out, rc = -EROFS);
1432                 rc = ost_punch(req->rq_export, req, oti);
1433                 break;
1434         case OST_STATFS:
1435                 CDEBUG(D_INODE, "statfs\n");
1436                 OBD_FAIL_RETURN(OBD_FAIL_OST_STATFS_NET, 0);
1437                 rc = ost_statfs(req);
1438                 break;
1439         case OST_SYNC:
1440                 CDEBUG(D_INODE, "sync\n");
1441                 OBD_FAIL_RETURN(OBD_FAIL_OST_SYNC_NET, 0);
1442                 rc = ost_sync(req->rq_export, req);
1443                 break;
1444         case OST_SET_INFO:
1445                 DEBUG_REQ(D_INODE, req, "set_info");
1446                 rc = ost_set_info(req->rq_export, req);
1447                 break;
1448         case OST_GET_INFO:
1449                 DEBUG_REQ(D_INODE, req, "get_info");
1450                 rc = ost_get_info(req->rq_export, req);
1451                 break;
1452         case OST_QUOTACHECK:
1453                 CDEBUG(D_INODE, "quotacheck\n");
1454                 OBD_FAIL_RETURN(OBD_FAIL_OST_QUOTACHECK_NET, 0);
1455                 rc = ost_handle_quotacheck(req);
1456                 break;
1457         case OST_QUOTACTL:
1458                 CDEBUG(D_INODE, "quotactl\n");
1459                 OBD_FAIL_RETURN(OBD_FAIL_OST_QUOTACTL_NET, 0);
1460                 rc = ost_handle_quotactl(req);
1461                 break;
1462         case OBD_PING:
1463                 DEBUG_REQ(D_INODE, req, "ping");
1464                 rc = target_handle_ping(req);
1465                 break;
1466         /* FIXME - just reply status */
1467         case LLOG_ORIGIN_CONNECT:
1468                 DEBUG_REQ(D_INODE, req, "log connect\n");
1469                 rc = ost_llog_handle_connect(req->rq_export, req);
1470                 req->rq_status = rc;
1471                 rc = lustre_pack_reply(req, 1, NULL, NULL);
1472                 if (rc)
1473                         RETURN(rc);
1474                 RETURN(ptlrpc_reply(req));
1475         case OBD_LOG_CANCEL:
1476                 CDEBUG(D_INODE, "log cancel\n");
1477                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
1478                 rc = llog_origin_handle_cancel(req);
1479                 req->rq_status = rc;
1480                 rc = lustre_pack_reply(req, 1, NULL, NULL);
1481                 if (rc)
1482                         RETURN(rc);
1483                 RETURN(ptlrpc_reply(req));
1484         case LDLM_ENQUEUE:
1485                 CDEBUG(D_INODE, "enqueue\n");
1486                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
1487                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
1488                                          ldlm_server_blocking_ast,
1489                                          ldlm_server_glimpse_ast);
1490                 fail = OBD_FAIL_OST_LDLM_REPLY_NET;
1491                 break;
1492         case LDLM_CONVERT:
1493                 CDEBUG(D_INODE, "convert\n");
1494                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
1495                 rc = ldlm_handle_convert(req);
1496                 break;
1497         case LDLM_CANCEL:
1498                 CDEBUG(D_INODE, "cancel\n");
1499                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
1500                 rc = ldlm_handle_cancel(req);
1501                 break;
1502         case LDLM_BL_CALLBACK:
1503         case LDLM_CP_CALLBACK:
1504                 CDEBUG(D_INODE, "callback\n");
1505                 CERROR("callbacks should not happen on OST\n");
1506                 /* fall through */
1507         default:
1508                 CERROR("Unexpected opcode %d\n",
1509                        lustre_msg_get_opc(req->rq_reqmsg));
1510                 req->rq_status = -ENOTSUPP;
1511                 rc = ptlrpc_error(req);
1512                 RETURN(rc);
1513         }
1514
1515         LASSERT(current->journal_info == NULL);
1516
1517         EXIT;
1518         /* If we're DISCONNECTing, the export_data is already freed */
1519         if (!rc && lustre_msg_get_opc(req->rq_reqmsg) != OST_DISCONNECT)
1520                 target_committed_to_req(req);
1521
1522 out:
1523         if (!rc)
1524                 oti_to_request(oti, req);
1525
1526         target_send_reply(req, rc, fail);
1527         return 0;
1528 }
1529 EXPORT_SYMBOL(ost_handle);
1530 /*
1531  * free per-thread pool created by ost_thread_init().
1532  */
1533 static void ost_thread_done(struct ptlrpc_thread *thread)
1534 {
1535         int i;
1536         struct ost_thread_local_cache *tls; /* TLS stands for Thread-Local
1537                                              * Storage */
1538
1539         ENTRY;
1540
1541         LASSERT(thread != NULL);
1542
1543         /*
1544          * be prepared to handle partially-initialized pools (because this is
1545          * called from ost_thread_init() for cleanup.
1546          */
1547         tls = thread->t_data;
1548         if (tls != NULL) {
1549                 for (i = 0; i < OST_THREAD_POOL_SIZE; ++ i) {
1550                         if (tls->page[i] != NULL)
1551                                 __free_page(tls->page[i]);
1552                 }
1553                 OBD_FREE_PTR(tls);
1554                 thread->t_data = NULL;
1555         }
1556         EXIT;
1557 }
1558
1559 /*
1560  * initialize per-thread page pool (bug 5137).
1561  */
1562 static int ost_thread_init(struct ptlrpc_thread *thread)
1563 {
1564         int result;
1565         int i;
1566         struct ost_thread_local_cache *tls;
1567
1568         ENTRY;
1569
1570         LASSERT(thread != NULL);
1571         LASSERT(thread->t_data == NULL);
1572         LASSERT(thread->t_id < OST_MAX_THREADS);
1573
1574         OBD_ALLOC_PTR(tls);
1575         if (tls != NULL) {
1576                 result = 0;
1577                 thread->t_data = tls;
1578                 /*
1579                  * populate pool
1580                  */
1581                 for (i = 0; i < OST_THREAD_POOL_SIZE; ++ i) {
1582                         tls->page[i] = alloc_page(OST_THREAD_POOL_GFP);
1583                         if (tls->page[i] == NULL) {
1584                                 ost_thread_done(thread);
1585                                 result = -ENOMEM;
1586                                 break;
1587                         }
1588                 }
1589         } else
1590                 result = -ENOMEM;
1591         RETURN(result);
1592 }
1593
1594 static int ost_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
1595 {
1596         struct ost_obd *ost = &obd->u.ost;
1597         struct lprocfs_static_vars lvars;
1598         int rc;
1599         ENTRY;
1600
1601         rc = cleanup_group_info();
1602         if (rc)
1603                 RETURN(rc);
1604
1605         rc = llog_start_commit_thread();
1606         if (rc < 0)
1607                 RETURN(rc);
1608
1609         lprocfs_init_vars(ost, &lvars);
1610         lprocfs_obd_setup(obd, lvars.obd_vars);
1611
1612         sema_init(&ost->ost_health_sem, 1);
1613
1614         if (ost_num_threads < 2)
1615                 ost_num_threads = OST_DEF_THREADS;
1616         if (ost_num_threads > OST_MAX_THREADS)
1617                 ost_num_threads = OST_MAX_THREADS;
1618
1619         ost->ost_service =
1620                 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
1621                                 OST_MAXREPSIZE, OST_REQUEST_PORTAL,
1622                                 OSC_REPLY_PORTAL,
1623                                 obd_timeout * 1000, ost_handle, LUSTRE_OSS_NAME,
1624                                 obd->obd_proc_entry, ost_print_req,
1625                                 ost_num_threads, LCT_DT_THREAD);
1626         if (ost->ost_service == NULL) {
1627                 CERROR("failed to start service\n");
1628                 GOTO(out_lprocfs, rc = -ENOMEM);
1629         }
1630
1631         rc = ptlrpc_start_threads(obd, ost->ost_service, "ll_ost");
1632         if (rc)
1633                 GOTO(out_service, rc = -EINVAL);
1634
1635         ost->ost_create_service =
1636                 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
1637                                 OST_MAXREPSIZE, OST_CREATE_PORTAL,
1638                                 OSC_REPLY_PORTAL,
1639                                 obd_timeout * 1000, ost_handle, "ost_create",
1640                                 obd->obd_proc_entry, ost_print_req, 1,
1641                                 LCT_DT_THREAD);
1642         if (ost->ost_create_service == NULL) {
1643                 CERROR("failed to start OST create service\n");
1644                 GOTO(out_service, rc = -ENOMEM);
1645         }
1646
1647         rc = ptlrpc_start_threads(obd, ost->ost_create_service,
1648                                   "ll_ost_creat");
1649         if (rc)
1650                 GOTO(out_create, rc = -EINVAL);
1651
1652         ost->ost_io_service =
1653                 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
1654                                 OST_MAXREPSIZE, OST_IO_PORTAL,
1655                                 OSC_REPLY_PORTAL,
1656                                 obd_timeout * 1000, ost_handle, "ost_io",
1657                                 obd->obd_proc_entry, ost_print_req,
1658                                 ost_num_threads, LCT_DT_THREAD);
1659         if (ost->ost_io_service == NULL) {
1660                 CERROR("failed to start OST I/O service\n");
1661                 GOTO(out_create, rc = -ENOMEM);
1662         }
1663
1664         ost->ost_io_service->srv_init = ost_thread_init;
1665         ost->ost_io_service->srv_done = ost_thread_done;
1666         ost->ost_io_service->srv_cpu_affinity = 1;
1667         rc = ptlrpc_start_threads(obd, ost->ost_io_service,
1668                                   "ll_ost_io");
1669         if (rc)
1670                 GOTO(out_io, rc = -EINVAL);
1671
1672         ping_evictor_start();
1673
1674         RETURN(0);
1675
1676 out_io:
1677         ptlrpc_unregister_service(ost->ost_io_service);
1678         ost->ost_io_service = NULL;
1679 out_create:
1680         ptlrpc_unregister_service(ost->ost_create_service);
1681         ost->ost_create_service = NULL;
1682 out_service:
1683         ptlrpc_unregister_service(ost->ost_service);
1684         ost->ost_service = NULL;
1685 out_lprocfs:
1686         lprocfs_obd_cleanup(obd);
1687         RETURN(rc);
1688 }
1689
1690 static int ost_cleanup(struct obd_device *obd)
1691 {
1692         struct ost_obd *ost = &obd->u.ost;
1693         int err = 0;
1694         ENTRY;
1695
1696         ping_evictor_stop();
1697
1698         spin_lock_bh(&obd->obd_processing_task_lock);
1699         if (obd->obd_recovering) {
1700                 target_cancel_recovery_timer(obd);
1701                 obd->obd_recovering = 0;
1702         }
1703         spin_unlock_bh(&obd->obd_processing_task_lock);
1704
1705         down(&ost->ost_health_sem);
1706         ptlrpc_unregister_service(ost->ost_service);
1707         ptlrpc_unregister_service(ost->ost_create_service);
1708         ptlrpc_unregister_service(ost->ost_io_service);
1709         ost->ost_service = NULL;
1710         ost->ost_create_service = NULL;
1711         up(&ost->ost_health_sem);
1712
1713         lprocfs_obd_cleanup(obd);
1714
1715         RETURN(err);
1716 }
1717
1718 static int ost_health_check(struct obd_device *obd)
1719 {
1720         struct ost_obd *ost = &obd->u.ost;
1721         int rc = 0;
1722
1723         down(&ost->ost_health_sem);
1724         rc |= ptlrpc_service_health_check(ost->ost_service);
1725         rc |= ptlrpc_service_health_check(ost->ost_create_service);
1726         rc |= ptlrpc_service_health_check(ost->ost_io_service);
1727         up(&ost->ost_health_sem);
1728
1729         /*
1730          * health_check to return 0 on healthy
1731          * and 1 on unhealthy.
1732          */
1733         if( rc != 0)
1734                 rc = 1;
1735
1736         return rc;
1737 }
1738
1739 struct ost_thread_local_cache *ost_tls(struct ptlrpc_request *r)
1740 {
1741         return (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
1742 }
1743
1744 /* use obd ops to offer management infrastructure */
1745 static struct obd_ops ost_obd_ops = {
1746         .o_owner        = THIS_MODULE,
1747         .o_setup        = ost_setup,
1748         .o_cleanup      = ost_cleanup,
1749         .o_health_check = ost_health_check,
1750 };
1751
1752
1753 static int __init ost_init(void)
1754 {
1755         struct lprocfs_static_vars lvars;
1756         int rc;
1757         ENTRY;
1758
1759         lprocfs_init_vars(ost, &lvars);
1760         rc = class_register_type(&ost_obd_ops, NULL, lvars.module_vars,
1761                                  LUSTRE_OSS_NAME, NULL);
1762         RETURN(rc);
1763 }
1764
1765 static void /*__exit*/ ost_exit(void)
1766 {
1767         class_unregister_type(LUSTRE_OSS_NAME);
1768 }
1769
1770 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1771 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
1772 MODULE_LICENSE("GPL");
1773
1774 module_init(ost_init);
1775 module_exit(ost_exit);