Whamcloud - gitweb
b8021fcfca730b5fecb9d2abfc79105e4f4bc518
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author: Peter J. Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  *
23  *  Storage Target Handling functions
24  *  Lustre Object Server Module (OST)
25  *
26  *  This server is single threaded at present (but can easily be multi
27  *  threaded). For testing and management it is treated as an
28  *  obd_device, although it does not export a full OBD method table
29  *  (the requests are coming in over the wire, so object target
30  *  modules do not have a full method table.)
31  */
32
33 #ifndef EXPORT_SYMTAB
34 # define EXPORT_SYMTAB
35 #endif
36 #define DEBUG_SUBSYSTEM S_OST
37
38 #include <linux/module.h>
39 #include <linux/obd_ost.h>
40 #include <linux/lustre_net.h>
41 #include <linux/lustre_dlm.h>
42 #include <linux/lustre_export.h>
43 #include <linux/init.h>
44 #include <linux/lprocfs_status.h>
45 #include <linux/lustre_commit_confd.h>
46 #include <libcfs/list.h>
47 #include <linux/lustre_sec.h>
48 #include <linux/lustre_audit.h>
49
50 void oti_init(struct obd_trans_info *oti, struct ptlrpc_request *req)
51 {
52         if (oti == NULL)
53                 return;
54         memset(oti, 0, sizeof *oti);
55         oti->oti_nid = req->rq_peer.peer_id.nid;
56         if (req->rq_repmsg && req->rq_reqmsg != 0)
57                 oti->oti_transno = req->rq_repmsg->transno;
58 }
59
60 void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
61 {
62         struct oti_req_ack_lock *ack_lock;
63         int i;
64
65         if (oti == NULL)
66                 return;
67
68         if (req->rq_repmsg)
69                 req->rq_repmsg->transno = oti->oti_transno;
70
71         /* XXX 4 == entries in oti_ack_locks??? */
72         for (ack_lock = oti->oti_ack_locks, i = 0; i < 4; i++, ack_lock++) {
73                 if (!ack_lock->mode)
74                         break;
75                 /* XXX not even calling target_send_reply in some cases... */
76                 ptlrpc_save_lock (req, &ack_lock->lock, ack_lock->mode);
77         }
78 }
79
80 static int ost_destroy(struct obd_export *exp, struct ptlrpc_request *req, 
81                        struct obd_trans_info *oti)
82 {
83         struct ost_body *body, *repbody;
84         int rc, size = sizeof(*body);
85         ENTRY;
86
87         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
88         if (body == NULL)
89                 RETURN(-EFAULT);
90
91         rc = lustre_pack_reply(req, 1, &size, NULL);
92         if (rc)
93                 RETURN(rc);
94
95         if (body->oa.o_valid & OBD_MD_FLCOOKIE)
96                 oti->oti_logcookies = obdo_logcookie(&body->oa);
97         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
98         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
99         req->rq_status = obd_destroy(exp, &body->oa, NULL, oti);
100         RETURN(0);
101 }
102
103 static int ost_getattr(struct obd_export *exp, struct ptlrpc_request *req)
104 {
105         struct ost_body *body, *repbody;
106         int rc, size = sizeof(*body);
107         ENTRY;
108
109         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
110         if (body == NULL)
111                 RETURN(-EFAULT);
112
113         rc = lustre_pack_reply(req, 1, &size, NULL);
114         if (rc)
115                 RETURN(rc);
116
117         repbody = lustre_msg_buf (req->rq_repmsg, 0, sizeof(*repbody));
118         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
119         req->rq_status = obd_getattr(exp, &repbody->oa, NULL);
120         RETURN(0);
121 }
122
123 static int ost_statfs(struct ptlrpc_request *req)
124 {
125         struct obd_statfs *osfs;
126         int rc, size = sizeof(*osfs);
127         ENTRY;
128
129         rc = lustre_pack_reply(req, 1, &size, NULL);
130         if (rc)
131                 RETURN(rc);
132
133         osfs = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*osfs));
134
135         req->rq_status = obd_statfs(req->rq_export->exp_obd, osfs, jiffies-HZ);
136         if (req->rq_status != 0)
137                 CERROR("ost: statfs failed: rc %d\n", req->rq_status);
138
139         RETURN(0);
140 }
141
142 static int ost_create(struct obd_export *exp, struct ptlrpc_request *req,
143                       struct obd_trans_info *oti)
144 {
145         struct ost_body *body, *repbody;
146         int rc, size = sizeof(*repbody);
147         ENTRY;
148
149         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
150         if (body == NULL)
151                 RETURN(-EFAULT);
152
153         rc = lustre_pack_reply(req, 1, &size, NULL);
154         if (rc)
155                 RETURN(rc);
156
157         repbody = lustre_msg_buf (req->rq_repmsg, 0, sizeof(*repbody));
158         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
159         oti->oti_logcookies = obdo_logcookie(&repbody->oa);
160         req->rq_status = obd_create(exp, &repbody->oa, NULL, 0, NULL, oti);
161         //obd_log_cancel(conn, NULL, 1, oti->oti_logcookies, 0);
162         RETURN(0);
163 }
164
165 static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req, 
166                      struct obd_trans_info *oti)
167 {
168         struct ost_body *body, *repbody;
169         struct lustre_capa *capa = NULL;
170         int rc, size = sizeof(*repbody);
171         ENTRY;
172
173         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
174         if (body == NULL)
175                 RETURN(-EFAULT);
176
177         if ((body->oa.o_valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) !=
178             (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
179                 RETURN(-EINVAL);
180
181         if (body->oa.o_valid & OBD_MD_CAPA) {
182                 capa = lustre_swab_reqbuf(req, 1, sizeof(*capa),
183                                           lustre_swab_lustre_capa);
184                 if (capa == NULL) {
185                         CERROR("Missing/short capa\n");
186                         RETURN(-EFAULT);
187                 }
188         }
189
190         rc = lustre_pack_reply(req, 1, &size, NULL);
191         if (rc)
192                 RETURN(rc);
193
194         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
195         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
196         req->rq_status = obd_punch(exp, &repbody->oa, NULL, repbody->oa.o_size,
197                                    repbody->oa.o_blocks, oti, capa);
198         RETURN(0);
199 }
200
201 static int ost_sync(struct obd_export *exp, struct ptlrpc_request *req)
202 {
203         struct ost_body *body, *repbody;
204         int rc, size = sizeof(*repbody);
205         ENTRY;
206
207         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
208         if (body == NULL)
209                 RETURN(-EFAULT);
210
211         rc = lustre_pack_reply(req, 1, &size, NULL);
212         if (rc)
213                 RETURN(rc);
214
215         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
216         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
217         req->rq_status = obd_sync(exp, &repbody->oa, NULL, repbody->oa.o_size,
218                                   repbody->oa.o_blocks);
219         RETURN(0);
220 }
221
222 static int ost_setattr(struct obd_export *exp, struct ptlrpc_request *req, 
223                        struct obd_trans_info *oti)
224 {
225         struct ost_body *body, *repbody;
226         int rc, size = sizeof(*repbody);
227         ENTRY;
228
229         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
230         if (body == NULL)
231                 RETURN(-EFAULT);
232
233         rc = lustre_pack_reply(req, 1, &size, NULL);
234         if (rc)
235                 RETURN(rc);
236
237         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
238         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
239
240         req->rq_status = obd_setattr(exp, &repbody->oa, NULL, oti, NULL);
241         RETURN(0);
242 }
243
244 static int ost_bulk_timeout(void *data)
245 {
246         ENTRY;
247         /* We don't fail the connection here, because having the export
248          * killed makes the (vital) call to commitrw very sad.
249          */
250         RETURN(1);
251 }
252
253 static int get_per_page_niobufs(struct obd_ioobj *ioo, int nioo,
254                                 struct niobuf_remote *rnb, int nrnb,
255                                 struct niobuf_remote **pp_rnbp)
256 {
257         /* Copy a remote niobuf, splitting it into page-sized chunks
258          * and setting ioo[i].ioo_bufcnt accordingly */
259         struct niobuf_remote *pp_rnb;
260         int   i;
261         int   j;
262         int   page;
263         int   rnbidx = 0;
264         int   npages = 0;
265
266         /* first count and check the number of pages required */
267         for (i = 0; i < nioo; i++)
268                 for (j = 0; j < ioo->ioo_bufcnt; j++, rnbidx++) {
269                         obd_off offset = rnb[rnbidx].offset;
270                         obd_off p0 = offset >> PAGE_SHIFT;
271                         obd_off pn = (offset + rnb[rnbidx].len - 1)>>PAGE_SHIFT;
272
273                         LASSERT(rnbidx < nrnb);
274
275                         npages += (pn + 1 - p0);
276
277                         if (rnb[rnbidx].len == 0) {
278                                 CERROR("zero len BRW: obj %d objid "LPX64
279                                        " buf %u\n", i, ioo[i].ioo_id, j);
280                                 return -EINVAL;
281                         }
282                         if (j > 0 &&
283                             rnb[rnbidx].offset <= rnb[rnbidx-1].offset) {
284                                 CERROR("unordered BRW: obj %d objid "LPX64
285                                        " buf %u offset "LPX64" <= "LPX64"\n",
286                                        i, ioo[i].ioo_id, j, rnb[rnbidx].offset,
287                                        rnb[rnbidx].offset);
288                                 return -EINVAL;
289                         }
290                 }
291
292         LASSERT(rnbidx == nrnb);
293
294         if (npages == nrnb) {       /* all niobufs are for single pages */
295                 *pp_rnbp = rnb;
296                 return npages;
297         }
298
299         OBD_ALLOC(pp_rnb, sizeof(*pp_rnb) * npages);
300         if (pp_rnb == NULL)
301                 return -ENOMEM;
302
303         /* now do the actual split */
304         page = rnbidx = 0;
305         for (i = 0; i < nioo; i++) {
306                 int  obj_pages = 0;
307
308                 for (j = 0; j < ioo[i].ioo_bufcnt; j++, rnbidx++) {
309                         obd_off off = rnb[rnbidx].offset;
310                         int     nob = rnb[rnbidx].len;
311
312                         LASSERT(rnbidx < nrnb);
313                         do {
314                                 obd_off  poff = off & (PAGE_SIZE - 1);
315                                 int      pnob = (poff + nob > PAGE_SIZE) ?
316                                                 PAGE_SIZE - poff : nob;
317
318                                 LASSERT(page < npages);
319                                 pp_rnb[page].len = pnob;
320                                 pp_rnb[page].offset = off;
321                                 pp_rnb[page].flags = rnb[rnbidx].flags;
322
323                                 CDEBUG(0, "   obj %d id "LPX64
324                                        "page %d(%d) "LPX64" for %d, flg %x\n",
325                                        i, ioo[i].ioo_id, obj_pages, page,
326                                        pp_rnb[page].offset, pp_rnb[page].len,
327                                        pp_rnb[page].flags);
328                                 page++;
329                                 obj_pages++;
330
331                                 off += pnob;
332                                 nob -= pnob;
333                         } while (nob > 0);
334                         LASSERT(nob == 0);
335                 }
336                 ioo[i].ioo_bufcnt = obj_pages;
337         }
338         LASSERT(page == npages);
339
340         *pp_rnbp = pp_rnb;
341         return npages;
342 }
343
344 static void free_per_page_niobufs (int npages, struct niobuf_remote *pp_rnb,
345                                    struct niobuf_remote *rnb)
346 {
347         if (pp_rnb == rnb)                      /* didn't allocate above */
348                 return;
349
350         OBD_FREE(pp_rnb, sizeof(*pp_rnb) * npages);
351 }
352
353 #if CHECKSUM_BULK
354 obd_count ost_checksum_bulk(struct ptlrpc_bulk_desc *desc)
355 {
356         obd_count cksum = 0;
357         struct ptlrpc_bulk_page *bp;
358
359         list_for_each_entry(bp, &desc->bd_page_list, bp_link) {
360                 ost_checksum(&cksum, kmap(bp->bp_page) + bp->bp_pageoffset,
361                              bp->bp_buflen);
362                 kunmap(bp->bp_page);
363         }
364
365         return cksum;
366 }
367 #endif
368
369 static void ost_stime_record(struct ptlrpc_request *req, struct timeval *start,
370                              unsigned rw, unsigned phase)
371 {
372         struct obd_device *obd = req->rq_svc->srv_obddev;
373         struct timeval stop;
374         int ind = rw *3 + phase;
375          
376         if (obd && obd->obd_type && obd->obd_type->typ_name) {
377                 if (!strcmp(obd->obd_type->typ_name, OBD_OST_DEVICENAME)) {
378                         struct ost_obd *ost = NULL;
379                         
380                         ost = &obd->u.ost;
381                         if (ind >= (sizeof(ost->ost_stimes) / 
382                                     sizeof(ost->ost_stimes[0])))
383                                return;
384                         do_gettimeofday(&stop);
385
386                         spin_lock(&ost->ost_lock);
387                         lprocfs_stime_record(&ost->ost_stimes[ind],&stop,start);
388                         spin_unlock(&ost->ost_lock);
389                         memcpy(start, &stop, sizeof(*start));
390                 }
391        } 
392 }
393
394 static int ost_brw_read(struct ptlrpc_request *req)
395 {
396         struct ptlrpc_bulk_desc *desc;
397         struct niobuf_remote    *remote_nb;
398         struct niobuf_remote    *pp_rnb;
399         struct niobuf_local     *local_nb;
400         struct obd_ioobj        *ioo;
401         struct ost_body         *body, *repbody;
402         struct lustre_capa      *capa = NULL;
403         struct l_wait_info       lwi;
404         struct obd_trans_info    oti = { 0 };
405         int                      size[1] = { sizeof(*body) };
406         int                      comms_error = 0;
407         int                      niocount;
408         int                      npages;
409         int                      nob = 0;
410         int                      rc;
411         int                      i, bufcnt = 0;
412         struct timeval           start;
413         ENTRY;
414
415         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
416                 GOTO(out, rc = -EIO);
417
418         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK | OBD_FAIL_ONCE,
419                          (obd_timeout + 1) / 4);
420
421         body = lustre_swab_reqbuf(req, bufcnt++, sizeof(*body),
422                                   lustre_swab_ost_body);
423         if (body == NULL) {
424                 CERROR("Missing/short ost_body\n");
425                 GOTO(out, rc = -EFAULT);
426         }
427
428         ioo = lustre_swab_reqbuf(req, bufcnt++, sizeof(*ioo),
429                                  lustre_swab_obd_ioobj);
430         if (ioo == NULL) {
431                 CERROR("Missing/short ioobj\n");
432                 GOTO(out, rc = -EFAULT);
433         }
434
435         if (body->oa.o_valid & OBD_MD_CAPA) {
436                 capa = lustre_swab_reqbuf(req, bufcnt++, sizeof(*capa),
437                                           lustre_swab_lustre_capa);
438                 if (capa == NULL) {
439                         CERROR("Missing/short capa\n");
440                         GOTO(out, rc = -EFAULT);
441                 }
442         }
443
444         niocount = ioo->ioo_bufcnt;
445         remote_nb = lustre_swab_reqbuf(req, bufcnt++,
446                                        niocount * sizeof(*remote_nb),
447                                        lustre_swab_niobuf_remote);
448         if (remote_nb == NULL) {
449                 CERROR("Missing/short niobuf\n");
450                 GOTO(out, rc = -EFAULT);
451         }
452         if (lustre_msg_swabbed(req->rq_reqmsg)) { /* swab remaining niobufs */
453                 for (i = 1; i < niocount; i++)
454                         lustre_swab_niobuf_remote (&remote_nb[i]);
455         }
456
457         rc = lustre_pack_reply(req, 1, size, NULL);
458         if (rc)
459                 GOTO(out, rc);
460
461         /* FIXME all niobuf splitting should be done in obdfilter if needed */
462         /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
463         npages = get_per_page_niobufs(ioo, 1, remote_nb, niocount, &pp_rnb);
464         if (npages < 0)
465                 GOTO(out, rc = npages);
466
467         OBD_ALLOC(local_nb, sizeof(*local_nb) * npages);
468         if (local_nb == NULL)
469                 GOTO(out_pp_rnb, rc = -ENOMEM);
470
471         desc = ptlrpc_prep_bulk_exp (req, npages, 
472                                      BULK_PUT_SOURCE, OST_BULK_PORTAL);
473         if (desc == NULL)
474                 GOTO(out_local, rc = -ENOMEM);
475
476         do_gettimeofday(&start);
477         rc = obd_preprw(OBD_BRW_READ, req->rq_export, &body->oa, 1,
478                         ioo, npages, pp_rnb, local_nb, &oti, capa);
479         ost_stime_record(req, &start, 0, 0);
480         if (rc != 0)
481                 GOTO(out_bulk, rc);
482
483         /* We're finishing using body->oa as an input variable */
484         body->oa.o_valid = 0;
485
486         nob = 0;
487         for (i = 0; i < npages; i++) {
488                 int page_rc = local_nb[i].rc;
489
490                 if (page_rc < 0) {              /* error */
491                         rc = page_rc;
492                         break;
493                 }
494
495                 LASSERT(page_rc <= pp_rnb[i].len);
496                 nob += page_rc;
497                 if (page_rc != 0) {             /* some data! */
498                         LASSERT (local_nb[i].page != NULL);
499                         ptlrpc_prep_bulk_page(desc, local_nb[i].page,
500                                               pp_rnb[i].offset & (PAGE_SIZE-1),
501                                               page_rc);
502                 }
503
504                 if (page_rc != pp_rnb[i].len) { /* short read */
505                         /* All subsequent pages should be 0 */
506                         while(++i < npages)
507                                 LASSERT(local_nb[i].rc == 0);
508                         break;
509                 }
510         }
511
512         if (rc == 0) {
513                 rc = ptlrpc_start_bulk_transfer(desc);
514                 if (rc == 0) {
515                         struct timeval tstart, now;
516                         do_gettimeofday(&tstart);
517                         lwi = LWI_TIMEOUT(obd_timeout * HZ / 3,
518                                           ost_bulk_timeout, desc);
519                         rc = l_wait_event(desc->bd_waitq,
520                                           !ptlrpc_bulk_active(desc), &lwi);
521                         LASSERT(rc == 0 || rc == -ETIMEDOUT);
522                         do_gettimeofday(&now);
523                         if (rc == -ETIMEDOUT) {
524                                 char cln_str[PTL_NALFMT_SIZE];
525                                 DEBUG_REQ(D_ERROR, req, "timeout on bulk PUT"
526                                           ", exp_conn_cnt = %u, real wait %us"
527                                           ", arrived %u.%u, served %u.%u",
528                                           req->rq_export->exp_conn_cnt,
529                                           (unsigned) (now.tv_sec - tstart.tv_sec),
530                                           (unsigned) req->rq_arrival_time.tv_sec,
531                                           (unsigned) req->rq_arrival_time.tv_usec,
532                                           (unsigned) req->rq_rpcd_start.tv_sec,
533                                           (unsigned) req->rq_rpcd_start.tv_usec);
534                                 CDEBUG(D_ERROR, "bulk PUT timeout: client %s\n",
535                                        ptlrpc_peernid2str(&req->rq_peer, cln_str));
536                                 ptlrpc_abort_bulk(desc);
537                         } else if (!desc->bd_success ||
538                                    desc->bd_nob_transferred != desc->bd_nob) {
539                                 DEBUG_REQ(D_ERROR, req, "%s bulk PUT %d(%d)",
540                                           desc->bd_success ?
541                                           "truncated" : "network error on",
542                                           desc->bd_nob_transferred,
543                                           desc->bd_nob);
544                                 /* XXX should this be a different errno? */
545                                 rc = -ETIMEDOUT;
546                         }
547                 } else {
548                         DEBUG_REQ(D_ERROR, req, "bulk PUT failed: rc %d\n", rc);
549                 }
550                 comms_error = rc != 0;
551         }
552
553         ost_stime_record(req, &start, 0, 1);
554         /* Must commit after prep above in all cases */
555         rc = obd_commitrw(OBD_BRW_READ, req->rq_export, &body->oa, 1,
556                           ioo, npages, local_nb, &oti, rc);
557         ost_stime_record(req, &start, 0, 2);
558
559         if (rc == 0) {
560                 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
561                 memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
562
563 #if CHECKSUM_BULK
564                 repbody->oa.o_cksum = ost_checksum_bulk(desc);
565                 repbody->oa.o_valid |= OBD_MD_FLCKSUM;
566 #endif
567         }
568
569  out_bulk:
570         ptlrpc_free_bulk(desc);
571  out_local:
572         OBD_FREE(local_nb, sizeof(*local_nb) * npages);
573  out_pp_rnb:
574         free_per_page_niobufs(npages, pp_rnb, remote_nb);
575  out:
576         LASSERT(rc <= 0);
577         if (rc == 0) {
578                 req->rq_status = nob;
579                 ptlrpc_reply(req);
580         } else if (!comms_error) {
581                 /* only reply if comms OK */
582                 req->rq_status = rc;
583                 ptlrpc_error(req);
584         } else {
585                 if (req->rq_reply_state != NULL) {
586                         /* reply out callback would free */
587                         lustre_free_reply_state (req->rq_reply_state);
588                 }
589                 if (req->rq_reqmsg->conn_cnt == req->rq_export->exp_conn_cnt) {
590                         CERROR("bulk IO comms error: "
591                                "evicting %s@%s id %s\n",
592                                req->rq_export->exp_client_uuid.uuid,
593                                req->rq_export->exp_connection->c_remote_uuid.uuid,
594                                req->rq_peerstr);
595                         ptlrpc_fail_export(req->rq_export);
596                 } else {
597                         CERROR("ignoring bulk IO comms error: "
598                                "client reconnected %s@%s id %s\n",  
599                                req->rq_export->exp_client_uuid.uuid,
600                                req->rq_export->exp_connection->c_remote_uuid.uuid,
601                                req->rq_peerstr);
602                 }
603         }
604
605         RETURN(rc);
606 }
607
608 int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
609 {
610         struct ptlrpc_bulk_desc *desc;
611         struct niobuf_remote    *remote_nb;
612         struct niobuf_remote    *pp_rnb;
613         struct niobuf_local     *local_nb;
614         struct obd_ioobj        *ioo;
615         struct lustre_capa      *capa = NULL;
616         struct ost_body         *body, *repbody;
617         struct l_wait_info       lwi;
618         __u32                   *rcs;
619         int                      size[2] = { sizeof(*body) };
620         int                      objcount, niocount, npages;
621         int                      comms_error = 0;
622         int                      rc, swab, i, j, bufcnt = 0;
623         struct timeval           start;        
624         ENTRY;
625
626         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
627                 GOTO(out, rc = -EIO);
628
629         /* pause before transaction has been started */
630         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK | OBD_FAIL_ONCE,
631                          (obd_timeout + 1) / 4);
632
633         swab = lustre_msg_swabbed(req->rq_reqmsg);
634         body = lustre_swab_reqbuf(req, bufcnt++, sizeof(*body),
635                                   lustre_swab_ost_body);
636         if (body == NULL) {
637                 CERROR("Missing/short ost_body\n");
638                 GOTO(out, rc = -EFAULT);
639         }
640
641         LASSERT_REQSWAB(req, 1);
642         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
643         if (objcount == 0) {
644                 CERROR("Missing/short ioobj\n");
645                 GOTO(out, rc = -EFAULT);
646         }
647         ioo = lustre_msg_buf(req->rq_reqmsg, bufcnt++,
648                              objcount * sizeof(*ioo));
649         LASSERT (ioo != NULL);
650         for (niocount = i = 0; i < objcount; i++) {
651                 if (swab)
652                         lustre_swab_obd_ioobj (&ioo[i]);
653                 if (ioo[i].ioo_bufcnt == 0) {
654                         CERROR("ioo[%d] has zero bufcnt\n", i);
655                         GOTO(out, rc = -EFAULT);
656                 }
657                 niocount += ioo[i].ioo_bufcnt;
658         }
659
660         if (body->oa.o_valid & OBD_MD_CAPA) {
661                 capa = lustre_swab_reqbuf(req, bufcnt++, sizeof(*capa),
662                                           lustre_swab_lustre_capa);
663                 if (capa == NULL) {
664                         CERROR("Missing/short capa\n");
665                         GOTO(out, rc = -EFAULT);
666                 }
667         }
668
669         remote_nb = lustre_swab_reqbuf(req, bufcnt++,
670                                        niocount * sizeof(*remote_nb),
671                                        lustre_swab_niobuf_remote);
672         if (remote_nb == NULL) {
673                 CERROR("Missing/short niobuf\n");
674                 GOTO(out, rc = -EFAULT);
675         }
676         if (swab) {                             /* swab the remaining niobufs */
677                 for (i = 1; i < niocount; i++)
678                         lustre_swab_niobuf_remote (&remote_nb[i]);
679         }
680
681         size[1] = niocount * sizeof(*rcs);
682         rc = lustre_pack_reply(req, 2, size, NULL);
683         if (rc != 0)
684                 GOTO(out, rc);
685         rcs = lustre_msg_buf(req->rq_repmsg, 1, niocount * sizeof(*rcs));
686
687 #if 0
688         /* Do snap options here*/
689         rc = obd_do_cow(req->rq_export, ioo, objcount, remote_nb);
690         if (rc)
691                 GOTO(out, rc);
692 #endif
693
694         /* FIXME all niobuf splitting should be done in obdfilter if needed */
695         /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
696         npages = get_per_page_niobufs(ioo, objcount,remote_nb,niocount,&pp_rnb);
697         if (npages < 0)
698                 GOTO(out, rc = npages);
699
700         OBD_ALLOC(local_nb, sizeof(*local_nb) * npages);
701         if (local_nb == NULL)
702                 GOTO(out_pp_rnb, rc = -ENOMEM);
703
704         desc = ptlrpc_prep_bulk_exp (req, npages, 
705                                      BULK_GET_SINK, OST_BULK_PORTAL);
706         if (desc == NULL)
707                 GOTO(out_local, rc = -ENOMEM);
708
709         do_gettimeofday(&start);
710         rc = obd_preprw(OBD_BRW_WRITE, req->rq_export, &body->oa, objcount,
711                         ioo, npages, pp_rnb, local_nb, oti, capa);
712         ost_stime_record(req, &start, 1, 0);
713         if (rc != 0)
714                 GOTO(out_bulk, rc);
715
716         /* NB Having prepped, we must commit... */
717
718         for (i = 0; i < npages; i++)
719                 ptlrpc_prep_bulk_page(desc, local_nb[i].page, 
720                                       pp_rnb[i].offset & (PAGE_SIZE - 1),
721                                       pp_rnb[i].len);
722
723         rc = ptlrpc_start_bulk_transfer (desc);
724         if (rc == 0) {
725                 struct timeval tstart, now;
726                 do_gettimeofday(&tstart);
727                 lwi = LWI_TIMEOUT(obd_timeout * HZ / 3,
728                                   ost_bulk_timeout, desc);
729                 rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), 
730                                   &lwi);
731                 LASSERT(rc == 0 || rc == -ETIMEDOUT);
732                 do_gettimeofday(&now);
733                 if (rc == -ETIMEDOUT) {
734                         char cln_str[PTL_NALFMT_SIZE];
735                         DEBUG_REQ(D_ERROR, req, "timeout on bulk GET, "
736                                   "exp_conn_cnt = %u, real wait %us\n",
737                                   req->rq_export->exp_conn_cnt,
738                                   (unsigned) (now.tv_sec - tstart.tv_sec));
739                         CDEBUG(D_ERROR, "bulk GET timeout: client %s\n",
740                                ptlrpc_peernid2str(&req->rq_peer, cln_str));
741                         ptlrpc_abort_bulk(desc);
742                 } else if (!desc->bd_success ||
743                            desc->bd_nob_transferred != desc->bd_nob) {
744                         DEBUG_REQ(D_ERROR, req, "%s bulk GET %d(%d)",
745                                   desc->bd_success ? 
746                                   "truncated" : "network error on",
747                                   desc->bd_nob_transferred, desc->bd_nob);
748                         /* XXX should this be a different errno? */
749                         rc = -ETIMEDOUT;
750                 }
751         } else {
752                 DEBUG_REQ(D_ERROR, req, "ptlrpc_bulk_get failed: rc %d\n", rc);
753         }
754         comms_error = rc != 0;
755
756         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
757         memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
758
759 #if CHECKSUM_BULK
760         if (rc == 0 && (body->oa.o_valid & OBD_MD_FLCKSUM) != 0) {
761                 static int cksum_counter;
762                 obd_count client_cksum = body->oa.o_cksum;
763                 obd_count cksum = ost_checksum_bulk(desc);
764
765                 if (client_cksum != cksum) {
766                         CERROR("Bad checksum: client %x, server %x id %s\n",
767                                client_cksum, cksum,
768                                req->rq_peerstr);
769                         cksum_counter = 1;
770                         repbody->oa.o_cksum = cksum;
771                 } else {
772                         cksum_counter++;
773                         if ((cksum_counter & (-cksum_counter)) == cksum_counter)
774                                 CWARN("Checksum %u from NID %s: %x OK\n",         
775                                       cksum_counter, req->rq_peerstr, cksum);
776                 }
777         }
778 #endif
779         ost_stime_record(req, &start, 1, 1);
780         /* Must commit after prep above in all cases */
781         rc = obd_commitrw(OBD_BRW_WRITE, req->rq_export, &repbody->oa,
782                           objcount, ioo, npages, local_nb, oti, rc);
783
784         ost_stime_record(req, &start, 1, 2);
785         if (rc == 0) {
786 #if CHECKSUM_BULK
787                 repbody->oa.o_cksum = ost_checksum_bulk(desc);
788                 repbody->oa.o_valid |= OBD_MD_FLCKSUM;
789 #endif
790                 /* set per-requested niobuf return codes */
791                 for (i = j = 0; i < niocount; i++) {
792                         int nob = remote_nb[i].len;
793
794                         rcs[i] = 0;
795                         do {
796                                 LASSERT(j < npages);
797                                 if (local_nb[j].rc < 0)
798                                         rcs[i] = local_nb[j].rc;
799                                 nob -= pp_rnb[j].len;
800                                 j++;
801                         } while (nob > 0);
802                         LASSERT(nob == 0);
803                 }
804                 LASSERT(j == npages);
805         }
806         /*XXX This write extents only for write-back cache extents*/
807         rc = obd_write_extents(req->rq_export, ioo, objcount, niocount, 
808                                local_nb, rc);
809  out_bulk:
810         ptlrpc_free_bulk(desc);
811  out_local:
812         OBD_FREE(local_nb, sizeof(*local_nb) * npages);
813  out_pp_rnb:
814         free_per_page_niobufs(npages, pp_rnb, remote_nb);
815  out:
816         if (rc == 0) {
817                 oti_to_request(oti, req);
818                 rc = ptlrpc_reply(req);
819         } else if (!comms_error) {
820                 /* Only reply if there was no comms problem with bulk */
821                 req->rq_status = rc;
822                 ptlrpc_error(req);
823         } else {
824                 if (req->rq_reply_state != NULL) {
825                         /* reply out callback would free */
826                         lustre_free_reply_state (req->rq_reply_state);
827                 }
828                 if (req->rq_reqmsg->conn_cnt == req->rq_export->exp_conn_cnt) {
829                         CERROR("%s: bulk IO comm error evicting %s@%s id %s\n",
830                                req->rq_export->exp_obd->obd_name,
831                                req->rq_export->exp_client_uuid.uuid,
832                                req->rq_export->exp_connection->c_remote_uuid.uuid,
833                                req->rq_peerstr);
834                         ptlrpc_fail_export(req->rq_export);
835                 } else {
836                         CERROR("ignoring bulk IO comms error: "
837                                "client reconnected %s@%s id %s\n",
838                                req->rq_export->exp_client_uuid.uuid,
839                                req->rq_export->exp_connection->c_remote_uuid.uuid,
840                                req->rq_peerstr);
841                 }
842         }
843         RETURN(rc);
844 }
845 EXPORT_SYMBOL(ost_brw_write);
846
847 static int ost_san_brw(struct ptlrpc_request *req, int cmd)
848 {
849         struct niobuf_remote *remote_nb, *res_nb, *pp_rnb;
850         struct obd_ioobj *ioo;
851         struct ost_body *body, *repbody;
852         int rc, i, objcount, niocount, size[2] = {sizeof(*body)}, npages;
853         int swab;
854         ENTRY;
855
856         /* XXX not set to use latest protocol */
857
858         swab = lustre_msg_swabbed(req->rq_reqmsg);
859         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
860         if (body == NULL) {
861                 CERROR("Missing/short ost_body\n");
862                 GOTO(out, rc = -EFAULT);
863         }
864
865         ioo = lustre_swab_reqbuf(req, 1, sizeof(*ioo), lustre_swab_obd_ioobj);
866         if (ioo == NULL) {
867                 CERROR("Missing/short ioobj\n");
868                 GOTO(out, rc = -EFAULT);
869         }
870         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
871         niocount = ioo[0].ioo_bufcnt;
872         for (i = 1; i < objcount; i++) {
873                 if (swab)
874                         lustre_swab_obd_ioobj (&ioo[i]);
875                 niocount += ioo[i].ioo_bufcnt;
876         }
877
878         remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof(*remote_nb),
879                                        lustre_swab_niobuf_remote);
880         if (remote_nb == NULL) {
881                 CERROR("Missing/short niobuf\n");
882                 GOTO(out, rc = -EFAULT);
883         }
884         if (swab) {                             /* swab the remaining niobufs */
885                 for (i = 1; i < niocount; i++)
886                         lustre_swab_niobuf_remote (&remote_nb[i]);
887         }
888
889         /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
890         npages = get_per_page_niobufs(ioo, objcount,remote_nb,niocount,&pp_rnb);
891         if (npages < 0)
892                 GOTO (out, rc = npages);
893  
894         size[1] = npages * sizeof(*pp_rnb);
895         rc = lustre_pack_reply(req, 2, size, NULL);
896         if (rc)
897                 GOTO(out_pp_rnb, rc);
898
899         req->rq_status = obd_san_preprw(cmd, req->rq_export, &body->oa,
900                                         objcount, ioo, npages, pp_rnb);
901
902         if (req->rq_status)
903                 GOTO(out_pp_rnb, rc = 0);
904
905         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
906         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
907
908         res_nb = lustre_msg_buf(req->rq_repmsg, 1, size[1]);
909         memcpy(res_nb, remote_nb, size[1]);
910         rc = 0;
911 out_pp_rnb:
912         free_per_page_niobufs(npages, pp_rnb, remote_nb);
913 out:
914         if (rc) {
915                 req->rq_status = rc;
916                 ptlrpc_error(req);
917         } else
918                 ptlrpc_reply(req);
919
920         return rc;
921 }
922
923 static int ost_set_info(struct obd_export *exp, struct ptlrpc_request *req)
924 {
925         char *key, *val;
926         int keylen, rc = 0;
927         ENTRY;
928
929         key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
930         if (key == NULL) {
931                 DEBUG_REQ(D_HA, req, "no set_info key");
932                 RETURN(-EFAULT);
933         }
934         keylen = req->rq_reqmsg->buflens[0];
935
936         rc = lustre_pack_reply(req, 0, NULL, NULL);
937         if (rc)
938                 RETURN(rc);
939
940         val = lustre_msg_buf(req->rq_reqmsg, 1, 0);
941         if (val == NULL) {
942                 CERROR("val for setinfo can't be NULL\n");
943                 RETURN(-EFAULT);
944         }
945         
946         if (keylen == 8 && memcmp(key, "auditlog", 8) == 0) {
947                 lustre_swab_reqbuf(req, 1, sizeof(struct audit_msg),
948                                    lustre_swab_audit_msg);
949         } else if (keylen == 5 && strcmp(key, "audit") == 0) {
950                 lustre_swab_reqbuf(req, 1, sizeof(struct audit_attr_msg),
951                                    lustre_swab_audit_attr);
952         } else if (keylen == 9 && strcmp(key, "audit_obj") == 0) {
953                 lustre_swab_reqbuf(req, 1, sizeof(struct obdo),
954                                    lustre_swab_obdo);
955         } else if (keylen == 8 && memcmp(key, "capa_key", 8) == 0) {
956                 lustre_swab_reqbuf(req, 1, sizeof(struct lustre_capa_key),
957                                    lustre_swab_lustre_capa_key);
958         }
959
960         rc = obd_set_info(exp, keylen, key, req->rq_reqmsg->buflens[1], val);
961         req->rq_repmsg->status = 0;
962         RETURN(rc);
963 }
964
965 static int ost_get_info(struct obd_export *exp, struct ptlrpc_request *req)
966 {
967         char *key;
968         int keylen, rc = 0, size = sizeof(obd_id);
969         obd_id *reply;
970         ENTRY;
971
972         key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
973         if (key == NULL) {
974                 DEBUG_REQ(D_HA, req, "no get_info key");
975                 RETURN(-EFAULT);
976         }
977         keylen = req->rq_reqmsg->buflens[0];
978
979         if (keylen < strlen("last_id") || memcmp(key, "last_id", 7) != 0)
980                 RETURN(-EPROTO);
981
982         rc = lustre_pack_reply(req, 1, &size, NULL);
983         if (rc)
984                 RETURN(rc);
985
986         reply = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*reply));
987         rc = obd_get_info(exp, keylen, key, (__u32 *)&size, reply);
988         req->rq_repmsg->status = 0;
989         RETURN(rc);
990 }
991
992 static int ost_llog_handle_connect(struct obd_export *exp,
993                                    struct ptlrpc_request *req)
994 {
995         struct llogd_conn_body *body;
996         int rc;
997         ENTRY;
998
999         body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
1000         rc = obd_llog_connect(exp, body);
1001         RETURN(rc);
1002 }
1003
1004 static int ost_filter_recovery_request(struct ptlrpc_request *req,
1005                                        struct obd_device *obd, int *process)
1006 {
1007         switch (req->rq_reqmsg->opc) {
1008         case OST_CONNECT: /* This will never get here, but for completeness. */
1009         case OST_DISCONNECT:
1010                *process = 1;
1011                RETURN(0);
1012
1013         case OBD_PING:
1014         case OST_CREATE:
1015         case OST_DESTROY:
1016         case OST_PUNCH:
1017         case OST_SETATTR:
1018         case OST_SYNC:
1019         case OST_WRITE:
1020         case OBD_LOG_CANCEL:
1021         case LDLM_ENQUEUE:
1022                 *process = target_queue_recovery_request(req, obd);
1023                 RETURN(0);
1024
1025         default:
1026                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
1027                 *process = 0;
1028                 /* XXX what should we set rq_status to here? */
1029                 req->rq_status = -EAGAIN;
1030                 RETURN(ptlrpc_error(req));
1031         }
1032 }
1033
1034 int ost_msg_check_version(struct lustre_msg *msg)
1035 {
1036         int rc;
1037
1038         switch(msg->opc) {
1039         case OST_CONNECT:
1040         case OST_DISCONNECT:
1041         case OBD_PING:
1042         case OST_CREATE:
1043         case OST_DESTROY:
1044         case OST_GETATTR:
1045         case OST_SETATTR:
1046         case OST_WRITE:
1047         case OST_READ:
1048         case OST_SAN_READ:
1049         case OST_SAN_WRITE:
1050         case OST_PUNCH:
1051         case OST_STATFS:
1052         case OST_SYNC:
1053         case OST_SET_INFO:
1054         case OST_GET_INFO:
1055                 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
1056                 if (rc)
1057                         CERROR("bad opc %u version %08x, expecting %08x\n",
1058                                msg->opc, msg->version, LUSTRE_OBD_VERSION);
1059                 break;
1060         case LDLM_ENQUEUE:
1061         case LDLM_CONVERT:
1062         case LDLM_CANCEL:
1063         case LDLM_BL_CALLBACK:
1064         case LDLM_CP_CALLBACK:
1065                 rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
1066                 if (rc)
1067                         CERROR("bad opc %u version %08x, expecting %08x\n",
1068                                msg->opc, msg->version, LUSTRE_DLM_VERSION);
1069                 break;
1070         case OBD_LOG_CANCEL:
1071         case LLOG_ORIGIN_CONNECT:
1072                 rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
1073                 if (rc)
1074                         CERROR("bad opc %u version %08x, expecting %08x\n",
1075                                msg->opc, msg->version, LUSTRE_LOG_VERSION);
1076                 break;
1077         case SEC_INIT:
1078         case SEC_INIT_CONTINUE:
1079         case SEC_FINI:
1080                 rc = 0;
1081                 break;
1082         default:
1083                 CERROR("OST unexpected opcode %d\n", msg->opc);
1084                 rc = -ENOTSUPP;
1085                 break;
1086         }
1087         return rc;
1088 }
1089
1090 int ost_handle(struct ptlrpc_request *req)
1091 {
1092         int should_process, fail = OBD_FAIL_OST_ALL_REPLY_NET, rc = 0;
1093         struct obd_trans_info *oti = NULL;
1094         struct obd_device *obd = NULL;
1095         ENTRY;
1096
1097         LASSERT(current->journal_info == NULL);
1098
1099         rc = ost_msg_check_version(req->rq_reqmsg);
1100         if (rc) {
1101                 CERROR("OST drop mal-formed request\n");
1102                 RETURN(rc);
1103         }
1104
1105         /* Security opc should NOT trigger any recovery events */
1106         if (req->rq_reqmsg->opc == SEC_INIT ||
1107             req->rq_reqmsg->opc == SEC_INIT_CONTINUE ||
1108             req->rq_reqmsg->opc == SEC_FINI) {
1109                 GOTO(out_check_req, rc = 0);
1110         }
1111
1112         /* XXX identical to MDS */
1113         if (req->rq_reqmsg->opc != OST_CONNECT) {
1114                 int recovering;
1115
1116                 if (req->rq_export == NULL) {
1117                         CDEBUG(D_HA,"operation %d on unconnected OST from %s\n",
1118                                req->rq_reqmsg->opc,
1119                                req->rq_peerstr);
1120                         req->rq_status = -ENOTCONN;
1121                         GOTO(out_check_req, rc = -ENOTCONN);
1122                 }
1123
1124                 obd = req->rq_export->exp_obd;
1125
1126                 /* Check for aborted recovery. */
1127                 spin_lock_bh(&obd->obd_processing_task_lock);
1128                 recovering = obd->obd_recovering;
1129                 spin_unlock_bh(&obd->obd_processing_task_lock);
1130                 if (recovering) {
1131                         rc = ost_filter_recovery_request(req, obd,
1132                                                          &should_process);
1133                         if (rc || !should_process)
1134                                 RETURN(rc);
1135                         if (should_process < 0) {
1136                                 req->rq_status = should_process;
1137                                 rc = ptlrpc_error(req);
1138                                 RETURN(rc);
1139                         }
1140                 }
1141         }
1142
1143         OBD_ALLOC(oti, sizeof(*oti));
1144         if (oti == NULL)
1145                 RETURN(-ENOMEM);
1146                 
1147         oti_init(oti, req);
1148
1149         switch (req->rq_reqmsg->opc) {
1150         case OST_CONNECT: {
1151                 CDEBUG(D_INODE, "connect\n");
1152                 OBD_FAIL_GOTO(OBD_FAIL_OST_CONNECT_NET, out_free_oti, rc = 0);
1153                 rc = target_handle_connect(req);
1154                 if (!rc)
1155                         obd = req->rq_export->exp_obd;
1156                 break;
1157         }
1158         case OST_DISCONNECT:
1159                 CDEBUG(D_INODE, "disconnect\n");
1160                 OBD_FAIL_GOTO(OBD_FAIL_OST_DISCONNECT_NET, out_free_oti, rc = 0);
1161                 rc = target_handle_disconnect(req);
1162                 break;
1163         case OST_CREATE:
1164                 CDEBUG(D_INODE, "create\n");
1165                 OBD_FAIL_GOTO(OBD_FAIL_OST_ENOSPC, out_check_req, rc = -ENOSPC);
1166                 OBD_FAIL_GOTO(OBD_FAIL_OST_EROFS, out_check_req, rc = -EROFS);
1167                 OBD_FAIL_GOTO(OBD_FAIL_OST_CREATE_NET, out_free_oti, rc = 0);
1168                 rc = ost_create(req->rq_export, req, oti);
1169                 break;
1170         case OST_DESTROY:
1171                 CDEBUG(D_INODE, "destroy\n");
1172                 OBD_FAIL_GOTO(OBD_FAIL_OST_DESTROY_NET, out_free_oti, rc = 0);
1173                 OBD_FAIL_GOTO(OBD_FAIL_OST_EROFS, out_check_req, rc = -EROFS);
1174                 rc = ost_destroy(req->rq_export, req, oti);
1175                 break;
1176         case OST_GETATTR:
1177                 CDEBUG(D_INODE, "getattr\n");
1178                 OBD_FAIL_GOTO(OBD_FAIL_OST_GETATTR_NET, out_free_oti, rc = 0);
1179                 rc = ost_getattr(req->rq_export, req);
1180                 break;
1181         case OST_SETATTR:
1182                 CDEBUG(D_INODE, "setattr\n");
1183                 OBD_FAIL_GOTO(OBD_FAIL_OST_SETATTR_NET, out_free_oti, rc = 0);
1184                 rc = ost_setattr(req->rq_export, req, oti);
1185                 break;
1186         case OST_WRITE:
1187                 CDEBUG(D_INODE, "write\n");
1188                 OBD_FAIL_GOTO(OBD_FAIL_OST_BRW_NET, out_free_oti, rc = 0);
1189                 OBD_FAIL_GOTO(OBD_FAIL_OST_ENOSPC, out_check_req, rc = -ENOSPC);
1190                 OBD_FAIL_GOTO(OBD_FAIL_OST_EROFS, out_check_req, rc = -EROFS);
1191                 rc = ost_brw_write(req, oti);
1192                 LASSERT(current->journal_info == NULL);
1193                 /* ost_brw sends its own replies */
1194                 GOTO(out_free_oti, rc);
1195         case OST_READ:
1196                 CDEBUG(D_INODE, "read\n");
1197                 OBD_FAIL_GOTO(OBD_FAIL_OST_BRW_NET, out_free_oti, rc = 0);
1198                 rc = ost_brw_read(req);
1199                 LASSERT(current->journal_info == NULL);
1200                 /* ost_brw sends its own replies */
1201                 GOTO(out_free_oti, rc);
1202         case OST_SAN_READ:
1203                 CDEBUG(D_INODE, "san read\n");
1204                 OBD_FAIL_GOTO(OBD_FAIL_OST_BRW_NET, out_free_oti, rc = 0);
1205                 rc = ost_san_brw(req, OBD_BRW_READ);
1206                 /* ost_san_brw sends its own replies */
1207                 GOTO(out_free_oti, rc);
1208         case OST_SAN_WRITE:
1209                 CDEBUG(D_INODE, "san write\n");
1210                 OBD_FAIL_GOTO(OBD_FAIL_OST_BRW_NET, out_free_oti, rc = 0);
1211                 rc = ost_san_brw(req, OBD_BRW_WRITE);
1212                 /* ost_san_brw sends its own replies */
1213                 GOTO(out_free_oti, rc);
1214         case OST_PUNCH:
1215                 CDEBUG(D_INODE, "punch\n");
1216                 OBD_FAIL_GOTO(OBD_FAIL_OST_PUNCH_NET, out_free_oti, rc = 0);
1217                 OBD_FAIL_GOTO(OBD_FAIL_OST_EROFS, out_check_req, rc = -EROFS);
1218                 rc = ost_punch(req->rq_export, req, oti);
1219                 break;
1220         case OST_STATFS:
1221                 CDEBUG(D_INODE, "statfs\n");
1222                 OBD_FAIL_GOTO(OBD_FAIL_OST_STATFS_NET, out_free_oti, rc = 0);
1223                 rc = ost_statfs(req);
1224                 break;
1225         case OST_SYNC:
1226                 CDEBUG(D_INODE, "sync\n");
1227                 OBD_FAIL_GOTO(OBD_FAIL_OST_SYNC_NET, out_free_oti, rc = 0);
1228                 rc = ost_sync(req->rq_export, req);
1229                 break;
1230         case OST_SET_INFO:
1231                 DEBUG_REQ(D_INODE, req, "set_info");
1232                 rc = ost_set_info(req->rq_export, req);
1233                 break;
1234         case OST_GET_INFO:
1235                 DEBUG_REQ(D_INODE, req, "get_info");
1236                 rc = ost_get_info(req->rq_export, req);
1237                 break;
1238         case OBD_PING:
1239                 DEBUG_REQ(D_INODE, req, "ping");
1240                 rc = target_handle_ping(req);
1241                 break;
1242         /* FIXME - just reply status */
1243         case LLOG_ORIGIN_CONNECT:
1244                 DEBUG_REQ(D_INODE, req, "log connect\n");
1245                 rc = ost_llog_handle_connect(req->rq_export, req); 
1246                 req->rq_status = rc;
1247                 rc = lustre_pack_reply(req, 0, NULL, NULL);
1248                 if (rc)
1249                         GOTO(out_free_oti, rc);
1250                 GOTO(out_free_oti, rc = ptlrpc_reply(req));
1251         case OBD_LOG_CANCEL:
1252                 CDEBUG(D_INODE, "log cancel\n");
1253                 OBD_FAIL_GOTO(OBD_FAIL_OBD_LOG_CANCEL_NET, out_free_oti, rc = 0);
1254                 rc = llog_origin_handle_cancel(req);
1255                 req->rq_status = rc;
1256                 rc = lustre_pack_reply(req, 0, NULL, NULL);
1257                 if (rc)
1258                         GOTO(out_free_oti, rc);
1259                 GOTO(out_free_oti, rc = ptlrpc_reply(req));
1260         case LDLM_ENQUEUE:
1261                 CDEBUG(D_INODE, "enqueue\n");
1262                 OBD_FAIL_GOTO(OBD_FAIL_LDLM_ENQUEUE, out_free_oti, rc = 0);
1263                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
1264                                          ldlm_server_blocking_ast,
1265                                          ldlm_server_glimpse_ast);
1266                 fail = OBD_FAIL_OST_LDLM_REPLY_NET;
1267                 break;
1268         case LDLM_CONVERT:
1269                 CDEBUG(D_INODE, "convert\n");
1270                 OBD_FAIL_GOTO(OBD_FAIL_LDLM_CONVERT, out_free_oti, rc = 0);
1271                 rc = ldlm_handle_convert(req);
1272                 break;
1273         case LDLM_CANCEL:
1274                 CDEBUG(D_INODE, "cancel\n");
1275                 OBD_FAIL_GOTO(OBD_FAIL_LDLM_CANCEL, out_free_oti, rc = 0);
1276                 rc = ldlm_handle_cancel(req);
1277                 break;
1278         case LDLM_BL_CALLBACK:
1279         case LDLM_CP_CALLBACK:
1280                 CDEBUG(D_INODE, "callback\n");
1281                 CERROR("callbacks should not happen on OST\n");
1282                 /* fall through */
1283         default:
1284                 CERROR("Unexpected opcode %d\n", req->rq_reqmsg->opc);
1285                 req->rq_status = -ENOTSUPP;
1286                 rc = ptlrpc_error(req);
1287                 GOTO(out_free_oti, rc);
1288         }
1289
1290         LASSERT(current->journal_info == NULL);
1291
1292         EXIT;
1293         /* If we're DISCONNECTing, the export_data is already freed */
1294         if (!rc && req->rq_reqmsg->opc != OST_DISCONNECT) {
1295                 if (!obd->obd_no_transno) {
1296                         req->rq_repmsg->last_committed =
1297                                 obd->obd_last_committed;
1298                 } else {
1299                         DEBUG_REQ(D_IOCTL, req,
1300                                   "not sending last_committed update");
1301                 }
1302                 CDEBUG(D_INFO, "last_committed "LPU64", xid "LPX64"\n",
1303                        obd->obd_last_committed, req->rq_xid);
1304         }
1305
1306 out_check_req:
1307
1308         if (!rc)
1309                 oti_to_request(oti, req);
1310         target_send_reply(req, rc, fail);
1311         rc = 0;
1312         
1313 out_free_oti:
1314         if (oti)
1315                 OBD_FREE(oti, sizeof(*oti));
1316         return rc;
1317 }
1318 EXPORT_SYMBOL(ost_handle);
1319
1320 int ost_attach(struct obd_device *dev, obd_count len, void *data)
1321 {
1322         struct lprocfs_static_vars lvars;
1323
1324         lprocfs_init_vars(ost,&lvars);
1325         return lprocfs_obd_attach(dev, lvars.obd_vars);
1326 }
1327
/* o_detach method: hand the device to lprocfs_obd_detach(). */
int ost_detach(struct obd_device *dev)
{
        int rc;

        rc = lprocfs_obd_detach(dev);
        return rc;
}
1332
1333 extern struct file_operations ost_stimes_fops;
1334
1335 static int ost_setup(struct obd_device *obd, obd_count len, void *buf)
1336 {
1337         struct ost_obd *ost = &obd->u.ost;
1338         int rc;
1339         ENTRY;
1340
1341         rc = cleanup_group_info();
1342         if (rc)
1343                 RETURN(rc);
1344
1345         rc = llog_start_commit_thread();
1346         if (rc < 0)
1347                 RETURN(rc);
1348
1349         lprocfs_obd_seq_create(obd, "service_times", 0444, &ost_stimes_fops,
1350                                obd);
1351
1352         ost->ost_service =
1353                 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
1354                                 OST_REQUEST_PORTAL, OSC_REPLY_PORTAL, 30000,
1355                                 ost_handle, "ost",
1356                                 obd->obd_proc_entry);
1357         if (ost->ost_service == NULL) {
1358                 CERROR("failed to start service\n");
1359                 RETURN(-ENOMEM);
1360         }
1361
1362         rc = ptlrpc_start_n_threads(obd, ost->ost_service, OST_NUM_THREADS,
1363                                     "ll_ost");
1364         if (rc)
1365                 GOTO(out_service, rc = -EINVAL);
1366
1367         ost->ost_create_service =
1368                 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
1369                                 OST_CREATE_PORTAL, OSC_REPLY_PORTAL, 30000,
1370                                 ost_handle, "ost_create",
1371                                 obd->obd_proc_entry);
1372         if (ost->ost_create_service == NULL) {
1373                 CERROR("failed to start OST create service\n");
1374                 GOTO(out_service, rc = -ENOMEM);
1375         }
1376
1377
1378         spin_lock_init(&ost->ost_lock);
1379         ost->ost_service->srv_obddev = obd;
1380         
1381         rc = ptlrpc_start_n_threads(obd, ost->ost_create_service, 1,
1382                                     "ll_ost_creat");
1383         if (rc)
1384                 GOTO(out_create, rc = -EINVAL);
1385
1386         ost->ost_destroy_service =
1387                 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
1388                                 OST_DESTROY_PORTAL, OSC_REPLY_PORTAL, 30000,
1389                                 ost_handle, "ost_destroy",
1390                                 obd->obd_proc_entry);
1391         if (ost->ost_destroy_service == NULL) {
1392                 CERROR("failed to start service\n");
1393                 GOTO(out_create, rc = -ENOMEM);
1394         }
1395
1396         rc = ptlrpc_start_n_threads(obd, ost->ost_destroy_service,
1397                                     OST_NUM_THREADS, "ll_dstr_ost");
1398         if (rc)
1399                 GOTO(out_destroy, rc = -EINVAL);
1400
1401         RETURN(0);
1402
1403 out_destroy:
1404         ptlrpc_unregister_service(ost->ost_destroy_service);
1405 out_create:
1406         ptlrpc_unregister_service(ost->ost_create_service);
1407 out_service:
1408         ptlrpc_unregister_service(ost->ost_service);
1409         RETURN(rc);
1410 }
1411
1412 extern void lgss_svc_cache_purge_all(void);
1413 static int ost_cleanup(struct obd_device *obd, int flags)
1414 {
1415         struct ost_obd *ost = &obd->u.ost;
1416         int err = 0;
1417         ENTRY;
1418
1419         spin_lock_bh(&obd->obd_processing_task_lock);
1420         if (obd->obd_recovering) {
1421                 target_cancel_recovery_timer(obd);
1422                 obd->obd_recovering = 0;
1423         }
1424         spin_unlock_bh(&obd->obd_processing_task_lock);
1425
1426         ptlrpc_stop_all_threads(ost->ost_service);
1427         ptlrpc_unregister_service(ost->ost_service);
1428
1429         ptlrpc_stop_all_threads(ost->ost_create_service);
1430         ptlrpc_unregister_service(ost->ost_create_service);
1431
1432         ptlrpc_stop_all_threads(ost->ost_destroy_service);
1433         ptlrpc_unregister_service(ost->ost_destroy_service);
1434
1435 #ifdef ENABLE_GSS
1436         /* XXX */
1437         lgss_svc_cache_purge_all();
1438 #endif
1439         RETURN(err);
1440 }
1441
/*
 * OBD method table registered for the OST device type by ost_init().
 * Only the attach/detach and setup/cleanup hooks are filled in.
 */
1442 /* use obd ops to offer management infrastructure */
1443 static struct obd_ops ost_obd_ops = {
1444         .o_owner        = THIS_MODULE,
1445         .o_attach       = ost_attach,
1446         .o_detach       = ost_detach,
1447         .o_setup        = ost_setup,
1448         .o_cleanup      = ost_cleanup,
1449 };
1450
1451 static int __init ost_init(void)
1452 {
1453         struct lprocfs_static_vars lvars;
1454         ENTRY;
1455
1456         lprocfs_init_vars(ost,&lvars);
1457         RETURN(class_register_type(&ost_obd_ops, NULL, lvars.module_vars,
1458                                    OBD_OST_DEVICENAME));
1459 }
1460
/*
 * Module unload hook: unregister the OBD_OST_DEVICENAME device type.
 * The __exit annotation is deliberately left commented out.
 */
1461 static void /*__exit*/ ost_exit(void)
1462 {
1463         class_unregister_type(OBD_OST_DEVICENAME);
1464 }
1465
/* Kernel module metadata, followed by the load/unload entry points. */
1466 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1467 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
1468 MODULE_LICENSE("GPL");
1469
1470 module_init(ost_init);
1471 module_exit(ost_exit);