Whamcloud - gitweb
b1fe3417cdd96e16bd7f2e966e6c9d47de896390
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author: Peter J. Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  *
23  *  Storage Target Handling functions
24  *  Lustre Object Server Module (OST)
25  *
26  *  This server is single threaded at present (but can easily be multi
27  *  threaded). For testing and management it is treated as an
28  *  obd_device, although it does not export a full OBD method table
29  *  (the requests are coming in over the wire, so object target
30  *  modules do not have a full method table.)
31  */
32
33 #ifndef EXPORT_SYMTAB
34 # define EXPORT_SYMTAB
35 #endif
36 #define DEBUG_SUBSYSTEM S_OST
37
38 #include <linux/module.h>
39 #include <linux/obd_ost.h>
40 #include <linux/lustre_net.h>
41 #include <linux/lustre_dlm.h>
42 #include <linux/lustre_export.h>
43 #include <linux/init.h>
44 #include <linux/lprocfs_status.h>
45 #include <linux/lustre_commit_confd.h>
46 #include <libcfs/list.h>
47 #include <linux/lustre_sec.h>
48 #include <linux/lustre_audit.h>
49
50 void oti_init(struct obd_trans_info *oti, struct ptlrpc_request *req)
51 {
52         if (oti == NULL)
53                 return;
54         memset(oti, 0, sizeof *oti);
55
56         if (req->rq_repmsg && req->rq_reqmsg != 0)
57                 oti->oti_transno = req->rq_repmsg->transno;
58 }
59
60 void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
61 {
62         struct oti_req_ack_lock *ack_lock;
63         int i;
64
65         if (oti == NULL)
66                 return;
67
68         if (req->rq_repmsg)
69                 req->rq_repmsg->transno = oti->oti_transno;
70
71         /* XXX 4 == entries in oti_ack_locks??? */
72         for (ack_lock = oti->oti_ack_locks, i = 0; i < 4; i++, ack_lock++) {
73                 if (!ack_lock->mode)
74                         break;
75                 /* XXX not even calling target_send_reply in some cases... */
76                 ptlrpc_save_lock (req, &ack_lock->lock, ack_lock->mode);
77         }
78 }
79
80 static int ost_destroy(struct obd_export *exp, struct ptlrpc_request *req, 
81                        struct obd_trans_info *oti)
82 {
83         struct ost_body *body, *repbody;
84         int rc, size = sizeof(*body);
85         ENTRY;
86
87         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
88         if (body == NULL)
89                 RETURN(-EFAULT);
90
91         rc = lustre_pack_reply(req, 1, &size, NULL);
92         if (rc)
93                 RETURN(rc);
94
95         if (body->oa.o_valid & OBD_MD_FLCOOKIE)
96                 oti->oti_logcookies = obdo_logcookie(&body->oa);
97         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
98         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
99         req->rq_status = obd_destroy(exp, &body->oa, NULL, oti);
100         RETURN(0);
101 }
102
103 static int ost_getattr(struct obd_export *exp, struct ptlrpc_request *req)
104 {
105         struct ost_body *body, *repbody;
106         int rc, size = sizeof(*body);
107         ENTRY;
108
109         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
110         if (body == NULL)
111                 RETURN(-EFAULT);
112
113         rc = lustre_pack_reply(req, 1, &size, NULL);
114         if (rc)
115                 RETURN(rc);
116
117         repbody = lustre_msg_buf (req->rq_repmsg, 0, sizeof(*repbody));
118         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
119         req->rq_status = obd_getattr(exp, &repbody->oa, NULL);
120         RETURN(0);
121 }
122
123 static int ost_statfs(struct ptlrpc_request *req)
124 {
125         struct obd_statfs *osfs;
126         int rc, size = sizeof(*osfs);
127         ENTRY;
128
129         rc = lustre_pack_reply(req, 1, &size, NULL);
130         if (rc)
131                 RETURN(rc);
132
133         osfs = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*osfs));
134
135         req->rq_status = obd_statfs(req->rq_export->exp_obd, osfs, jiffies-HZ);
136         if (req->rq_status != 0)
137                 CERROR("ost: statfs failed: rc %d\n", req->rq_status);
138
139         RETURN(0);
140 }
141
142 static int ost_create(struct obd_export *exp, struct ptlrpc_request *req,
143                       struct obd_trans_info *oti)
144 {
145         struct ost_body *body, *repbody;
146         int rc, size = sizeof(*repbody);
147         ENTRY;
148
149         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
150         if (body == NULL)
151                 RETURN(-EFAULT);
152
153         rc = lustre_pack_reply(req, 1, &size, NULL);
154         if (rc)
155                 RETURN(rc);
156
157         repbody = lustre_msg_buf (req->rq_repmsg, 0, sizeof(*repbody));
158         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
159         oti->oti_logcookies = obdo_logcookie(&repbody->oa);
160         req->rq_status = obd_create(exp, &repbody->oa, NULL, 0, NULL, oti);
161         //obd_log_cancel(conn, NULL, 1, oti->oti_logcookies, 0);
162         RETURN(0);
163 }
164
165 static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req, 
166                      struct obd_trans_info *oti)
167 {
168         struct ost_body *body, *repbody;
169         int rc, size = sizeof(*repbody);
170         ENTRY;
171
172         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
173         if (body == NULL)
174                 RETURN(-EFAULT);
175
176         if ((body->oa.o_valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) !=
177             (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
178                 RETURN(-EINVAL);
179
180         rc = lustre_pack_reply(req, 1, &size, NULL);
181         if (rc)
182                 RETURN(rc);
183
184         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
185         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
186         req->rq_status = obd_punch(exp, &repbody->oa, NULL, repbody->oa.o_size,
187                                    repbody->oa.o_blocks, oti);
188         RETURN(0);
189 }
190
191 static int ost_sync(struct obd_export *exp, struct ptlrpc_request *req)
192 {
193         struct ost_body *body, *repbody;
194         int rc, size = sizeof(*repbody);
195         ENTRY;
196
197         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
198         if (body == NULL)
199                 RETURN(-EFAULT);
200
201         rc = lustre_pack_reply(req, 1, &size, NULL);
202         if (rc)
203                 RETURN(rc);
204
205         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
206         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
207         req->rq_status = obd_sync(exp, &repbody->oa, NULL, repbody->oa.o_size,
208                                   repbody->oa.o_blocks);
209         RETURN(0);
210 }
211
212 static int ost_setattr(struct obd_export *exp, struct ptlrpc_request *req, 
213                        struct obd_trans_info *oti)
214 {
215         struct ost_body *body, *repbody;
216         int rc, size = sizeof(*repbody);
217         ENTRY;
218
219         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
220         if (body == NULL)
221                 RETURN(-EFAULT);
222
223         rc = lustre_pack_reply(req, 1, &size, NULL);
224         if (rc)
225                 RETURN(rc);
226
227         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
228         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
229
230         req->rq_status = obd_setattr(exp, &repbody->oa, NULL, oti);
231         RETURN(0);
232 }
233
234 static int ost_bulk_timeout(void *data)
235 {
236         ENTRY;
237         /* We don't fail the connection here, because having the export
238          * killed makes the (vital) call to commitrw very sad.
239          */
240         RETURN(1);
241 }
242
243 static int get_per_page_niobufs(struct obd_ioobj *ioo, int nioo,
244                                 struct niobuf_remote *rnb, int nrnb,
245                                 struct niobuf_remote **pp_rnbp)
246 {
247         /* Copy a remote niobuf, splitting it into page-sized chunks
248          * and setting ioo[i].ioo_bufcnt accordingly */
249         struct niobuf_remote *pp_rnb;
250         int   i;
251         int   j;
252         int   page;
253         int   rnbidx = 0;
254         int   npages = 0;
255
256         /* first count and check the number of pages required */
257         for (i = 0; i < nioo; i++)
258                 for (j = 0; j < ioo->ioo_bufcnt; j++, rnbidx++) {
259                         obd_off offset = rnb[rnbidx].offset;
260                         obd_off p0 = offset >> PAGE_SHIFT;
261                         obd_off pn = (offset + rnb[rnbidx].len - 1)>>PAGE_SHIFT;
262
263                         LASSERT(rnbidx < nrnb);
264
265                         npages += (pn + 1 - p0);
266
267                         if (rnb[rnbidx].len == 0) {
268                                 CERROR("zero len BRW: obj %d objid "LPX64
269                                        " buf %u\n", i, ioo[i].ioo_id, j);
270                                 return -EINVAL;
271                         }
272                         if (j > 0 &&
273                             rnb[rnbidx].offset <= rnb[rnbidx-1].offset) {
274                                 CERROR("unordered BRW: obj %d objid "LPX64
275                                        " buf %u offset "LPX64" <= "LPX64"\n",
276                                        i, ioo[i].ioo_id, j, rnb[rnbidx].offset,
277                                        rnb[rnbidx].offset);
278                                 return -EINVAL;
279                         }
280                 }
281
282         LASSERT(rnbidx == nrnb);
283
284         if (npages == nrnb) {       /* all niobufs are for single pages */
285                 *pp_rnbp = rnb;
286                 return npages;
287         }
288
289         OBD_ALLOC(pp_rnb, sizeof(*pp_rnb) * npages);
290         if (pp_rnb == NULL)
291                 return -ENOMEM;
292
293         /* now do the actual split */
294         page = rnbidx = 0;
295         for (i = 0; i < nioo; i++) {
296                 int  obj_pages = 0;
297
298                 for (j = 0; j < ioo[i].ioo_bufcnt; j++, rnbidx++) {
299                         obd_off off = rnb[rnbidx].offset;
300                         int     nob = rnb[rnbidx].len;
301
302                         LASSERT(rnbidx < nrnb);
303                         do {
304                                 obd_off  poff = off & (PAGE_SIZE - 1);
305                                 int      pnob = (poff + nob > PAGE_SIZE) ?
306                                                 PAGE_SIZE - poff : nob;
307
308                                 LASSERT(page < npages);
309                                 pp_rnb[page].len = pnob;
310                                 pp_rnb[page].offset = off;
311                                 pp_rnb[page].flags = rnb[rnbidx].flags;
312
313                                 CDEBUG(0, "   obj %d id "LPX64
314                                        "page %d(%d) "LPX64" for %d, flg %x\n",
315                                        i, ioo[i].ioo_id, obj_pages, page,
316                                        pp_rnb[page].offset, pp_rnb[page].len,
317                                        pp_rnb[page].flags);
318                                 page++;
319                                 obj_pages++;
320
321                                 off += pnob;
322                                 nob -= pnob;
323                         } while (nob > 0);
324                         LASSERT(nob == 0);
325                 }
326                 ioo[i].ioo_bufcnt = obj_pages;
327         }
328         LASSERT(page == npages);
329
330         *pp_rnbp = pp_rnb;
331         return npages;
332 }
333
334 static void free_per_page_niobufs (int npages, struct niobuf_remote *pp_rnb,
335                                    struct niobuf_remote *rnb)
336 {
337         if (pp_rnb == rnb)                      /* didn't allocate above */
338                 return;
339
340         OBD_FREE(pp_rnb, sizeof(*pp_rnb) * npages);
341 }
342
#if CHECKSUM_BULK
/*
 * Compute a checksum over all data attached to a bulk descriptor.
 *
 * Walks desc->bd_page_list, maps each page with kmap(), feeds bp_buflen
 * bytes starting at bp_pageoffset into ost_checksum(), and unmaps the
 * page again.  Returns the accumulated checksum (0 for an empty list).
 */
obd_count ost_checksum_bulk(struct ptlrpc_bulk_desc *desc)
{
        struct ptlrpc_bulk_page *pg;
        obd_count sum = 0;

        list_for_each_entry(pg, &desc->bd_page_list, bp_link) {
                void *kaddr = kmap(pg->bp_page);

                ost_checksum(&sum, kaddr + pg->bp_pageoffset, pg->bp_buflen);
                kunmap(pg->bp_page);
        }

        return sum;
}
#endif
358
359 static void ost_stime_record(struct ptlrpc_request *req, struct timeval *start,
360                              unsigned rw, unsigned phase)
361 {
362         struct obd_device *obd = req->rq_svc->srv_obddev;
363         struct timeval stop;
364         int ind = rw *3 + phase;
365          
366         if (obd && obd->obd_type && obd->obd_type->typ_name) {
367                 if (!strcmp(obd->obd_type->typ_name, OBD_OST_DEVICENAME)) {
368                         struct ost_obd *ost = NULL;
369                         
370                         ost = &obd->u.ost;
371                         if (ind >= (sizeof(ost->ost_stimes) / 
372                                     sizeof(ost->ost_stimes[0])))
373                                return;
374                         do_gettimeofday(&stop);
375
376                         spin_lock(&ost->ost_lock);
377                         lprocfs_stime_record(&ost->ost_stimes[ind],&stop,start);
378                         spin_unlock(&ost->ost_lock);
379                         memcpy(start, &stop, sizeof(*start));
380                 }
381        } 
382 }
383
384 static int ost_brw_read(struct ptlrpc_request *req)
385 {
386         struct ptlrpc_bulk_desc *desc;
387         struct niobuf_remote    *remote_nb;
388         struct niobuf_remote    *pp_rnb;
389         struct niobuf_local     *local_nb;
390         struct obd_ioobj        *ioo;
391         struct ost_body         *body, *repbody;
392         struct l_wait_info       lwi;
393         struct obd_trans_info    oti = { 0 };
394         int                      size[1] = { sizeof(*body) };
395         int                      comms_error = 0;
396         int                      niocount;
397         int                      npages;
398         int                      nob = 0;
399         int                      rc;
400         int                      i;
401         struct timeval           start;
402         ENTRY;
403
404         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
405                 GOTO(out, rc = -EIO);
406
407         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK | OBD_FAIL_ONCE,
408                          (obd_timeout + 1) / 4);
409
410         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
411         if (body == NULL) {
412                 CERROR("Missing/short ost_body\n");
413                 GOTO(out, rc = -EFAULT);
414         }
415
416         ioo = lustre_swab_reqbuf(req, 1, sizeof(*ioo), lustre_swab_obd_ioobj);
417         if (ioo == NULL) {
418                 CERROR("Missing/short ioobj\n");
419                 GOTO(out, rc = -EFAULT);
420         }
421
422         niocount = ioo->ioo_bufcnt;
423         remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof(*remote_nb),
424                                        lustre_swab_niobuf_remote);
425         if (remote_nb == NULL) {
426                 CERROR("Missing/short niobuf\n");
427                 GOTO(out, rc = -EFAULT);
428         }
429         if (lustre_msg_swabbed(req->rq_reqmsg)) { /* swab remaining niobufs */
430                 for (i = 1; i < niocount; i++)
431                         lustre_swab_niobuf_remote (&remote_nb[i]);
432         }
433
434         rc = lustre_pack_reply(req, 1, size, NULL);
435         if (rc)
436                 GOTO(out, rc);
437
438         /* FIXME all niobuf splitting should be done in obdfilter if needed */
439         /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
440         npages = get_per_page_niobufs(ioo, 1, remote_nb, niocount, &pp_rnb);
441         if (npages < 0)
442                 GOTO(out, rc = npages);
443
444         OBD_ALLOC(local_nb, sizeof(*local_nb) * npages);
445         if (local_nb == NULL)
446                 GOTO(out_pp_rnb, rc = -ENOMEM);
447
448         desc = ptlrpc_prep_bulk_exp (req, npages, 
449                                      BULK_PUT_SOURCE, OST_BULK_PORTAL);
450         if (desc == NULL)
451                 GOTO(out_local, rc = -ENOMEM);
452
453         do_gettimeofday(&start);
454         rc = obd_preprw(OBD_BRW_READ, req->rq_export, &body->oa, 1,
455                         ioo, npages, pp_rnb, local_nb, &oti);
456         ost_stime_record(req, &start, 0, 0);
457         if (rc != 0)
458                 GOTO(out_bulk, rc);
459
460         /* We're finishing using body->oa as an input variable */
461         body->oa.o_valid = 0;
462
463         nob = 0;
464         for (i = 0; i < npages; i++) {
465                 int page_rc = local_nb[i].rc;
466
467                 if (page_rc < 0) {              /* error */
468                         rc = page_rc;
469                         break;
470                 }
471
472                 LASSERT(page_rc <= pp_rnb[i].len);
473                 nob += page_rc;
474                 if (page_rc != 0) {             /* some data! */
475                         LASSERT (local_nb[i].page != NULL);
476                         ptlrpc_prep_bulk_page(desc, local_nb[i].page,
477                                               pp_rnb[i].offset & (PAGE_SIZE-1),
478                                               page_rc);
479                 }
480
481                 if (page_rc != pp_rnb[i].len) { /* short read */
482                         /* All subsequent pages should be 0 */
483                         while(++i < npages)
484                                 LASSERT(local_nb[i].rc == 0);
485                         break;
486                 }
487         }
488
489         if (rc == 0) {
490                 rc = ptlrpc_start_bulk_transfer(desc);
491                 if (rc == 0) {
492                         lwi = LWI_TIMEOUT(obd_timeout * HZ / 4,
493                                           ost_bulk_timeout, desc);
494                         rc = l_wait_event(desc->bd_waitq,
495                                           !ptlrpc_bulk_active(desc), &lwi);
496                         LASSERT(rc == 0 || rc == -ETIMEDOUT);
497                         if (rc == -ETIMEDOUT) {
498                                 DEBUG_REQ(D_ERROR, req, "timeout on bulk PUT"
499                                           ", expt_conn_cnt = %u",
500                                           req->rq_export->exp_conn_cnt);
501                                 ptlrpc_abort_bulk(desc);
502                         } else if (!desc->bd_success ||
503                                    desc->bd_nob_transferred != desc->bd_nob) {
504                                 DEBUG_REQ(D_ERROR, req, "%s bulk PUT %d(%d)",
505                                           desc->bd_success ?
506                                           "truncated" : "network error on",
507                                           desc->bd_nob_transferred,
508                                           desc->bd_nob);
509                                 /* XXX should this be a different errno? */
510                                 rc = -ETIMEDOUT;
511                         }
512                 } else {
513                         DEBUG_REQ(D_ERROR, req, "bulk PUT failed: rc %d\n", rc);
514                 }
515                 comms_error = rc != 0;
516         }
517
518         ost_stime_record(req, &start, 0, 1);
519         /* Must commit after prep above in all cases */
520         rc = obd_commitrw(OBD_BRW_READ, req->rq_export, &body->oa, 1,
521                           ioo, npages, local_nb, &oti, rc);
522         ost_stime_record(req, &start, 0, 2);
523
524         if (rc == 0) {
525                 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
526                 memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
527
528 #if CHECKSUM_BULK
529                 repbody->oa.o_cksum = ost_checksum_bulk(desc);
530                 repbody->oa.o_valid |= OBD_MD_FLCKSUM;
531 #endif
532         }
533
534  out_bulk:
535         ptlrpc_free_bulk(desc);
536  out_local:
537         OBD_FREE(local_nb, sizeof(*local_nb) * npages);
538  out_pp_rnb:
539         free_per_page_niobufs(npages, pp_rnb, remote_nb);
540  out:
541         LASSERT(rc <= 0);
542         if (rc == 0) {
543                 req->rq_status = nob;
544                 ptlrpc_reply(req);
545         } else if (!comms_error) {
546                 /* only reply if comms OK */
547                 req->rq_status = rc;
548                 ptlrpc_error(req);
549         } else {
550                 if (req->rq_reply_state != NULL) {
551                         /* reply out callback would free */
552                         lustre_free_reply_state (req->rq_reply_state);
553                 }
554                 if (req->rq_reqmsg->conn_cnt == req->rq_export->exp_conn_cnt) {
555                         CERROR("bulk IO comms error: "
556                                "evicting %s@%s id %s\n",
557                                req->rq_export->exp_client_uuid.uuid,
558                                req->rq_export->exp_connection->c_remote_uuid.uuid,
559                                req->rq_peerstr);
560                         ptlrpc_fail_export(req->rq_export);
561                 } else {
562                         CERROR("ignoring bulk IO comms error: "
563                                "client reconnected %s@%s id %s\n",  
564                                req->rq_export->exp_client_uuid.uuid,
565                                req->rq_export->exp_connection->c_remote_uuid.uuid,
566                                req->rq_peerstr);
567                 }
568         }
569
570         RETURN(rc);
571 }
572
573 int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
574 {
575         struct ptlrpc_bulk_desc *desc;
576         struct niobuf_remote    *remote_nb;
577         struct niobuf_remote    *pp_rnb;
578         struct niobuf_local     *local_nb;
579         struct obd_ioobj        *ioo;
580         struct ost_body         *body, *repbody;
581         struct l_wait_info       lwi;
582         __u32                   *rcs;
583         int                      size[2] = { sizeof(*body) };
584         int                      objcount, niocount, npages;
585         int                      comms_error = 0;
586         int                      rc, swab, i, j;
587         struct timeval           start;        
588         ENTRY;
589
590         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
591                 GOTO(out, rc = -EIO);
592
593         /* pause before transaction has been started */
594         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK | OBD_FAIL_ONCE,
595                          (obd_timeout + 1) / 4);
596
597         swab = lustre_msg_swabbed(req->rq_reqmsg);
598         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
599         if (body == NULL) {
600                 CERROR("Missing/short ost_body\n");
601                 GOTO(out, rc = -EFAULT);
602         }
603
604         LASSERT_REQSWAB(req, 1);
605         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
606         if (objcount == 0) {
607                 CERROR("Missing/short ioobj\n");
608                 GOTO(out, rc = -EFAULT);
609         }
610         ioo = lustre_msg_buf (req->rq_reqmsg, 1, objcount * sizeof(*ioo));
611         LASSERT (ioo != NULL);
612         for (niocount = i = 0; i < objcount; i++) {
613                 if (swab)
614                         lustre_swab_obd_ioobj (&ioo[i]);
615                 if (ioo[i].ioo_bufcnt == 0) {
616                         CERROR("ioo[%d] has zero bufcnt\n", i);
617                         GOTO(out, rc = -EFAULT);
618                 }
619                 niocount += ioo[i].ioo_bufcnt;
620         }
621
622         remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof(*remote_nb),
623                                        lustre_swab_niobuf_remote);
624         if (remote_nb == NULL) {
625                 CERROR("Missing/short niobuf\n");
626                 GOTO(out, rc = -EFAULT);
627         }
628         if (swab) {                             /* swab the remaining niobufs */
629                 for (i = 1; i < niocount; i++)
630                         lustre_swab_niobuf_remote (&remote_nb[i]);
631         }
632
633         size[1] = niocount * sizeof(*rcs);
634         rc = lustre_pack_reply(req, 2, size, NULL);
635         if (rc != 0)
636                 GOTO(out, rc);
637         rcs = lustre_msg_buf(req->rq_repmsg, 1, niocount * sizeof(*rcs));
638
639 #if 0
640         /* Do snap options here*/
641         rc = obd_do_cow(req->rq_export, ioo, objcount, remote_nb);
642         if (rc)
643                 GOTO(out, rc);
644 #endif
645
646         /* FIXME all niobuf splitting should be done in obdfilter if needed */
647         /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
648         npages = get_per_page_niobufs(ioo, objcount,remote_nb,niocount,&pp_rnb);
649         if (npages < 0)
650                 GOTO(out, rc = npages);
651
652         OBD_ALLOC(local_nb, sizeof(*local_nb) * npages);
653         if (local_nb == NULL)
654                 GOTO(out_pp_rnb, rc = -ENOMEM);
655
656         desc = ptlrpc_prep_bulk_exp (req, npages, 
657                                      BULK_GET_SINK, OST_BULK_PORTAL);
658         if (desc == NULL)
659                 GOTO(out_local, rc = -ENOMEM);
660
661         do_gettimeofday(&start);
662         rc = obd_preprw(OBD_BRW_WRITE, req->rq_export, &body->oa, objcount,
663                         ioo, npages, pp_rnb, local_nb, oti);
664         ost_stime_record(req, &start, 1, 0);
665         if (rc != 0)
666                 GOTO(out_bulk, rc);
667
668         /* NB Having prepped, we must commit... */
669
670         for (i = 0; i < npages; i++)
671                 ptlrpc_prep_bulk_page(desc, local_nb[i].page, 
672                                       pp_rnb[i].offset & (PAGE_SIZE - 1),
673                                       pp_rnb[i].len);
674
675         rc = ptlrpc_start_bulk_transfer (desc);
676         if (rc == 0) {
677                 lwi = LWI_TIMEOUT(obd_timeout * HZ / 4,
678                                   ost_bulk_timeout, desc);
679                 rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), 
680                                   &lwi);
681                 LASSERT(rc == 0 || rc == -ETIMEDOUT);
682                 if (rc == -ETIMEDOUT) {
683                         DEBUG_REQ(D_ERROR, req, "timeout on bulk GET");
684                         ptlrpc_abort_bulk(desc);
685                 } else if (!desc->bd_success ||
686                            desc->bd_nob_transferred != desc->bd_nob) {
687                         DEBUG_REQ(D_ERROR, req, "%s bulk GET %d(%d)",
688                                   desc->bd_success ? 
689                                   "truncated" : "network error on",
690                                   desc->bd_nob_transferred, desc->bd_nob);
691                         /* XXX should this be a different errno? */
692                         rc = -ETIMEDOUT;
693                 }
694         } else {
695                 DEBUG_REQ(D_ERROR, req, "ptlrpc_bulk_get failed: rc %d\n", rc);
696         }
697         comms_error = rc != 0;
698
699         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
700         memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
701
702 #if CHECKSUM_BULK
703         if (rc == 0 && (body->oa.o_valid & OBD_MD_FLCKSUM) != 0) {
704                 static int cksum_counter;
705                 obd_count client_cksum = body->oa.o_cksum;
706                 obd_count cksum = ost_checksum_bulk(desc);
707
708                 if (client_cksum != cksum) {
709                         CERROR("Bad checksum: client %x, server %x id %s\n",
710                                client_cksum, cksum,
711                                req->rq_peerstr);
712                         cksum_counter = 1;
713                         repbody->oa.o_cksum = cksum;
714                 } else {
715                         cksum_counter++;
716                         if ((cksum_counter & (-cksum_counter)) == cksum_counter)
717                                 CWARN("Checksum %u from NID %s: %x OK\n",         
718                                       cksum_counter, req->rq_peerstr, cksum);
719                 }
720         }
721 #endif
722         ost_stime_record(req, &start, 1, 1);
723         /* Must commit after prep above in all cases */
724         rc = obd_commitrw(OBD_BRW_WRITE, req->rq_export, &repbody->oa,
725                           objcount, ioo, npages, local_nb, oti, rc);
726
727         ost_stime_record(req, &start, 1, 2);
728         if (rc == 0) {
729 #if CHECKSUM_BULK
730                 repbody->oa.o_cksum = ost_checksum_bulk(desc);
731                 repbody->oa.o_valid |= OBD_MD_FLCKSUM;
732 #endif
733                 /* set per-requested niobuf return codes */
734                 for (i = j = 0; i < niocount; i++) {
735                         int nob = remote_nb[i].len;
736
737                         rcs[i] = 0;
738                         do {
739                                 LASSERT(j < npages);
740                                 if (local_nb[j].rc < 0)
741                                         rcs[i] = local_nb[j].rc;
742                                 nob -= pp_rnb[j].len;
743                                 j++;
744                         } while (nob > 0);
745                         LASSERT(nob == 0);
746                 }
747                 LASSERT(j == npages);
748         }
749         /*XXX This write extents only for write-back cache extents*/
750         rc = obd_write_extents(req->rq_export, ioo, objcount, niocount, 
751                                local_nb, rc);
752  out_bulk:
753         ptlrpc_free_bulk(desc);
754  out_local:
755         OBD_FREE(local_nb, sizeof(*local_nb) * npages);
756  out_pp_rnb:
757         free_per_page_niobufs(npages, pp_rnb, remote_nb);
758  out:
759         if (rc == 0) {
760                 oti_to_request(oti, req);
761                 rc = ptlrpc_reply(req);
762         } else if (!comms_error) {
763                 /* Only reply if there was no comms problem with bulk */
764                 req->rq_status = rc;
765                 ptlrpc_error(req);
766         } else {
767                 if (req->rq_reply_state != NULL) {
768                         /* reply out callback would free */
769                         lustre_free_reply_state (req->rq_reply_state);
770                 }
771                 if (req->rq_reqmsg->conn_cnt == req->rq_export->exp_conn_cnt) {
772                         CERROR("%s: bulk IO comm error evicting %s@%s id %s\n",
773                                req->rq_export->exp_obd->obd_name,
774                                req->rq_export->exp_client_uuid.uuid,
775                                req->rq_export->exp_connection->c_remote_uuid.uuid,
776                                req->rq_peerstr);
777                         ptlrpc_fail_export(req->rq_export);
778                 } else {
779                         CERROR("ignoring bulk IO comms error: "
780                                "client reconnected %s@%s id %s\n",
781                                req->rq_export->exp_client_uuid.uuid,
782                                req->rq_export->exp_connection->c_remote_uuid.uuid,
783                                req->rq_peerstr);
784                 }
785         }
786         RETURN(rc);
787 }
788 EXPORT_SYMBOL(ost_brw_write);
789
790 static int ost_san_brw(struct ptlrpc_request *req, int cmd)
791 {
792         struct niobuf_remote *remote_nb, *res_nb, *pp_rnb;
793         struct obd_ioobj *ioo;
794         struct ost_body *body, *repbody;
795         int rc, i, objcount, niocount, size[2] = {sizeof(*body)}, npages;
796         int swab;
797         ENTRY;
798
799         /* XXX not set to use latest protocol */
800
801         swab = lustre_msg_swabbed(req->rq_reqmsg);
802         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
803         if (body == NULL) {
804                 CERROR("Missing/short ost_body\n");
805                 GOTO(out, rc = -EFAULT);
806         }
807
808         ioo = lustre_swab_reqbuf(req, 1, sizeof(*ioo), lustre_swab_obd_ioobj);
809         if (ioo == NULL) {
810                 CERROR("Missing/short ioobj\n");
811                 GOTO(out, rc = -EFAULT);
812         }
813         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
814         niocount = ioo[0].ioo_bufcnt;
815         for (i = 1; i < objcount; i++) {
816                 if (swab)
817                         lustre_swab_obd_ioobj (&ioo[i]);
818                 niocount += ioo[i].ioo_bufcnt;
819         }
820
821         remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof(*remote_nb),
822                                        lustre_swab_niobuf_remote);
823         if (remote_nb == NULL) {
824                 CERROR("Missing/short niobuf\n");
825                 GOTO(out, rc = -EFAULT);
826         }
827         if (swab) {                             /* swab the remaining niobufs */
828                 for (i = 1; i < niocount; i++)
829                         lustre_swab_niobuf_remote (&remote_nb[i]);
830         }
831
832         /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
833         npages = get_per_page_niobufs(ioo, objcount,remote_nb,niocount,&pp_rnb);
834         if (npages < 0)
835                 GOTO (out, rc = npages);
836  
837         size[1] = npages * sizeof(*pp_rnb);
838         rc = lustre_pack_reply(req, 2, size, NULL);
839         if (rc)
840                 GOTO(out_pp_rnb, rc);
841
842         req->rq_status = obd_san_preprw(cmd, req->rq_export, &body->oa,
843                                         objcount, ioo, npages, pp_rnb);
844
845         if (req->rq_status)
846                 GOTO(out_pp_rnb, rc = 0);
847
848         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
849         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
850
851         res_nb = lustre_msg_buf(req->rq_repmsg, 1, size[1]);
852         memcpy(res_nb, remote_nb, size[1]);
853         rc = 0;
854 out_pp_rnb:
855         free_per_page_niobufs(npages, pp_rnb, remote_nb);
856 out:
857         if (rc) {
858                 req->rq_status = rc;
859                 ptlrpc_error(req);
860         } else
861                 ptlrpc_reply(req);
862
863         return rc;
864 }
865
866 static int ost_set_info(struct obd_export *exp, struct ptlrpc_request *req)
867 {
868         char *key, *val;
869         int keylen, rc = 0;
870         ENTRY;
871
872         key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
873         if (key == NULL) {
874                 DEBUG_REQ(D_HA, req, "no set_info key");
875                 RETURN(-EFAULT);
876         }
877         keylen = req->rq_reqmsg->buflens[0];
878
879         rc = lustre_pack_reply(req, 0, NULL, NULL);
880         if (rc)
881                 RETURN(rc);
882
883         val = lustre_msg_buf(req->rq_reqmsg, 1, 0);
884         
885         if (keylen == 8 && memcmp(key, "auditlog", 8) == 0) {
886                 lustre_swab_reqbuf(req, 1, sizeof(struct audit_msg),
887                                    lustre_swab_audit_msg);
888         }
889         else if (keylen == 5 && strcmp(key, "audit") == 0) {
890                 lustre_swab_reqbuf(req, 1, sizeof(struct audit_attr_msg),
891                                    lustre_swab_audit_attr);
892         }
893         else if (keylen == 9 && strcmp(key, "audit_obj") == 0) {
894                 lustre_swab_reqbuf(req, 1, sizeof(struct obdo),
895                                    lustre_swab_obdo);
896         }
897
898         rc = obd_set_info(exp, keylen, key, req->rq_reqmsg->buflens[1], val);
899         req->rq_repmsg->status = 0;
900         RETURN(rc);
901 }
902
903 static int ost_get_info(struct obd_export *exp, struct ptlrpc_request *req)
904 {
905         char *key;
906         int keylen, rc = 0, size = sizeof(obd_id);
907         obd_id *reply;
908         ENTRY;
909
910         key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
911         if (key == NULL) {
912                 DEBUG_REQ(D_HA, req, "no get_info key");
913                 RETURN(-EFAULT);
914         }
915         keylen = req->rq_reqmsg->buflens[0];
916
917         if (keylen < strlen("last_id") || memcmp(key, "last_id", 7) != 0)
918                 RETURN(-EPROTO);
919
920         rc = lustre_pack_reply(req, 1, &size, NULL);
921         if (rc)
922                 RETURN(rc);
923
924         reply = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*reply));
925         rc = obd_get_info(exp, keylen, key, (__u32 *)&size, reply);
926         req->rq_repmsg->status = 0;
927         RETURN(rc);
928 }
929
930 static int ost_llog_handle_connect(struct obd_export *exp,
931                                    struct ptlrpc_request *req)
932 {
933         struct llogd_conn_body *body;
934         int rc;
935         ENTRY;
936
937         body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
938         rc = obd_llog_connect(exp, body);
939         RETURN(rc);
940 }
941
942 static int ost_filter_recovery_request(struct ptlrpc_request *req,
943                                        struct obd_device *obd, int *process)
944 {
945         switch (req->rq_reqmsg->opc) {
946         case OST_CONNECT: /* This will never get here, but for completeness. */
947         case OST_DISCONNECT:
948                *process = 1;
949                RETURN(0);
950
951         case OBD_PING:
952         case OST_CREATE:
953         case OST_DESTROY:
954         case OST_PUNCH:
955         case OST_SETATTR:
956         case OST_SYNC:
957         case OST_WRITE:
958         case OBD_LOG_CANCEL:
959         case LDLM_ENQUEUE:
960                 *process = target_queue_recovery_request(req, obd);
961                 RETURN(0);
962
963         default:
964                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
965                 *process = 0;
966                 /* XXX what should we set rq_status to here? */
967                 req->rq_status = -EAGAIN;
968                 RETURN(ptlrpc_error(req));
969         }
970 }
971
972 int ost_msg_check_version(struct lustre_msg *msg)
973 {
974         int rc;
975
976         switch(msg->opc) {
977         case OST_CONNECT:
978         case OST_DISCONNECT:
979         case OBD_PING:
980         case OST_CREATE:
981         case OST_DESTROY:
982         case OST_GETATTR:
983         case OST_SETATTR:
984         case OST_WRITE:
985         case OST_READ:
986         case OST_SAN_READ:
987         case OST_SAN_WRITE:
988         case OST_PUNCH:
989         case OST_STATFS:
990         case OST_SYNC:
991         case OST_SET_INFO:
992         case OST_GET_INFO:
993                 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
994                 if (rc)
995                         CERROR("bad opc %u version %08x, expecting %08x\n",
996                                msg->opc, msg->version, LUSTRE_OBD_VERSION);
997                 break;
998         case LDLM_ENQUEUE:
999         case LDLM_CONVERT:
1000         case LDLM_CANCEL:
1001         case LDLM_BL_CALLBACK:
1002         case LDLM_CP_CALLBACK:
1003                 rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
1004                 if (rc)
1005                         CERROR("bad opc %u version %08x, expecting %08x\n",
1006                                msg->opc, msg->version, LUSTRE_DLM_VERSION);
1007                 break;
1008         case OBD_LOG_CANCEL:
1009         case LLOG_ORIGIN_CONNECT:
1010                 rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
1011                 if (rc)
1012                         CERROR("bad opc %u version %08x, expecting %08x\n",
1013                                msg->opc, msg->version, LUSTRE_LOG_VERSION);
1014                 break;
1015         case SEC_INIT:
1016         case SEC_INIT_CONTINUE:
1017         case SEC_FINI:
1018                 rc = 0;
1019                 break;
1020         default:
1021                 CERROR("OST unexpected opcode %d\n", msg->opc);
1022                 rc = -ENOTSUPP;
1023                 break;
1024         }
1025         return rc;
1026 }
1027
1028 int ost_handle(struct ptlrpc_request *req)
1029 {
1030         int should_process, fail = OBD_FAIL_OST_ALL_REPLY_NET, rc = 0;
1031         struct obd_trans_info *oti = NULL;
1032         struct obd_device *obd = NULL;
1033         ENTRY;
1034
1035         LASSERT(current->journal_info == NULL);
1036
1037         rc = ost_msg_check_version(req->rq_reqmsg);
1038         if (rc) {
1039                 CERROR("OST drop mal-formed request\n");
1040                 RETURN(rc);
1041         }
1042
1043         /* Security opc should NOT trigger any recovery events */
1044         if (req->rq_reqmsg->opc == SEC_INIT ||
1045             req->rq_reqmsg->opc == SEC_INIT_CONTINUE ||
1046             req->rq_reqmsg->opc == SEC_FINI) {
1047                 GOTO(out_check_req, rc = 0);
1048         }
1049
1050         /* XXX identical to MDS */
1051         if (req->rq_reqmsg->opc != OST_CONNECT) {
1052                 int recovering;
1053
1054                 if (req->rq_export == NULL) {
1055                         CDEBUG(D_HA,"operation %d on unconnected OST from %s\n",
1056                                req->rq_reqmsg->opc,
1057                                req->rq_peerstr);
1058                         req->rq_status = -ENOTCONN;
1059                         GOTO(out_check_req, rc = -ENOTCONN);
1060                 }
1061
1062                 obd = req->rq_export->exp_obd;
1063
1064                 /* Check for aborted recovery. */
1065                 spin_lock_bh(&obd->obd_processing_task_lock);
1066                 recovering = obd->obd_recovering;
1067                 spin_unlock_bh(&obd->obd_processing_task_lock);
1068                 if (recovering) {
1069                         rc = ost_filter_recovery_request(req, obd,
1070                                                          &should_process);
1071                         if (rc || !should_process)
1072                                 RETURN(rc);
1073                         if (should_process < 0) {
1074                                 req->rq_status = should_process;
1075                                 rc = ptlrpc_error(req);
1076                                 RETURN(rc);
1077                         }
1078                 }
1079         }
1080
1081         OBD_ALLOC(oti, sizeof(*oti));
1082         if (oti == NULL)
1083                 RETURN(-ENOMEM);
1084                 
1085         oti_init(oti, req);
1086
1087         switch (req->rq_reqmsg->opc) {
1088         case OST_CONNECT: {
1089                 CDEBUG(D_INODE, "connect\n");
1090                 OBD_FAIL_GOTO(OBD_FAIL_OST_CONNECT_NET, out_free_oti, rc = 0);
1091                 rc = target_handle_connect(req);
1092                 if (!rc)
1093                         obd = req->rq_export->exp_obd;
1094                 break;
1095         }
1096         case OST_DISCONNECT:
1097                 CDEBUG(D_INODE, "disconnect\n");
1098                 OBD_FAIL_GOTO(OBD_FAIL_OST_DISCONNECT_NET, out_free_oti, rc = 0);
1099                 rc = target_handle_disconnect(req);
1100                 break;
1101         case OST_CREATE:
1102                 CDEBUG(D_INODE, "create\n");
1103                 OBD_FAIL_GOTO(OBD_FAIL_OST_ENOSPC, out_check_req, rc = -ENOSPC);
1104                 OBD_FAIL_GOTO(OBD_FAIL_OST_EROFS, out_check_req, rc = -EROFS);
1105                 OBD_FAIL_GOTO(OBD_FAIL_OST_CREATE_NET, out_free_oti, rc = 0);
1106                 rc = ost_create(req->rq_export, req, oti);
1107                 break;
1108         case OST_DESTROY:
1109                 CDEBUG(D_INODE, "destroy\n");
1110                 OBD_FAIL_GOTO(OBD_FAIL_OST_DESTROY_NET, out_free_oti, rc = 0);
1111                 OBD_FAIL_GOTO(OBD_FAIL_OST_EROFS, out_check_req, rc = -EROFS);
1112                 rc = ost_destroy(req->rq_export, req, oti);
1113                 break;
1114         case OST_GETATTR:
1115                 CDEBUG(D_INODE, "getattr\n");
1116                 OBD_FAIL_GOTO(OBD_FAIL_OST_GETATTR_NET, out_free_oti, rc = 0);
1117                 rc = ost_getattr(req->rq_export, req);
1118                 break;
1119         case OST_SETATTR:
1120                 CDEBUG(D_INODE, "setattr\n");
1121                 OBD_FAIL_GOTO(OBD_FAIL_OST_SETATTR_NET, out_free_oti, rc = 0);
1122                 rc = ost_setattr(req->rq_export, req, oti);
1123                 break;
1124         case OST_WRITE:
1125                 CDEBUG(D_INODE, "write\n");
1126                 OBD_FAIL_GOTO(OBD_FAIL_OST_BRW_NET, out_free_oti, rc = 0);
1127                 OBD_FAIL_GOTO(OBD_FAIL_OST_ENOSPC, out_check_req, rc = -ENOSPC);
1128                 OBD_FAIL_GOTO(OBD_FAIL_OST_EROFS, out_check_req, rc = -EROFS);
1129                 rc = ost_brw_write(req, oti);
1130                 LASSERT(current->journal_info == NULL);
1131                 /* ost_brw sends its own replies */
1132                 GOTO(out_free_oti, rc);
1133         case OST_READ:
1134                 CDEBUG(D_INODE, "read\n");
1135                 OBD_FAIL_GOTO(OBD_FAIL_OST_BRW_NET, out_free_oti, rc = 0);
1136                 rc = ost_brw_read(req);
1137                 LASSERT(current->journal_info == NULL);
1138                 /* ost_brw sends its own replies */
1139                 GOTO(out_free_oti, rc);
1140         case OST_SAN_READ:
1141                 CDEBUG(D_INODE, "san read\n");
1142                 OBD_FAIL_GOTO(OBD_FAIL_OST_BRW_NET, out_free_oti, rc = 0);
1143                 rc = ost_san_brw(req, OBD_BRW_READ);
1144                 /* ost_san_brw sends its own replies */
1145                 GOTO(out_free_oti, rc);
1146         case OST_SAN_WRITE:
1147                 CDEBUG(D_INODE, "san write\n");
1148                 OBD_FAIL_GOTO(OBD_FAIL_OST_BRW_NET, out_free_oti, rc = 0);
1149                 rc = ost_san_brw(req, OBD_BRW_WRITE);
1150                 /* ost_san_brw sends its own replies */
1151                 GOTO(out_free_oti, rc);
1152         case OST_PUNCH:
1153                 CDEBUG(D_INODE, "punch\n");
1154                 OBD_FAIL_GOTO(OBD_FAIL_OST_PUNCH_NET, out_free_oti, rc = 0);
1155                 OBD_FAIL_GOTO(OBD_FAIL_OST_EROFS, out_check_req, rc = -EROFS);
1156                 rc = ost_punch(req->rq_export, req, oti);
1157                 break;
1158         case OST_STATFS:
1159                 CDEBUG(D_INODE, "statfs\n");
1160                 OBD_FAIL_GOTO(OBD_FAIL_OST_STATFS_NET, out_free_oti, rc = 0);
1161                 rc = ost_statfs(req);
1162                 break;
1163         case OST_SYNC:
1164                 CDEBUG(D_INODE, "sync\n");
1165                 OBD_FAIL_GOTO(OBD_FAIL_OST_SYNC_NET, out_free_oti, rc = 0);
1166                 rc = ost_sync(req->rq_export, req);
1167                 break;
1168         case OST_SET_INFO:
1169                 DEBUG_REQ(D_INODE, req, "set_info");
1170                 rc = ost_set_info(req->rq_export, req);
1171                 break;
1172         case OST_GET_INFO:
1173                 DEBUG_REQ(D_INODE, req, "get_info");
1174                 rc = ost_get_info(req->rq_export, req);
1175                 break;
1176         case OBD_PING:
1177                 DEBUG_REQ(D_INODE, req, "ping");
1178                 rc = target_handle_ping(req);
1179                 break;
1180         /* FIXME - just reply status */
1181         case LLOG_ORIGIN_CONNECT:
1182                 DEBUG_REQ(D_INODE, req, "log connect\n");
1183                 rc = ost_llog_handle_connect(req->rq_export, req); 
1184                 req->rq_status = rc;
1185                 rc = lustre_pack_reply(req, 0, NULL, NULL);
1186                 if (rc)
1187                         GOTO(out_free_oti, rc);
1188                 GOTO(out_free_oti, rc = ptlrpc_reply(req));
1189         case OBD_LOG_CANCEL:
1190                 CDEBUG(D_INODE, "log cancel\n");
1191                 OBD_FAIL_GOTO(OBD_FAIL_OBD_LOG_CANCEL_NET, out_free_oti, rc = 0);
1192                 rc = llog_origin_handle_cancel(req);
1193                 req->rq_status = rc;
1194                 rc = lustre_pack_reply(req, 0, NULL, NULL);
1195                 if (rc)
1196                         GOTO(out_free_oti, rc);
1197                 GOTO(out_free_oti, rc = ptlrpc_reply(req));
1198         case LDLM_ENQUEUE:
1199                 CDEBUG(D_INODE, "enqueue\n");
1200                 OBD_FAIL_GOTO(OBD_FAIL_LDLM_ENQUEUE, out_free_oti, rc = 0);
1201                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
1202                                          ldlm_server_blocking_ast,
1203                                          ldlm_server_glimpse_ast);
1204                 fail = OBD_FAIL_OST_LDLM_REPLY_NET;
1205                 break;
1206         case LDLM_CONVERT:
1207                 CDEBUG(D_INODE, "convert\n");
1208                 OBD_FAIL_GOTO(OBD_FAIL_LDLM_CONVERT, out_free_oti, rc = 0);
1209                 rc = ldlm_handle_convert(req);
1210                 break;
1211         case LDLM_CANCEL:
1212                 CDEBUG(D_INODE, "cancel\n");
1213                 OBD_FAIL_GOTO(OBD_FAIL_LDLM_CANCEL, out_free_oti, rc = 0);
1214                 rc = ldlm_handle_cancel(req);
1215                 break;
1216         case LDLM_BL_CALLBACK:
1217         case LDLM_CP_CALLBACK:
1218                 CDEBUG(D_INODE, "callback\n");
1219                 CERROR("callbacks should not happen on OST\n");
1220                 /* fall through */
1221         default:
1222                 CERROR("Unexpected opcode %d\n", req->rq_reqmsg->opc);
1223                 req->rq_status = -ENOTSUPP;
1224                 rc = ptlrpc_error(req);
1225                 GOTO(out_free_oti, rc);
1226         }
1227
1228         LASSERT(current->journal_info == NULL);
1229
1230         EXIT;
1231         /* If we're DISCONNECTing, the export_data is already freed */
1232         if (!rc && req->rq_reqmsg->opc != OST_DISCONNECT) {
1233                 if (!obd->obd_no_transno) {
1234                         req->rq_repmsg->last_committed =
1235                                 obd->obd_last_committed;
1236                 } else {
1237                         DEBUG_REQ(D_IOCTL, req,
1238                                   "not sending last_committed update");
1239                 }
1240                 CDEBUG(D_INFO, "last_committed "LPU64", xid "LPX64"\n",
1241                        obd->obd_last_committed, req->rq_xid);
1242         }
1243
1244 out_check_req:
1245
1246         if (!rc)
1247                 oti_to_request(oti, req);
1248         target_send_reply(req, rc, fail);
1249         rc = 0;
1250         
1251 out_free_oti:
1252         if (oti)
1253                 OBD_FREE(oti, sizeof(*oti));
1254         return rc;
1255 }
1256 EXPORT_SYMBOL(ost_handle);
1257
1258 int ost_attach(struct obd_device *dev, obd_count len, void *data)
1259 {
1260         struct lprocfs_static_vars lvars;
1261
1262         lprocfs_init_vars(ost,&lvars);
1263         return lprocfs_obd_attach(dev, lvars.obd_vars);
1264 }
1265
/*
 * obd_ops detach method: undo ost_attach() by detaching the device from
 * lprocfs.
 *
 * \return result of lprocfs_obd_detach()
 */
int ost_detach(struct obd_device *dev)
{
        int rc = lprocfs_obd_detach(dev);

        return rc;
}
1270
1271 extern struct file_operations ost_stimes_fops;
1272
1273 static int ost_setup(struct obd_device *obd, obd_count len, void *buf)
1274 {
1275         struct ost_obd *ost = &obd->u.ost;
1276         int rc;
1277         ENTRY;
1278
1279         rc = cleanup_group_info();
1280         if (rc)
1281                 RETURN(rc);
1282
1283         rc = llog_start_commit_thread();
1284         if (rc < 0)
1285                 RETURN(rc);
1286
1287         lprocfs_obd_seq_create(obd, "service_times", 0444, &ost_stimes_fops,
1288                                obd);
1289
1290         ost->ost_service =
1291                 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
1292                                 OST_REQUEST_PORTAL, OSC_REPLY_PORTAL, 30000,
1293                                 ost_handle, "ost",
1294                                 obd->obd_proc_entry);
1295         if (ost->ost_service == NULL) {
1296                 CERROR("failed to start service\n");
1297                 RETURN(-ENOMEM);
1298         }
1299
1300         rc = ptlrpc_start_n_threads(obd, ost->ost_service, OST_NUM_THREADS,
1301                                     "ll_ost");
1302         if (rc)
1303                 GOTO(out_service, rc = -EINVAL);
1304
1305         ost->ost_create_service =
1306                 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
1307                                 OST_CREATE_PORTAL, OSC_REPLY_PORTAL, 30000,
1308                                 ost_handle, "ost_create",
1309                                 obd->obd_proc_entry);
1310         if (ost->ost_create_service == NULL) {
1311                 CERROR("failed to start OST create service\n");
1312                 GOTO(out_service, rc = -ENOMEM);
1313         }
1314
1315
1316         spin_lock_init(&ost->ost_lock);
1317         ost->ost_service->srv_obddev = obd;
1318         
1319         rc = ptlrpc_start_n_threads(obd, ost->ost_create_service, 1,
1320                                     "ll_ost_creat");
1321         if (rc)
1322                 GOTO(out_create, rc = -EINVAL);
1323
1324         ost->ost_destroy_service =
1325                 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
1326                                 OST_DESTROY_PORTAL, OSC_REPLY_PORTAL, 30000,
1327                                 ost_handle, "ost_destroy",
1328                                 obd->obd_proc_entry);
1329         if (ost->ost_destroy_service == NULL) {
1330                 CERROR("failed to start service\n");
1331                 GOTO(out_create, rc = -ENOMEM);
1332         }
1333
1334         rc = ptlrpc_start_n_threads(obd, ost->ost_destroy_service,
1335                                     OST_NUM_THREADS, "ll_dstr_ost");
1336         if (rc)
1337                 GOTO(out_destroy, rc = -EINVAL);
1338
1339         RETURN(0);
1340
1341 out_destroy:
1342         ptlrpc_unregister_service(ost->ost_destroy_service);
1343 out_create:
1344         ptlrpc_unregister_service(ost->ost_create_service);
1345 out_service:
1346         ptlrpc_unregister_service(ost->ost_service);
1347         RETURN(rc);
1348 }
1349
1350 extern void lgss_svc_cache_purge_all(void);
1351 static int ost_cleanup(struct obd_device *obd, int flags)
1352 {
1353         struct ost_obd *ost = &obd->u.ost;
1354         int err = 0;
1355         ENTRY;
1356
1357         spin_lock_bh(&obd->obd_processing_task_lock);
1358         if (obd->obd_recovering) {
1359                 target_cancel_recovery_timer(obd);
1360                 obd->obd_recovering = 0;
1361         }
1362         spin_unlock_bh(&obd->obd_processing_task_lock);
1363
1364         ptlrpc_stop_all_threads(ost->ost_service);
1365         ptlrpc_unregister_service(ost->ost_service);
1366
1367         ptlrpc_stop_all_threads(ost->ost_create_service);
1368         ptlrpc_unregister_service(ost->ost_create_service);
1369
1370         ptlrpc_stop_all_threads(ost->ost_destroy_service);
1371         ptlrpc_unregister_service(ost->ost_destroy_service);
1372
1373 #ifdef ENABLE_GSS
1374         /* XXX */
1375         lgss_svc_cache_purge_all();
1376 #endif
1377         RETURN(err);
1378 }
1379
1380 /* use obd ops to offer management infrastructure */
1381 static struct obd_ops ost_obd_ops = {
1382         .o_owner        = THIS_MODULE,
1383         .o_attach       = ost_attach,
1384         .o_detach       = ost_detach,
1385         .o_setup        = ost_setup,
1386         .o_cleanup      = ost_cleanup,
1387 };
1388
1389 static int __init ost_init(void)
1390 {
1391         struct lprocfs_static_vars lvars;
1392         ENTRY;
1393
1394         lprocfs_init_vars(ost,&lvars);
1395         RETURN(class_register_type(&ost_obd_ops, NULL, lvars.module_vars,
1396                                    OBD_OST_DEVICENAME));
1397 }
1398
1399 static void /*__exit*/ ost_exit(void)
1400 {
1401         class_unregister_type(OBD_OST_DEVICENAME);
1402 }
1403
/* Module metadata for the OST kernel module. */
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
MODULE_LICENSE("GPL");

/* ost_init() runs at module load and ost_exit() at module unload. */
module_init(ost_init);
module_exit(ost_exit);