Whamcloud - gitweb
land b_hd_sec onto HEAD:
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author: Peter J. Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  *
23  *  Storage Target Handling functions
24  *  Lustre Object Server Module (OST)
25  *
26  *  This server is single threaded at present (but can easily be multi
27  *  threaded). For testing and management it is treated as an
28  *  obd_device, although it does not export a full OBD method table
29  *  (the requests are coming in over the wire, so object target
30  *  modules do not have a full method table.)
31  */
32
33 #ifndef EXPORT_SYMTAB
34 # define EXPORT_SYMTAB
35 #endif
36 #define DEBUG_SUBSYSTEM S_OST
37
38 #include <linux/module.h>
39 #include <linux/obd_ost.h>
40 #include <linux/lustre_net.h>
41 #include <linux/lustre_dlm.h>
42 #include <linux/lustre_export.h>
43 #include <linux/init.h>
44 #include <linux/lprocfs_status.h>
45 #include <linux/lustre_commit_confd.h>
46 #include <libcfs/list.h>
47
48 void oti_init(struct obd_trans_info *oti, struct ptlrpc_request *req)
49 {
50         if (oti == NULL)
51                 return;
52         memset(oti, 0, sizeof *oti);
53
54         if (req->rq_repmsg && req->rq_reqmsg != 0)
55                 oti->oti_transno = req->rq_repmsg->transno;
56 }
57
58 void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
59 {
60         struct oti_req_ack_lock *ack_lock;
61         int i;
62
63         if (oti == NULL)
64                 return;
65
66         if (req->rq_repmsg)
67                 req->rq_repmsg->transno = oti->oti_transno;
68
69         /* XXX 4 == entries in oti_ack_locks??? */
70         for (ack_lock = oti->oti_ack_locks, i = 0; i < 4; i++, ack_lock++) {
71                 if (!ack_lock->mode)
72                         break;
73                 /* XXX not even calling target_send_reply in some cases... */
74                 ptlrpc_save_lock (req, &ack_lock->lock, ack_lock->mode);
75         }
76 }
77
78 static int ost_destroy(struct obd_export *exp, struct ptlrpc_request *req, 
79                        struct obd_trans_info *oti)
80 {
81         struct ost_body *body, *repbody;
82         int rc, size = sizeof(*body);
83         ENTRY;
84
85         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
86         if (body == NULL)
87                 RETURN(-EFAULT);
88
89         rc = lustre_pack_reply(req, 1, &size, NULL);
90         if (rc)
91                 RETURN(rc);
92
93         if (body->oa.o_valid & OBD_MD_FLCOOKIE)
94                 oti->oti_logcookies = obdo_logcookie(&body->oa);
95         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
96         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
97         req->rq_status = obd_destroy(exp, &body->oa, NULL, oti);
98         RETURN(0);
99 }
100
101 static int ost_getattr(struct obd_export *exp, struct ptlrpc_request *req)
102 {
103         struct ost_body *body, *repbody;
104         int rc, size = sizeof(*body);
105         ENTRY;
106
107         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
108         if (body == NULL)
109                 RETURN(-EFAULT);
110
111         rc = lustre_pack_reply(req, 1, &size, NULL);
112         if (rc)
113                 RETURN(rc);
114
115         repbody = lustre_msg_buf (req->rq_repmsg, 0, sizeof(*repbody));
116         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
117         req->rq_status = obd_getattr(exp, &repbody->oa, NULL);
118         RETURN(0);
119 }
120
121 static int ost_statfs(struct ptlrpc_request *req)
122 {
123         struct obd_statfs *osfs;
124         int rc, size = sizeof(*osfs);
125         ENTRY;
126
127         rc = lustre_pack_reply(req, 1, &size, NULL);
128         if (rc)
129                 RETURN(rc);
130
131         osfs = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*osfs));
132
133         req->rq_status = obd_statfs(req->rq_export->exp_obd, osfs, jiffies-HZ);
134         if (req->rq_status != 0)
135                 CERROR("ost: statfs failed: rc %d\n", req->rq_status);
136
137         RETURN(0);
138 }
139
140 static int ost_create(struct obd_export *exp, struct ptlrpc_request *req,
141                       struct obd_trans_info *oti)
142 {
143         struct ost_body *body, *repbody;
144         int rc, size = sizeof(*repbody);
145         ENTRY;
146
147         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
148         if (body == NULL)
149                 RETURN(-EFAULT);
150
151         rc = lustre_pack_reply(req, 1, &size, NULL);
152         if (rc)
153                 RETURN(rc);
154
155         repbody = lustre_msg_buf (req->rq_repmsg, 0, sizeof(*repbody));
156         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
157         oti->oti_logcookies = obdo_logcookie(&repbody->oa);
158         req->rq_status = obd_create(exp, &repbody->oa, NULL, oti);
159         //obd_log_cancel(conn, NULL, 1, oti->oti_logcookies, 0);
160         RETURN(0);
161 }
162
163 static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req, 
164                      struct obd_trans_info *oti)
165 {
166         struct ost_body *body, *repbody;
167         int rc, size = sizeof(*repbody);
168         ENTRY;
169
170         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
171         if (body == NULL)
172                 RETURN(-EFAULT);
173
174         if ((body->oa.o_valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) !=
175             (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
176                 RETURN(-EINVAL);
177
178         rc = lustre_pack_reply(req, 1, &size, NULL);
179         if (rc)
180                 RETURN(rc);
181
182         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
183         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
184         req->rq_status = obd_punch(exp, &repbody->oa, NULL, repbody->oa.o_size,
185                                    repbody->oa.o_blocks, oti);
186         RETURN(0);
187 }
188
189 static int ost_sync(struct obd_export *exp, struct ptlrpc_request *req)
190 {
191         struct ost_body *body, *repbody;
192         int rc, size = sizeof(*repbody);
193         ENTRY;
194
195         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
196         if (body == NULL)
197                 RETURN(-EFAULT);
198
199         rc = lustre_pack_reply(req, 1, &size, NULL);
200         if (rc)
201                 RETURN(rc);
202
203         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
204         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
205         req->rq_status = obd_sync(exp, &repbody->oa, NULL, repbody->oa.o_size,
206                                   repbody->oa.o_blocks);
207         RETURN(0);
208 }
209
210 static int ost_setattr(struct obd_export *exp, struct ptlrpc_request *req, 
211                        struct obd_trans_info *oti)
212 {
213         struct ost_body *body, *repbody;
214         int rc, size = sizeof(*repbody);
215         ENTRY;
216
217         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
218         if (body == NULL)
219                 RETURN(-EFAULT);
220
221         rc = lustre_pack_reply(req, 1, &size, NULL);
222         if (rc)
223                 RETURN(rc);
224
225         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
226         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
227
228         req->rq_status = obd_setattr(exp, &repbody->oa, NULL, oti);
229         RETURN(0);
230 }
231
232 static int ost_bulk_timeout(void *data)
233 {
234         ENTRY;
235         /* We don't fail the connection here, because having the export
236          * killed makes the (vital) call to commitrw very sad.
237          */
238         RETURN(1);
239 }
240
241 static int get_per_page_niobufs(struct obd_ioobj *ioo, int nioo,
242                                 struct niobuf_remote *rnb, int nrnb,
243                                 struct niobuf_remote **pp_rnbp)
244 {
245         /* Copy a remote niobuf, splitting it into page-sized chunks
246          * and setting ioo[i].ioo_bufcnt accordingly */
247         struct niobuf_remote *pp_rnb;
248         int   i;
249         int   j;
250         int   page;
251         int   rnbidx = 0;
252         int   npages = 0;
253
254         /* first count and check the number of pages required */
255         for (i = 0; i < nioo; i++)
256                 for (j = 0; j < ioo->ioo_bufcnt; j++, rnbidx++) {
257                         obd_off offset = rnb[rnbidx].offset;
258                         obd_off p0 = offset >> PAGE_SHIFT;
259                         obd_off pn = (offset + rnb[rnbidx].len - 1)>>PAGE_SHIFT;
260
261                         LASSERT(rnbidx < nrnb);
262
263                         npages += (pn + 1 - p0);
264
265                         if (rnb[rnbidx].len == 0) {
266                                 CERROR("zero len BRW: obj %d objid "LPX64
267                                        " buf %u\n", i, ioo[i].ioo_id, j);
268                                 return -EINVAL;
269                         }
270                         if (j > 0 &&
271                             rnb[rnbidx].offset <= rnb[rnbidx-1].offset) {
272                                 CERROR("unordered BRW: obj %d objid "LPX64
273                                        " buf %u offset "LPX64" <= "LPX64"\n",
274                                        i, ioo[i].ioo_id, j, rnb[rnbidx].offset,
275                                        rnb[rnbidx].offset);
276                                 return -EINVAL;
277                         }
278                 }
279
280         LASSERT(rnbidx == nrnb);
281
282         if (npages == nrnb) {       /* all niobufs are for single pages */
283                 *pp_rnbp = rnb;
284                 return npages;
285         }
286
287         OBD_ALLOC(pp_rnb, sizeof(*pp_rnb) * npages);
288         if (pp_rnb == NULL)
289                 return -ENOMEM;
290
291         /* now do the actual split */
292         page = rnbidx = 0;
293         for (i = 0; i < nioo; i++) {
294                 int  obj_pages = 0;
295
296                 for (j = 0; j < ioo[i].ioo_bufcnt; j++, rnbidx++) {
297                         obd_off off = rnb[rnbidx].offset;
298                         int     nob = rnb[rnbidx].len;
299
300                         LASSERT(rnbidx < nrnb);
301                         do {
302                                 obd_off  poff = off & (PAGE_SIZE - 1);
303                                 int      pnob = (poff + nob > PAGE_SIZE) ?
304                                                 PAGE_SIZE - poff : nob;
305
306                                 LASSERT(page < npages);
307                                 pp_rnb[page].len = pnob;
308                                 pp_rnb[page].offset = off;
309                                 pp_rnb[page].flags = rnb[rnbidx].flags;
310
311                                 CDEBUG(0, "   obj %d id "LPX64
312                                        "page %d(%d) "LPX64" for %d, flg %x\n",
313                                        i, ioo[i].ioo_id, obj_pages, page,
314                                        pp_rnb[page].offset, pp_rnb[page].len,
315                                        pp_rnb[page].flags);
316                                 page++;
317                                 obj_pages++;
318
319                                 off += pnob;
320                                 nob -= pnob;
321                         } while (nob > 0);
322                         LASSERT(nob == 0);
323                 }
324                 ioo[i].ioo_bufcnt = obj_pages;
325         }
326         LASSERT(page == npages);
327
328         *pp_rnbp = pp_rnb;
329         return npages;
330 }
331
332 static void free_per_page_niobufs (int npages, struct niobuf_remote *pp_rnb,
333                                    struct niobuf_remote *rnb)
334 {
335         if (pp_rnb == rnb)                      /* didn't allocate above */
336                 return;
337
338         OBD_FREE(pp_rnb, sizeof(*pp_rnb) * npages);
339 }
340
#if CHECKSUM_BULK
/*
 * Checksum the data of a bulk descriptor.
 *
 * Walks desc->bd_page_list, maps each page with kmap(), feeds bp_buflen
 * bytes starting at bp_pageoffset to ost_checksum() and unmaps the page
 * again.  Returns the accumulated checksum, starting from 0.
 */
obd_count ost_checksum_bulk(struct ptlrpc_bulk_desc *desc)
{
        struct ptlrpc_bulk_page *pg;
        obd_count sum = 0;

        list_for_each_entry(pg, &desc->bd_page_list, bp_link) {
                char *kaddr = kmap(pg->bp_page);

                ost_checksum(&sum, kaddr + pg->bp_pageoffset, pg->bp_buflen);
                kunmap(pg->bp_page);
        }

        return sum;
}
#endif
356
357 static void ost_stime_record(struct ptlrpc_request *req, struct timeval *start,
358                              unsigned rw, unsigned phase)
359 {
360         struct obd_device *obd = req->rq_svc->srv_obddev;
361         struct timeval stop;
362         int ind = rw *3 + phase;
363          
364         if (obd && obd->obd_type && obd->obd_type->typ_name) {
365                 if (!strcmp(obd->obd_type->typ_name, LUSTRE_OST_NAME)) {
366                         struct ost_obd *ost = NULL;
367                         
368                         ost = &obd->u.ost;
369                         if (ind >= (sizeof(ost->ost_stimes) / 
370                                     sizeof(ost->ost_stimes[0])))
371                                return;
372                         do_gettimeofday(&stop);
373
374                         spin_lock(&ost->ost_lock);
375                         lprocfs_stime_record(&ost->ost_stimes[ind],&stop,start);
376                         spin_unlock(&ost->ost_lock);
377                         memcpy(start, &stop, sizeof(*start));
378                 }
379        } 
380 }
381
382 static int ost_brw_read(struct ptlrpc_request *req)
383 {
384         struct ptlrpc_bulk_desc *desc;
385         struct niobuf_remote    *remote_nb;
386         struct niobuf_remote    *pp_rnb;
387         struct niobuf_local     *local_nb;
388         struct obd_ioobj        *ioo;
389         struct ost_body         *body, *repbody;
390         struct l_wait_info       lwi;
391         struct obd_trans_info    oti = { 0 };
392         int                      size[1] = { sizeof(*body) };
393         int                      comms_error = 0;
394         int                      niocount;
395         int                      npages;
396         int                      nob = 0;
397         int                      rc;
398         int                      i;
399         struct timeval           start;
400         ENTRY;
401
402         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
403                 GOTO(out, rc = -EIO);
404
405         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK | OBD_FAIL_ONCE,
406                          (obd_timeout + 1) / 4);
407
408         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
409         if (body == NULL) {
410                 CERROR("Missing/short ost_body\n");
411                 GOTO(out, rc = -EFAULT);
412         }
413
414         ioo = lustre_swab_reqbuf(req, 1, sizeof(*ioo), lustre_swab_obd_ioobj);
415         if (ioo == NULL) {
416                 CERROR("Missing/short ioobj\n");
417                 GOTO(out, rc = -EFAULT);
418         }
419
420         niocount = ioo->ioo_bufcnt;
421         remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof(*remote_nb),
422                                        lustre_swab_niobuf_remote);
423         if (remote_nb == NULL) {
424                 CERROR("Missing/short niobuf\n");
425                 GOTO(out, rc = -EFAULT);
426         }
427         if (lustre_msg_swabbed(req->rq_reqmsg)) { /* swab remaining niobufs */
428                 for (i = 1; i < niocount; i++)
429                         lustre_swab_niobuf_remote (&remote_nb[i]);
430         }
431
432         rc = lustre_pack_reply(req, 1, size, NULL);
433         if (rc)
434                 GOTO(out, rc);
435
436         /* FIXME all niobuf splitting should be done in obdfilter if needed */
437         /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
438         npages = get_per_page_niobufs(ioo, 1, remote_nb, niocount, &pp_rnb);
439         if (npages < 0)
440                 GOTO(out, rc = npages);
441
442         OBD_ALLOC(local_nb, sizeof(*local_nb) * npages);
443         if (local_nb == NULL)
444                 GOTO(out_pp_rnb, rc = -ENOMEM);
445
446         desc = ptlrpc_prep_bulk_exp (req, npages, 
447                                      BULK_PUT_SOURCE, OST_BULK_PORTAL);
448         if (desc == NULL)
449                 GOTO(out_local, rc = -ENOMEM);
450
451         do_gettimeofday(&start);
452         rc = obd_preprw(OBD_BRW_READ, req->rq_export, &body->oa, 1,
453                         ioo, npages, pp_rnb, local_nb, &oti);
454         ost_stime_record(req, &start, 0, 0);
455         if (rc != 0)
456                 GOTO(out_bulk, rc);
457
458         /* We're finishing using body->oa as an input variable */
459         body->oa.o_valid = 0;
460
461         nob = 0;
462         for (i = 0; i < npages; i++) {
463                 int page_rc = local_nb[i].rc;
464
465                 if (page_rc < 0) {              /* error */
466                         rc = page_rc;
467                         break;
468                 }
469
470                 LASSERT(page_rc <= pp_rnb[i].len);
471                 nob += page_rc;
472                 if (page_rc != 0) {             /* some data! */
473                         LASSERT (local_nb[i].page != NULL);
474                         ptlrpc_prep_bulk_page(desc, local_nb[i].page,
475                                               pp_rnb[i].offset & (PAGE_SIZE-1),
476                                               page_rc);
477                 }
478
479                 if (page_rc != pp_rnb[i].len) { /* short read */
480                         /* All subsequent pages should be 0 */
481                         while(++i < npages)
482                                 LASSERT(local_nb[i].rc == 0);
483                         break;
484                 }
485         }
486
487         if (rc == 0) {
488                 rc = ptlrpc_start_bulk_transfer(desc);
489                 if (rc == 0) {
490                         lwi = LWI_TIMEOUT(obd_timeout * HZ / 4,
491                                           ost_bulk_timeout, desc);
492                         rc = l_wait_event(desc->bd_waitq,
493                                           !ptlrpc_bulk_active(desc), &lwi);
494                         LASSERT(rc == 0 || rc == -ETIMEDOUT);
495                         if (rc == -ETIMEDOUT) {
496                                 DEBUG_REQ(D_ERROR, req, "timeout on bulk PUT");
497                                 ptlrpc_abort_bulk(desc);
498                         } else if (!desc->bd_success ||
499                                    desc->bd_nob_transferred != desc->bd_nob) {
500                                 DEBUG_REQ(D_ERROR, req, "%s bulk PUT %d(%d)",
501                                           desc->bd_success ?
502                                           "truncated" : "network error on",
503                                           desc->bd_nob_transferred,
504                                           desc->bd_nob);
505                                 /* XXX should this be a different errno? */
506                                 rc = -ETIMEDOUT;
507                         }
508                 } else {
509                         DEBUG_REQ(D_ERROR, req, "bulk PUT failed: rc %d\n", rc);
510                 }
511                 comms_error = rc != 0;
512         }
513
514         ost_stime_record(req, &start, 0, 1);
515         /* Must commit after prep above in all cases */
516         rc = obd_commitrw(OBD_BRW_READ, req->rq_export, &body->oa, 1,
517                           ioo, npages, local_nb, &oti, rc);
518         ost_stime_record(req, &start, 0, 2);
519
520         if (rc == 0) {
521                 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
522                 memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
523
524 #if CHECKSUM_BULK
525                 repbody->oa.o_cksum = ost_checksum_bulk(desc);
526                 repbody->oa.o_valid |= OBD_MD_FLCKSUM;
527 #endif
528         }
529
530  out_bulk:
531         ptlrpc_free_bulk(desc);
532  out_local:
533         OBD_FREE(local_nb, sizeof(*local_nb) * npages);
534  out_pp_rnb:
535         free_per_page_niobufs(npages, pp_rnb, remote_nb);
536  out:
537         LASSERT(rc <= 0);
538         if (rc == 0) {
539                 req->rq_status = nob;
540                 ptlrpc_reply(req);
541         } else if (!comms_error) {
542                 /* only reply if comms OK */
543                 req->rq_status = rc;
544                 ptlrpc_error(req);
545         } else {
546                 if (req->rq_reply_state != NULL) {
547                         /* reply out callback would free */
548                         lustre_free_reply_state (req->rq_reply_state);
549                 }
550                 if (req->rq_reqmsg->conn_cnt == req->rq_export->exp_conn_cnt) {
551                         CERROR("bulk IO comms error: "
552                                "evicting %s@%s id %s\n",
553                                req->rq_export->exp_client_uuid.uuid,
554                                req->rq_export->exp_connection->c_remote_uuid.uuid,
555                                req->rq_peerstr);
556                         ptlrpc_fail_export(req->rq_export);
557                 } else {
558                         CERROR("ignoring bulk IO comms error: "
559                                "client reconnected %s@%s id %s\n",  
560                                req->rq_export->exp_client_uuid.uuid,
561                                req->rq_export->exp_connection->c_remote_uuid.uuid,
562                                req->rq_peerstr);
563                 }
564         }
565
566         RETURN(rc);
567 }
568
569 int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
570 {
571         struct ptlrpc_bulk_desc *desc;
572         struct niobuf_remote    *remote_nb;
573         struct niobuf_remote    *pp_rnb;
574         struct niobuf_local     *local_nb;
575         struct obd_ioobj        *ioo;
576         struct ost_body         *body, *repbody;
577         struct l_wait_info       lwi;
578         __u32                   *rcs;
579         int                      size[2] = { sizeof(*body) };
580         int                      objcount, niocount, npages;
581         int                      comms_error = 0;
582         int                      rc, swab, i, j;
583         struct timeval           start;        
584         ENTRY;
585
586         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
587                 GOTO(out, rc = -EIO);
588
589         /* pause before transaction has been started */
590         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK | OBD_FAIL_ONCE,
591                          (obd_timeout + 1) / 4);
592
593         swab = lustre_msg_swabbed(req->rq_reqmsg);
594         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
595         if (body == NULL) {
596                 CERROR("Missing/short ost_body\n");
597                 GOTO(out, rc = -EFAULT);
598         }
599
600         LASSERT_REQSWAB(req, 1);
601         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
602         if (objcount == 0) {
603                 CERROR("Missing/short ioobj\n");
604                 GOTO(out, rc = -EFAULT);
605         }
606         ioo = lustre_msg_buf (req->rq_reqmsg, 1, objcount * sizeof(*ioo));
607         LASSERT (ioo != NULL);
608         for (niocount = i = 0; i < objcount; i++) {
609                 if (swab)
610                         lustre_swab_obd_ioobj (&ioo[i]);
611                 if (ioo[i].ioo_bufcnt == 0) {
612                         CERROR("ioo[%d] has zero bufcnt\n", i);
613                         GOTO(out, rc = -EFAULT);
614                 }
615                 niocount += ioo[i].ioo_bufcnt;
616         }
617
618         remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof(*remote_nb),
619                                        lustre_swab_niobuf_remote);
620         if (remote_nb == NULL) {
621                 CERROR("Missing/short niobuf\n");
622                 GOTO(out, rc = -EFAULT);
623         }
624         if (swab) {                             /* swab the remaining niobufs */
625                 for (i = 1; i < niocount; i++)
626                         lustre_swab_niobuf_remote (&remote_nb[i]);
627         }
628
629         size[1] = niocount * sizeof(*rcs);
630         rc = lustre_pack_reply(req, 2, size, NULL);
631         if (rc != 0)
632                 GOTO(out, rc);
633         rcs = lustre_msg_buf(req->rq_repmsg, 1, niocount * sizeof(*rcs));
634
635 #if 0
636         /* Do snap options here*/
637         rc = obd_do_cow(req->rq_export, ioo, objcount, remote_nb);
638         if (rc)
639                 GOTO(out, rc);
640 #endif
641
642         /* FIXME all niobuf splitting should be done in obdfilter if needed */
643         /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
644         npages = get_per_page_niobufs(ioo, objcount,remote_nb,niocount,&pp_rnb);
645         if (npages < 0)
646                 GOTO(out, rc = npages);
647
648         OBD_ALLOC(local_nb, sizeof(*local_nb) * npages);
649         if (local_nb == NULL)
650                 GOTO(out_pp_rnb, rc = -ENOMEM);
651
652         desc = ptlrpc_prep_bulk_exp (req, npages, 
653                                      BULK_GET_SINK, OST_BULK_PORTAL);
654         if (desc == NULL)
655                 GOTO(out_local, rc = -ENOMEM);
656
657         do_gettimeofday(&start);
658         rc = obd_preprw(OBD_BRW_WRITE, req->rq_export, &body->oa, objcount,
659                         ioo, npages, pp_rnb, local_nb, oti);
660         ost_stime_record(req, &start, 1, 0);
661         if (rc != 0)
662                 GOTO(out_bulk, rc);
663
664         /* NB Having prepped, we must commit... */
665
666         for (i = 0; i < npages; i++)
667                 ptlrpc_prep_bulk_page(desc, local_nb[i].page, 
668                                       pp_rnb[i].offset & (PAGE_SIZE - 1),
669                                       pp_rnb[i].len);
670
671         rc = ptlrpc_start_bulk_transfer (desc);
672         if (rc == 0) {
673                 lwi = LWI_TIMEOUT(obd_timeout * HZ / 4,
674                                   ost_bulk_timeout, desc);
675                 rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), 
676                                   &lwi);
677                 LASSERT(rc == 0 || rc == -ETIMEDOUT);
678                 if (rc == -ETIMEDOUT) {
679                         DEBUG_REQ(D_ERROR, req, "timeout on bulk GET");
680                         ptlrpc_abort_bulk(desc);
681                 } else if (!desc->bd_success ||
682                            desc->bd_nob_transferred != desc->bd_nob) {
683                         DEBUG_REQ(D_ERROR, req, "%s bulk GET %d(%d)",
684                                   desc->bd_success ? 
685                                   "truncated" : "network error on",
686                                   desc->bd_nob_transferred, desc->bd_nob);
687                         /* XXX should this be a different errno? */
688                         rc = -ETIMEDOUT;
689                 }
690         } else {
691                 DEBUG_REQ(D_ERROR, req, "ptlrpc_bulk_get failed: rc %d\n", rc);
692         }
693         comms_error = rc != 0;
694
695         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
696         memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
697
698 #if CHECKSUM_BULK
699         if (rc == 0 && (body->oa.o_valid & OBD_MD_FLCKSUM) != 0) {
700                 static int cksum_counter;
701                 obd_count client_cksum = body->oa.o_cksum;
702                 obd_count cksum = ost_checksum_bulk(desc);
703
704                 if (client_cksum != cksum) {
705                         CERROR("Bad checksum: client %x, server %x id %s\n",
706                                client_cksum, cksum,
707                                req->rq_peerstr);
708                         cksum_counter = 1;
709                         repbody->oa.o_cksum = cksum;
710                 } else {
711                         cksum_counter++;
712                         if ((cksum_counter & (-cksum_counter)) == cksum_counter)
713                                 CWARN("Checksum %u from NID %s: %x OK\n",         
714                                       cksum_counter, req->rq_peerstr, cksum);
715                 }
716         }
717 #endif
718         ost_stime_record(req, &start, 1, 1);
719         /* Must commit after prep above in all cases */
720         rc = obd_commitrw(OBD_BRW_WRITE, req->rq_export, &repbody->oa,
721                           objcount, ioo, npages, local_nb, oti, rc);
722
723         ost_stime_record(req, &start, 1, 2);
724         if (rc == 0) {
725                 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
726                 memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
727
728 #if CHECKSUM_BULK
729                 repbody->oa.o_cksum = ost_checksum_bulk(desc);
730                 repbody->oa.o_valid |= OBD_MD_FLCKSUM;
731 #endif
732                 /* set per-requested niobuf return codes */
733                 for (i = j = 0; i < niocount; i++) {
734                         int nob = remote_nb[i].len;
735
736                         rcs[i] = 0;
737                         do {
738                                 LASSERT(j < npages);
739                                 if (local_nb[j].rc < 0)
740                                         rcs[i] = local_nb[j].rc;
741                                 nob -= pp_rnb[j].len;
742                                 j++;
743                         } while (nob > 0);
744                         LASSERT(nob == 0);
745                 }
746                 LASSERT(j == npages);
747         }
748         /*XXX This write extents only for write-back cache extents*/
749         rc = obd_write_extents(req->rq_export, ioo, objcount, niocount, 
750                                local_nb, rc);
751  out_bulk:
752         ptlrpc_free_bulk(desc);
753  out_local:
754         OBD_FREE(local_nb, sizeof(*local_nb) * npages);
755  out_pp_rnb:
756         free_per_page_niobufs(npages, pp_rnb, remote_nb);
757  out:
758         if (rc == 0) {
759                 oti_to_request(oti, req);
760                 rc = ptlrpc_reply(req);
761         } else if (!comms_error) {
762                 /* Only reply if there was no comms problem with bulk */
763                 req->rq_status = rc;
764                 ptlrpc_error(req);
765         } else {
766                 if (req->rq_reply_state != NULL) {
767                         /* reply out callback would free */
768                         lustre_free_reply_state (req->rq_reply_state);
769                 }
770                 if (req->rq_reqmsg->conn_cnt == req->rq_export->exp_conn_cnt) {
771                         CERROR("%s: bulk IO comm error evicting %s@%s id %s\n",
772                                req->rq_export->exp_obd->obd_name,
773                                req->rq_export->exp_client_uuid.uuid,
774                                req->rq_export->exp_connection->c_remote_uuid.uuid,
775                                req->rq_peerstr);
776                         ptlrpc_fail_export(req->rq_export);
777                 } else {
778                         CERROR("ignoring bulk IO comms error: "
779                                "client reconnected %s@%s id %s\n",
780                                req->rq_export->exp_client_uuid.uuid,
781                                req->rq_export->exp_connection->c_remote_uuid.uuid,
782                                req->rq_peerstr);
783                 }
784         }
785         RETURN(rc);
786 }
787 EXPORT_SYMBOL(ost_brw_write);
788
789 static int ost_san_brw(struct ptlrpc_request *req, int cmd)
790 {
791         struct niobuf_remote *remote_nb, *res_nb, *pp_rnb;
792         struct obd_ioobj *ioo;
793         struct ost_body *body, *repbody;
794         int rc, i, objcount, niocount, size[2] = {sizeof(*body)}, npages;
795         int swab;
796         ENTRY;
797
798         /* XXX not set to use latest protocol */
799
800         swab = lustre_msg_swabbed(req->rq_reqmsg);
801         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
802         if (body == NULL) {
803                 CERROR("Missing/short ost_body\n");
804                 GOTO(out, rc = -EFAULT);
805         }
806
807         ioo = lustre_swab_reqbuf(req, 1, sizeof(*ioo), lustre_swab_obd_ioobj);
808         if (ioo == NULL) {
809                 CERROR("Missing/short ioobj\n");
810                 GOTO(out, rc = -EFAULT);
811         }
812         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
813         niocount = ioo[0].ioo_bufcnt;
814         for (i = 1; i < objcount; i++) {
815                 if (swab)
816                         lustre_swab_obd_ioobj (&ioo[i]);
817                 niocount += ioo[i].ioo_bufcnt;
818         }
819
820         remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof(*remote_nb),
821                                        lustre_swab_niobuf_remote);
822         if (remote_nb == NULL) {
823                 CERROR("Missing/short niobuf\n");
824                 GOTO(out, rc = -EFAULT);
825         }
826         if (swab) {                             /* swab the remaining niobufs */
827                 for (i = 1; i < niocount; i++)
828                         lustre_swab_niobuf_remote (&remote_nb[i]);
829         }
830
831         /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
832         npages = get_per_page_niobufs(ioo, objcount,remote_nb,niocount,&pp_rnb);
833         if (npages < 0)
834                 GOTO (out, rc = npages);
835  
836         size[1] = npages * sizeof(*pp_rnb);
837         rc = lustre_pack_reply(req, 2, size, NULL);
838         if (rc)
839                 GOTO(out_pp_rnb, rc);
840
841         req->rq_status = obd_san_preprw(cmd, req->rq_export, &body->oa,
842                                         objcount, ioo, npages, pp_rnb);
843
844         if (req->rq_status)
845                 GOTO(out_pp_rnb, rc = 0);
846
847         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
848         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
849
850         res_nb = lustre_msg_buf(req->rq_repmsg, 1, size[1]);
851         memcpy(res_nb, remote_nb, size[1]);
852         rc = 0;
853 out_pp_rnb:
854         free_per_page_niobufs(npages, pp_rnb, remote_nb);
855 out:
856         if (rc) {
857                 req->rq_status = rc;
858                 ptlrpc_error(req);
859         } else
860                 ptlrpc_reply(req);
861
862         return rc;
863 }
864
865 static int ost_set_info(struct obd_export *exp, struct ptlrpc_request *req)
866 {
867         char *key, *val;
868         int keylen, rc = 0;
869         ENTRY;
870
871         key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
872         if (key == NULL) {
873                 DEBUG_REQ(D_HA, req, "no set_info key");
874                 RETURN(-EFAULT);
875         }
876         keylen = req->rq_reqmsg->buflens[0];
877
878         rc = lustre_pack_reply(req, 0, NULL, NULL);
879         if (rc)
880                 RETURN(rc);
881
882         val = lustre_msg_buf(req->rq_reqmsg, 1, 0);
883
884         rc = obd_set_info(exp, keylen, key, req->rq_reqmsg->buflens[1], val);
885         req->rq_repmsg->status = 0;
886         RETURN(rc);
887 }
888
889 static int ost_get_info(struct obd_export *exp, struct ptlrpc_request *req)
890 {
891         char *key;
892         int keylen, rc = 0, size = sizeof(obd_id);
893         obd_id *reply;
894         ENTRY;
895
896         key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
897         if (key == NULL) {
898                 DEBUG_REQ(D_HA, req, "no get_info key");
899                 RETURN(-EFAULT);
900         }
901         keylen = req->rq_reqmsg->buflens[0];
902
903         if (keylen < strlen("last_id") || memcmp(key, "last_id", 7) != 0)
904                 RETURN(-EPROTO);
905
906         rc = lustre_pack_reply(req, 1, &size, NULL);
907         if (rc)
908                 RETURN(rc);
909
910         reply = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*reply));
911         rc = obd_get_info(exp, keylen, key, &size, reply);
912         req->rq_repmsg->status = 0;
913         RETURN(rc);
914 }
915
916 static int ost_llog_handle_connect(struct obd_export *exp,
917                                    struct ptlrpc_request *req)
918 {
919         struct llogd_conn_body *body;
920         int rc;
921         ENTRY;
922
923         body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
924         rc = obd_llog_connect(exp, body);
925         RETURN(rc);
926 }
927
928 static int ost_filter_recovery_request(struct ptlrpc_request *req,
929                                        struct obd_device *obd, int *process)
930 {
931         switch (req->rq_reqmsg->opc) {
932         case OST_CONNECT: /* This will never get here, but for completeness. */
933         case OST_DISCONNECT:
934                *process = 1;
935                RETURN(0);
936
937         case OBD_PING:
938         case OST_CREATE:
939         case OST_DESTROY:
940         case OST_PUNCH:
941         case OST_SETATTR:
942         case OST_SYNC:
943         case OST_WRITE:
944         case OBD_LOG_CANCEL:
945         case LDLM_ENQUEUE:
946                 *process = target_queue_recovery_request(req, obd);
947                 RETURN(0);
948
949         default:
950                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
951                 *process = 0;
952                 /* XXX what should we set rq_status to here? */
953                 req->rq_status = -EAGAIN;
954                 RETURN(ptlrpc_error(req));
955         }
956 }
957
958 int ost_msg_check_version(struct lustre_msg *msg)
959 {
960         int rc;
961
962         switch(msg->opc) {
963         case OST_CONNECT:
964         case OST_DISCONNECT:
965         case OBD_PING:
966         case OST_CREATE:
967         case OST_DESTROY:
968         case OST_GETATTR:
969         case OST_SETATTR:
970         case OST_WRITE:
971         case OST_READ:
972         case OST_SAN_READ:
973         case OST_SAN_WRITE:
974         case OST_PUNCH:
975         case OST_STATFS:
976         case OST_SYNC:
977         case OST_SET_INFO:
978         case OST_GET_INFO:
979                 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
980                 if (rc)
981                         CERROR("bad opc %u version %08x, expecting %08x\n",
982                                msg->opc, msg->version, LUSTRE_OBD_VERSION);
983                 break;
984         case LDLM_ENQUEUE:
985         case LDLM_CONVERT:
986         case LDLM_CANCEL:
987         case LDLM_BL_CALLBACK:
988         case LDLM_CP_CALLBACK:
989                 rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
990                 if (rc)
991                         CERROR("bad opc %u version %08x, expecting %08x\n",
992                                msg->opc, msg->version, LUSTRE_DLM_VERSION);
993                 break;
994         case OBD_LOG_CANCEL:
995         case LLOG_ORIGIN_CONNECT:
996                 rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
997                 if (rc)
998                         CERROR("bad opc %u version %08x, expecting %08x\n",
999                                msg->opc, msg->version, LUSTRE_LOG_VERSION);
1000                 break;
1001         case SEC_INIT:
1002         case SEC_INIT_CONTINUE:
1003         case SEC_FINI:
1004                 rc = 0;
1005                 break;
1006         default:
1007                 CERROR("OST unexpected opcode %d\n", msg->opc);
1008                 rc = -ENOTSUPP;
1009                 break;
1010         }
1011         return rc;
1012 }
1013
1014 int ost_handle(struct ptlrpc_request *req)
1015 {
1016         int should_process, fail = OBD_FAIL_OST_ALL_REPLY_NET, rc = 0;
1017         struct obd_trans_info *oti = NULL;
1018         struct obd_device *obd = NULL;
1019         ENTRY;
1020
1021         LASSERT(current->journal_info == NULL);
1022
1023         rc = ost_msg_check_version(req->rq_reqmsg);
1024         if (rc) {
1025                 CERROR("OST drop mal-formed request\n");
1026                 RETURN(rc);
1027         }
1028
1029         /* Security opc should NOT trigger any recovery events */
1030         if (req->rq_reqmsg->opc == SEC_INIT ||
1031             req->rq_reqmsg->opc == SEC_INIT_CONTINUE ||
1032             req->rq_reqmsg->opc == SEC_FINI) {
1033                 RETURN(0);
1034         }
1035
1036         /* XXX identical to MDS */
1037         if (req->rq_reqmsg->opc != OST_CONNECT) {
1038                 int recovering;
1039
1040                 if (req->rq_export == NULL) {
1041                         CDEBUG(D_HA,"operation %d on unconnected OST from %s\n",
1042                                req->rq_reqmsg->opc,
1043                                req->rq_peerstr);
1044                         req->rq_status = -ENOTCONN;
1045                         GOTO(out_check_req, rc = -ENOTCONN);
1046                 }
1047
1048                 obd = req->rq_export->exp_obd;
1049
1050                 /* Check for aborted recovery. */
1051                 spin_lock_bh(&obd->obd_processing_task_lock);
1052                 recovering = obd->obd_recovering;
1053                 spin_unlock_bh(&obd->obd_processing_task_lock);
1054                 if (recovering) {
1055                         rc = ost_filter_recovery_request(req, obd,
1056                                                          &should_process);
1057                         if (rc || !should_process)
1058                                 RETURN(rc);
1059                         if (should_process < 0) {
1060                                 req->rq_status = should_process;
1061                                 rc = ptlrpc_error(req);
1062                                 RETURN(rc);
1063                         }
1064                 }
1065         }
1066
1067         OBD_ALLOC(oti, sizeof(*oti));
1068         if (oti == NULL)
1069                 RETURN(-ENOMEM);
1070                 
1071         oti_init(oti, req);
1072
1073         switch (req->rq_reqmsg->opc) {
1074         case OST_CONNECT: {
1075                 CDEBUG(D_INODE, "connect\n");
1076                 OBD_FAIL_GOTO(OBD_FAIL_OST_CONNECT_NET, out_free_oti, rc = 0);
1077                 rc = target_handle_connect(req);
1078                 if (!rc)
1079                         obd = req->rq_export->exp_obd;
1080                 break;
1081         }
1082         case OST_DISCONNECT:
1083                 CDEBUG(D_INODE, "disconnect\n");
1084                 OBD_FAIL_GOTO(OBD_FAIL_OST_DISCONNECT_NET, out_free_oti, rc = 0);
1085                 rc = target_handle_disconnect(req);
1086                 break;
1087         case OST_CREATE:
1088                 CDEBUG(D_INODE, "create\n");
1089                 OBD_FAIL_GOTO(OBD_FAIL_OST_ENOSPC, out_check_req, rc = -ENOSPC);
1090                 OBD_FAIL_GOTO(OBD_FAIL_OST_EROFS, out_check_req, rc = -EROFS);
1091                 OBD_FAIL_GOTO(OBD_FAIL_OST_CREATE_NET, out_free_oti, rc = 0);
1092                 rc = ost_create(req->rq_export, req, oti);
1093                 break;
1094         case OST_DESTROY:
1095                 CDEBUG(D_INODE, "destroy\n");
1096                 OBD_FAIL_GOTO(OBD_FAIL_OST_DESTROY_NET, out_free_oti, rc = 0);
1097                 OBD_FAIL_GOTO(OBD_FAIL_OST_EROFS, out_check_req, rc = -EROFS);
1098                 rc = ost_destroy(req->rq_export, req, oti);
1099                 break;
1100         case OST_GETATTR:
1101                 CDEBUG(D_INODE, "getattr\n");
1102                 OBD_FAIL_GOTO(OBD_FAIL_OST_GETATTR_NET, out_free_oti, rc = 0);
1103                 rc = ost_getattr(req->rq_export, req);
1104                 break;
1105         case OST_SETATTR:
1106                 CDEBUG(D_INODE, "setattr\n");
1107                 OBD_FAIL_GOTO(OBD_FAIL_OST_SETATTR_NET, out_free_oti, rc = 0);
1108                 rc = ost_setattr(req->rq_export, req, oti);
1109                 break;
1110         case OST_WRITE:
1111                 CDEBUG(D_INODE, "write\n");
1112                 OBD_FAIL_GOTO(OBD_FAIL_OST_BRW_NET, out_free_oti, rc = 0);
1113                 OBD_FAIL_GOTO(OBD_FAIL_OST_ENOSPC, out_check_req, rc = -ENOSPC);
1114                 OBD_FAIL_GOTO(OBD_FAIL_OST_EROFS, out_check_req, rc = -EROFS);
1115                 rc = ost_brw_write(req, oti);
1116                 LASSERT(current->journal_info == NULL);
1117                 /* ost_brw sends its own replies */
1118                 GOTO(out_free_oti, rc);
1119         case OST_READ:
1120                 CDEBUG(D_INODE, "read\n");
1121                 OBD_FAIL_GOTO(OBD_FAIL_OST_BRW_NET, out_free_oti, rc = 0);
1122                 rc = ost_brw_read(req);
1123                 LASSERT(current->journal_info == NULL);
1124                 /* ost_brw sends its own replies */
1125                 GOTO(out_free_oti, rc);
1126         case OST_SAN_READ:
1127                 CDEBUG(D_INODE, "san read\n");
1128                 OBD_FAIL_GOTO(OBD_FAIL_OST_BRW_NET, out_free_oti, rc = 0);
1129                 rc = ost_san_brw(req, OBD_BRW_READ);
1130                 /* ost_san_brw sends its own replies */
1131                 GOTO(out_free_oti, rc);
1132         case OST_SAN_WRITE:
1133                 CDEBUG(D_INODE, "san write\n");
1134                 OBD_FAIL_GOTO(OBD_FAIL_OST_BRW_NET, out_free_oti, rc = 0);
1135                 rc = ost_san_brw(req, OBD_BRW_WRITE);
1136                 /* ost_san_brw sends its own replies */
1137                 GOTO(out_free_oti, rc);
1138         case OST_PUNCH:
1139                 CDEBUG(D_INODE, "punch\n");
1140                 OBD_FAIL_GOTO(OBD_FAIL_OST_PUNCH_NET, out_free_oti, rc = 0);
1141                 OBD_FAIL_GOTO(OBD_FAIL_OST_EROFS, out_check_req, rc = -EROFS);
1142                 rc = ost_punch(req->rq_export, req, oti);
1143                 break;
1144         case OST_STATFS:
1145                 CDEBUG(D_INODE, "statfs\n");
1146                 OBD_FAIL_GOTO(OBD_FAIL_OST_STATFS_NET, out_free_oti, rc = 0);
1147                 rc = ost_statfs(req);
1148                 break;
1149         case OST_SYNC:
1150                 CDEBUG(D_INODE, "sync\n");
1151                 OBD_FAIL_GOTO(OBD_FAIL_OST_SYNC_NET, out_free_oti, rc = 0);
1152                 rc = ost_sync(req->rq_export, req);
1153                 break;
1154         case OST_SET_INFO:
1155                 DEBUG_REQ(D_INODE, req, "set_info");
1156                 rc = ost_set_info(req->rq_export, req);
1157                 break;
1158         case OST_GET_INFO:
1159                 DEBUG_REQ(D_INODE, req, "get_info");
1160                 rc = ost_get_info(req->rq_export, req);
1161                 break;
1162         case OBD_PING:
1163                 DEBUG_REQ(D_INODE, req, "ping");
1164                 rc = target_handle_ping(req);
1165                 break;
1166         /* FIXME - just reply status */
1167         case LLOG_ORIGIN_CONNECT:
1168                 DEBUG_REQ(D_INODE, req, "log connect\n");
1169                 rc = ost_llog_handle_connect(req->rq_export, req); 
1170                 req->rq_status = rc;
1171                 rc = lustre_pack_reply(req, 0, NULL, NULL);
1172                 if (rc)
1173                         GOTO(out_free_oti, rc);
1174                 GOTO(out_free_oti, rc = ptlrpc_reply(req));
1175         case OBD_LOG_CANCEL:
1176                 CDEBUG(D_INODE, "log cancel\n");
1177                 OBD_FAIL_GOTO(OBD_FAIL_OBD_LOG_CANCEL_NET, out_free_oti, rc = 0);
1178                 rc = llog_origin_handle_cancel(req);
1179                 req->rq_status = rc;
1180                 rc = lustre_pack_reply(req, 0, NULL, NULL);
1181                 if (rc)
1182                         GOTO(out_free_oti, rc);
1183                 GOTO(out_free_oti, rc = ptlrpc_reply(req));
1184         case LDLM_ENQUEUE:
1185                 CDEBUG(D_INODE, "enqueue\n");
1186                 OBD_FAIL_GOTO(OBD_FAIL_LDLM_ENQUEUE, out_free_oti, rc = 0);
1187                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
1188                                          ldlm_server_blocking_ast,
1189                                          ldlm_server_glimpse_ast);
1190                 fail = OBD_FAIL_OST_LDLM_REPLY_NET;
1191                 break;
1192         case LDLM_CONVERT:
1193                 CDEBUG(D_INODE, "convert\n");
1194                 OBD_FAIL_GOTO(OBD_FAIL_LDLM_CONVERT, out_free_oti, rc = 0);
1195                 rc = ldlm_handle_convert(req);
1196                 break;
1197         case LDLM_CANCEL:
1198                 CDEBUG(D_INODE, "cancel\n");
1199                 OBD_FAIL_GOTO(OBD_FAIL_LDLM_CANCEL, out_free_oti, rc = 0);
1200                 rc = ldlm_handle_cancel(req);
1201                 break;
1202         case LDLM_BL_CALLBACK:
1203         case LDLM_CP_CALLBACK:
1204                 CDEBUG(D_INODE, "callback\n");
1205                 CERROR("callbacks should not happen on OST\n");
1206                 /* fall through */
1207         default:
1208                 CERROR("Unexpected opcode %d\n", req->rq_reqmsg->opc);
1209                 req->rq_status = -ENOTSUPP;
1210                 rc = ptlrpc_error(req);
1211                 GOTO(out_free_oti, rc);
1212         }
1213
1214         LASSERT(current->journal_info == NULL);
1215
1216         EXIT;
1217         /* If we're DISCONNECTing, the export_data is already freed */
1218         if (!rc && req->rq_reqmsg->opc != OST_DISCONNECT) {
1219                 if (!obd->obd_no_transno) {
1220                         req->rq_repmsg->last_committed =
1221                                 obd->obd_last_committed;
1222                 } else {
1223                         DEBUG_REQ(D_IOCTL, req,
1224                                   "not sending last_committed update");
1225                 }
1226                 CDEBUG(D_INFO, "last_committed "LPU64", xid "LPX64"\n",
1227                        obd->obd_last_committed, req->rq_xid);
1228         }
1229
1230 out_check_req:
1231         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
1232                 if (obd && obd->obd_recovering) {
1233                         DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
1234                         rc = target_queue_final_reply(req, rc);
1235                         GOTO(out_free_oti, rc);
1236                 }
1237                 /* Lost a race with recovery; let the error path DTRT. */
1238                 rc = req->rq_status = -ENOTCONN;
1239         }
1240
1241         if (!rc)
1242                 oti_to_request(oti, req);
1243         target_send_reply(req, rc, fail);
1244         rc = 0;
1245         
1246 out_free_oti:
1247         if (oti)
1248                 OBD_FREE(oti, sizeof(*oti));
1249         return rc;
1250 }
1251 EXPORT_SYMBOL(ost_handle);
1252
1253 int ost_attach(struct obd_device *dev, obd_count len, void *data)
1254 {
1255         struct lprocfs_static_vars lvars;
1256
1257         lprocfs_init_vars(ost,&lvars);
1258         return lprocfs_obd_attach(dev, lvars.obd_vars);
1259 }
1260
/*
 * obd detach method: counterpart of ost_attach().
 * Returns the result of lprocfs_obd_detach().
 */
int ost_detach(struct obd_device *dev)
{
        int rc = lprocfs_obd_detach(dev);

        return rc;
}
1265
1266 extern struct file_operations ost_stimes_fops;
1267
1268 static int ost_setup(struct obd_device *obd, obd_count len, void *buf)
1269 {
1270         struct ost_obd *ost = &obd->u.ost;
1271         int rc;
1272         ENTRY;
1273
1274         rc = cleanup_group_info();
1275         if (rc)
1276                 RETURN(rc);
1277
1278         rc = llog_start_commit_thread();
1279         if (rc < 0)
1280                 RETURN(rc);
1281
1282         lprocfs_obd_seq_create(obd, "service_times", 0444, &ost_stimes_fops,
1283                                obd);
1284
1285         ost->ost_service =
1286                 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
1287                                 OST_REQUEST_PORTAL, OSC_REPLY_PORTAL, 30000,
1288                                 ost_handle, "ost",
1289                                 obd->obd_proc_entry);
1290         if (ost->ost_service == NULL) {
1291                 CERROR("failed to start service\n");
1292                 RETURN(-ENOMEM);
1293         }
1294
1295         rc = ptlrpc_start_n_threads(obd, ost->ost_service, OST_NUM_THREADS,
1296                                     "ll_ost");
1297         if (rc)
1298                 GOTO(out_service, rc = -EINVAL);
1299
1300         ost->ost_create_service =
1301                 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
1302                                 OST_CREATE_PORTAL, OSC_REPLY_PORTAL, 30000,
1303                                 ost_handle, "ost_create",
1304                                 obd->obd_proc_entry);
1305         if (ost->ost_create_service == NULL) {
1306                 CERROR("failed to start OST create service\n");
1307                 GOTO(out_service, rc = -ENOMEM);
1308         }
1309
1310
1311         spin_lock_init(&ost->ost_lock);
1312         ost->ost_service->srv_obddev = obd;
1313         
1314         rc = ptlrpc_start_n_threads(obd, ost->ost_create_service, 1,
1315                                     "ll_ost_creat");
1316         if (rc)
1317                 GOTO(out_create, rc = -EINVAL);
1318
1319         RETURN(0);
1320
1321 out_create:
1322         ptlrpc_unregister_service(ost->ost_create_service);
1323 out_service:
1324         ptlrpc_unregister_service(ost->ost_service);
1325         RETURN(rc);
1326 }
1327
1328 static int ost_cleanup(struct obd_device *obd, int flags)
1329 {
1330         struct ost_obd *ost = &obd->u.ost;
1331         int err = 0;
1332         ENTRY;
1333
1334         spin_lock_bh(&obd->obd_processing_task_lock);
1335         if (obd->obd_recovering) {
1336                 target_cancel_recovery_timer(obd);
1337                 obd->obd_recovering = 0;
1338         }
1339         spin_unlock_bh(&obd->obd_processing_task_lock);
1340
1341         ptlrpc_stop_all_threads(ost->ost_service);
1342         ptlrpc_unregister_service(ost->ost_service);
1343
1344         ptlrpc_stop_all_threads(ost->ost_create_service);
1345         ptlrpc_unregister_service(ost->ost_create_service);
1346
1347         RETURN(err);
1348 }
1349
1350 /* use obd ops to offer management infrastructure */
1351 static struct obd_ops ost_obd_ops = {
1352         .o_owner        = THIS_MODULE,
1353         .o_attach       = ost_attach,
1354         .o_detach       = ost_detach,
1355         .o_setup        = ost_setup,
1356         .o_cleanup      = ost_cleanup,
1357 };
1358
1359 static int __init ost_init(void)
1360 {
1361         struct lprocfs_static_vars lvars;
1362         ENTRY;
1363
1364         lprocfs_init_vars(ost,&lvars);
1365         RETURN(class_register_type(&ost_obd_ops, NULL, lvars.module_vars,
1366                                    LUSTRE_OST_NAME));
1367 }
1368
1369 static void /*__exit*/ ost_exit(void)
1370 {
1371         class_unregister_type(LUSTRE_OST_NAME);
1372 }
1373
1374 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1375 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
1376 MODULE_LICENSE("GPL");
1377
1378 module_init(ost_init);
1379 module_exit(ost_exit);