Whamcloud - gitweb
- cleanups in cmobd and others:
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author: Peter J. Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  *
23  *  Storage Target Handling functions
24  *  Lustre Object Server Module (OST)
25  *
26  *  This server is single threaded at present (but can easily be multi
27  *  threaded). For testing and management it is treated as an
28  *  obd_device, although it does not export a full OBD method table
29  *  (the requests are coming in over the wire, so object target
30  *  modules do not have a full method table.)
31  */
32
33 #ifndef EXPORT_SYMTAB
34 # define EXPORT_SYMTAB
35 #endif
36 #define DEBUG_SUBSYSTEM S_OST
37
38 #include <linux/module.h>
39 #include <linux/obd_ost.h>
40 #include <linux/lustre_net.h>
41 #include <linux/lustre_dlm.h>
42 #include <linux/lustre_export.h>
43 #include <linux/init.h>
44 #include <linux/lprocfs_status.h>
45 #include <linux/lustre_commit_confd.h>
46 #include <libcfs/list.h>
47 #include <linux/lustre_sec.h>
48
49 void oti_init(struct obd_trans_info *oti, struct ptlrpc_request *req)
50 {
51         if (oti == NULL)
52                 return;
53         memset(oti, 0, sizeof *oti);
54
55         if (req->rq_repmsg && req->rq_reqmsg != 0)
56                 oti->oti_transno = req->rq_repmsg->transno;
57 }
58
59 void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
60 {
61         struct oti_req_ack_lock *ack_lock;
62         int i;
63
64         if (oti == NULL)
65                 return;
66
67         if (req->rq_repmsg)
68                 req->rq_repmsg->transno = oti->oti_transno;
69
70         /* XXX 4 == entries in oti_ack_locks??? */
71         for (ack_lock = oti->oti_ack_locks, i = 0; i < 4; i++, ack_lock++) {
72                 if (!ack_lock->mode)
73                         break;
74                 /* XXX not even calling target_send_reply in some cases... */
75                 ptlrpc_save_lock (req, &ack_lock->lock, ack_lock->mode);
76         }
77 }
78
79 static int ost_destroy(struct obd_export *exp, struct ptlrpc_request *req, 
80                        struct obd_trans_info *oti)
81 {
82         struct ost_body *body, *repbody;
83         int rc, size = sizeof(*body);
84         ENTRY;
85
86         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
87         if (body == NULL)
88                 RETURN(-EFAULT);
89
90         rc = lustre_pack_reply(req, 1, &size, NULL);
91         if (rc)
92                 RETURN(rc);
93
94         if (body->oa.o_valid & OBD_MD_FLCOOKIE)
95                 oti->oti_logcookies = obdo_logcookie(&body->oa);
96         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
97         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
98         req->rq_status = obd_destroy(exp, &body->oa, NULL, oti);
99         RETURN(0);
100 }
101
102 static int ost_getattr(struct obd_export *exp, struct ptlrpc_request *req)
103 {
104         struct ost_body *body, *repbody;
105         int rc, size = sizeof(*body);
106         ENTRY;
107
108         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
109         if (body == NULL)
110                 RETURN(-EFAULT);
111
112         rc = lustre_pack_reply(req, 1, &size, NULL);
113         if (rc)
114                 RETURN(rc);
115
116         repbody = lustre_msg_buf (req->rq_repmsg, 0, sizeof(*repbody));
117         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
118         req->rq_status = obd_getattr(exp, &repbody->oa, NULL);
119         RETURN(0);
120 }
121
122 static int ost_statfs(struct ptlrpc_request *req)
123 {
124         struct obd_statfs *osfs;
125         int rc, size = sizeof(*osfs);
126         ENTRY;
127
128         rc = lustre_pack_reply(req, 1, &size, NULL);
129         if (rc)
130                 RETURN(rc);
131
132         osfs = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*osfs));
133
134         req->rq_status = obd_statfs(req->rq_export->exp_obd, osfs, jiffies-HZ);
135         if (req->rq_status != 0)
136                 CERROR("ost: statfs failed: rc %d\n", req->rq_status);
137
138         RETURN(0);
139 }
140
141 static int ost_create(struct obd_export *exp, struct ptlrpc_request *req,
142                       struct obd_trans_info *oti)
143 {
144         struct ost_body *body, *repbody;
145         int rc, size = sizeof(*repbody);
146         ENTRY;
147
148         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
149         if (body == NULL)
150                 RETURN(-EFAULT);
151
152         rc = lustre_pack_reply(req, 1, &size, NULL);
153         if (rc)
154                 RETURN(rc);
155
156         repbody = lustre_msg_buf (req->rq_repmsg, 0, sizeof(*repbody));
157         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
158         oti->oti_logcookies = obdo_logcookie(&repbody->oa);
159         req->rq_status = obd_create(exp, &repbody->oa, NULL, 0, NULL, oti);
160         //obd_log_cancel(conn, NULL, 1, oti->oti_logcookies, 0);
161         RETURN(0);
162 }
163
164 static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req, 
165                      struct obd_trans_info *oti)
166 {
167         struct ost_body *body, *repbody;
168         int rc, size = sizeof(*repbody);
169         ENTRY;
170
171         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
172         if (body == NULL)
173                 RETURN(-EFAULT);
174
175         if ((body->oa.o_valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) !=
176             (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
177                 RETURN(-EINVAL);
178
179         rc = lustre_pack_reply(req, 1, &size, NULL);
180         if (rc)
181                 RETURN(rc);
182
183         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
184         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
185         req->rq_status = obd_punch(exp, &repbody->oa, NULL, repbody->oa.o_size,
186                                    repbody->oa.o_blocks, oti);
187         RETURN(0);
188 }
189
190 static int ost_sync(struct obd_export *exp, struct ptlrpc_request *req)
191 {
192         struct ost_body *body, *repbody;
193         int rc, size = sizeof(*repbody);
194         ENTRY;
195
196         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
197         if (body == NULL)
198                 RETURN(-EFAULT);
199
200         rc = lustre_pack_reply(req, 1, &size, NULL);
201         if (rc)
202                 RETURN(rc);
203
204         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
205         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
206         req->rq_status = obd_sync(exp, &repbody->oa, NULL, repbody->oa.o_size,
207                                   repbody->oa.o_blocks);
208         RETURN(0);
209 }
210
211 static int ost_setattr(struct obd_export *exp, struct ptlrpc_request *req, 
212                        struct obd_trans_info *oti)
213 {
214         struct ost_body *body, *repbody;
215         int rc, size = sizeof(*repbody);
216         ENTRY;
217
218         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
219         if (body == NULL)
220                 RETURN(-EFAULT);
221
222         rc = lustre_pack_reply(req, 1, &size, NULL);
223         if (rc)
224                 RETURN(rc);
225
226         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
227         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
228
229         req->rq_status = obd_setattr(exp, &repbody->oa, NULL, oti);
230         RETURN(0);
231 }
232
233 static int ost_bulk_timeout(void *data)
234 {
235         ENTRY;
236         /* We don't fail the connection here, because having the export
237          * killed makes the (vital) call to commitrw very sad.
238          */
239         RETURN(1);
240 }
241
242 static int get_per_page_niobufs(struct obd_ioobj *ioo, int nioo,
243                                 struct niobuf_remote *rnb, int nrnb,
244                                 struct niobuf_remote **pp_rnbp)
245 {
246         /* Copy a remote niobuf, splitting it into page-sized chunks
247          * and setting ioo[i].ioo_bufcnt accordingly */
248         struct niobuf_remote *pp_rnb;
249         int   i;
250         int   j;
251         int   page;
252         int   rnbidx = 0;
253         int   npages = 0;
254
255         /* first count and check the number of pages required */
256         for (i = 0; i < nioo; i++)
257                 for (j = 0; j < ioo->ioo_bufcnt; j++, rnbidx++) {
258                         obd_off offset = rnb[rnbidx].offset;
259                         obd_off p0 = offset >> PAGE_SHIFT;
260                         obd_off pn = (offset + rnb[rnbidx].len - 1)>>PAGE_SHIFT;
261
262                         LASSERT(rnbidx < nrnb);
263
264                         npages += (pn + 1 - p0);
265
266                         if (rnb[rnbidx].len == 0) {
267                                 CERROR("zero len BRW: obj %d objid "LPX64
268                                        " buf %u\n", i, ioo[i].ioo_id, j);
269                                 return -EINVAL;
270                         }
271                         if (j > 0 &&
272                             rnb[rnbidx].offset <= rnb[rnbidx-1].offset) {
273                                 CERROR("unordered BRW: obj %d objid "LPX64
274                                        " buf %u offset "LPX64" <= "LPX64"\n",
275                                        i, ioo[i].ioo_id, j, rnb[rnbidx].offset,
276                                        rnb[rnbidx].offset);
277                                 return -EINVAL;
278                         }
279                 }
280
281         LASSERT(rnbidx == nrnb);
282
283         if (npages == nrnb) {       /* all niobufs are for single pages */
284                 *pp_rnbp = rnb;
285                 return npages;
286         }
287
288         OBD_ALLOC(pp_rnb, sizeof(*pp_rnb) * npages);
289         if (pp_rnb == NULL)
290                 return -ENOMEM;
291
292         /* now do the actual split */
293         page = rnbidx = 0;
294         for (i = 0; i < nioo; i++) {
295                 int  obj_pages = 0;
296
297                 for (j = 0; j < ioo[i].ioo_bufcnt; j++, rnbidx++) {
298                         obd_off off = rnb[rnbidx].offset;
299                         int     nob = rnb[rnbidx].len;
300
301                         LASSERT(rnbidx < nrnb);
302                         do {
303                                 obd_off  poff = off & (PAGE_SIZE - 1);
304                                 int      pnob = (poff + nob > PAGE_SIZE) ?
305                                                 PAGE_SIZE - poff : nob;
306
307                                 LASSERT(page < npages);
308                                 pp_rnb[page].len = pnob;
309                                 pp_rnb[page].offset = off;
310                                 pp_rnb[page].flags = rnb[rnbidx].flags;
311
312                                 CDEBUG(0, "   obj %d id "LPX64
313                                        "page %d(%d) "LPX64" for %d, flg %x\n",
314                                        i, ioo[i].ioo_id, obj_pages, page,
315                                        pp_rnb[page].offset, pp_rnb[page].len,
316                                        pp_rnb[page].flags);
317                                 page++;
318                                 obj_pages++;
319
320                                 off += pnob;
321                                 nob -= pnob;
322                         } while (nob > 0);
323                         LASSERT(nob == 0);
324                 }
325                 ioo[i].ioo_bufcnt = obj_pages;
326         }
327         LASSERT(page == npages);
328
329         *pp_rnbp = pp_rnb;
330         return npages;
331 }
332
333 static void free_per_page_niobufs (int npages, struct niobuf_remote *pp_rnb,
334                                    struct niobuf_remote *rnb)
335 {
336         if (pp_rnb == rnb)                      /* didn't allocate above */
337                 return;
338
339         OBD_FREE(pp_rnb, sizeof(*pp_rnb) * npages);
340 }
341
342 #if CHECKSUM_BULK
343 obd_count ost_checksum_bulk(struct ptlrpc_bulk_desc *desc)
344 {
345         obd_count cksum = 0;
346         struct ptlrpc_bulk_page *bp;
347
348         list_for_each_entry(bp, &desc->bd_page_list, bp_link) {
349                 ost_checksum(&cksum, kmap(bp->bp_page) + bp->bp_pageoffset,
350                              bp->bp_buflen);
351                 kunmap(bp->bp_page);
352         }
353
354         return cksum;
355 }
356 #endif
357
358 static void ost_stime_record(struct ptlrpc_request *req, struct timeval *start,
359                              unsigned rw, unsigned phase)
360 {
361         struct obd_device *obd = req->rq_svc->srv_obddev;
362         struct timeval stop;
363         int ind = rw *3 + phase;
364          
365         if (obd && obd->obd_type && obd->obd_type->typ_name) {
366                 if (!strcmp(obd->obd_type->typ_name, OBD_OST_DEVICENAME)) {
367                         struct ost_obd *ost = NULL;
368                         
369                         ost = &obd->u.ost;
370                         if (ind >= (sizeof(ost->ost_stimes) / 
371                                     sizeof(ost->ost_stimes[0])))
372                                return;
373                         do_gettimeofday(&stop);
374
375                         spin_lock(&ost->ost_lock);
376                         lprocfs_stime_record(&ost->ost_stimes[ind],&stop,start);
377                         spin_unlock(&ost->ost_lock);
378                         memcpy(start, &stop, sizeof(*start));
379                 }
380        } 
381 }
382
383 static int ost_brw_read(struct ptlrpc_request *req)
384 {
385         struct ptlrpc_bulk_desc *desc;
386         struct niobuf_remote    *remote_nb;
387         struct niobuf_remote    *pp_rnb;
388         struct niobuf_local     *local_nb;
389         struct obd_ioobj        *ioo;
390         struct ost_body         *body, *repbody;
391         struct l_wait_info       lwi;
392         struct obd_trans_info    oti = { 0 };
393         int                      size[1] = { sizeof(*body) };
394         int                      comms_error = 0;
395         int                      niocount;
396         int                      npages;
397         int                      nob = 0;
398         int                      rc;
399         int                      i;
400         struct timeval           start;
401         ENTRY;
402
403         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
404                 GOTO(out, rc = -EIO);
405
406         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK | OBD_FAIL_ONCE,
407                          (obd_timeout + 1) / 4);
408
409         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
410         if (body == NULL) {
411                 CERROR("Missing/short ost_body\n");
412                 GOTO(out, rc = -EFAULT);
413         }
414
415         ioo = lustre_swab_reqbuf(req, 1, sizeof(*ioo), lustre_swab_obd_ioobj);
416         if (ioo == NULL) {
417                 CERROR("Missing/short ioobj\n");
418                 GOTO(out, rc = -EFAULT);
419         }
420
421         niocount = ioo->ioo_bufcnt;
422         remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof(*remote_nb),
423                                        lustre_swab_niobuf_remote);
424         if (remote_nb == NULL) {
425                 CERROR("Missing/short niobuf\n");
426                 GOTO(out, rc = -EFAULT);
427         }
428         if (lustre_msg_swabbed(req->rq_reqmsg)) { /* swab remaining niobufs */
429                 for (i = 1; i < niocount; i++)
430                         lustre_swab_niobuf_remote (&remote_nb[i]);
431         }
432
433         rc = lustre_pack_reply(req, 1, size, NULL);
434         if (rc)
435                 GOTO(out, rc);
436
437         /* FIXME all niobuf splitting should be done in obdfilter if needed */
438         /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
439         npages = get_per_page_niobufs(ioo, 1, remote_nb, niocount, &pp_rnb);
440         if (npages < 0)
441                 GOTO(out, rc = npages);
442
443         OBD_ALLOC(local_nb, sizeof(*local_nb) * npages);
444         if (local_nb == NULL)
445                 GOTO(out_pp_rnb, rc = -ENOMEM);
446
447         desc = ptlrpc_prep_bulk_exp (req, npages, 
448                                      BULK_PUT_SOURCE, OST_BULK_PORTAL);
449         if (desc == NULL)
450                 GOTO(out_local, rc = -ENOMEM);
451
452         do_gettimeofday(&start);
453         rc = obd_preprw(OBD_BRW_READ, req->rq_export, &body->oa, 1,
454                         ioo, npages, pp_rnb, local_nb, &oti);
455         ost_stime_record(req, &start, 0, 0);
456         if (rc != 0)
457                 GOTO(out_bulk, rc);
458
459         /* We're finishing using body->oa as an input variable */
460         body->oa.o_valid = 0;
461
462         nob = 0;
463         for (i = 0; i < npages; i++) {
464                 int page_rc = local_nb[i].rc;
465
466                 if (page_rc < 0) {              /* error */
467                         rc = page_rc;
468                         break;
469                 }
470
471                 LASSERT(page_rc <= pp_rnb[i].len);
472                 nob += page_rc;
473                 if (page_rc != 0) {             /* some data! */
474                         LASSERT (local_nb[i].page != NULL);
475                         ptlrpc_prep_bulk_page(desc, local_nb[i].page,
476                                               pp_rnb[i].offset & (PAGE_SIZE-1),
477                                               page_rc);
478                 }
479
480                 if (page_rc != pp_rnb[i].len) { /* short read */
481                         /* All subsequent pages should be 0 */
482                         while(++i < npages)
483                                 LASSERT(local_nb[i].rc == 0);
484                         break;
485                 }
486         }
487
488         if (rc == 0) {
489                 rc = ptlrpc_start_bulk_transfer(desc);
490                 if (rc == 0) {
491                         lwi = LWI_TIMEOUT(obd_timeout * HZ / 4,
492                                           ost_bulk_timeout, desc);
493                         rc = l_wait_event(desc->bd_waitq,
494                                           !ptlrpc_bulk_active(desc), &lwi);
495                         LASSERT(rc == 0 || rc == -ETIMEDOUT);
496                         if (rc == -ETIMEDOUT) {
497                                 DEBUG_REQ(D_ERROR, req, "timeout on bulk PUT");
498                                 ptlrpc_abort_bulk(desc);
499                         } else if (!desc->bd_success ||
500                                    desc->bd_nob_transferred != desc->bd_nob) {
501                                 DEBUG_REQ(D_ERROR, req, "%s bulk PUT %d(%d)",
502                                           desc->bd_success ?
503                                           "truncated" : "network error on",
504                                           desc->bd_nob_transferred,
505                                           desc->bd_nob);
506                                 /* XXX should this be a different errno? */
507                                 rc = -ETIMEDOUT;
508                         }
509                 } else {
510                         DEBUG_REQ(D_ERROR, req, "bulk PUT failed: rc %d\n", rc);
511                 }
512                 comms_error = rc != 0;
513         }
514
515         ost_stime_record(req, &start, 0, 1);
516         /* Must commit after prep above in all cases */
517         rc = obd_commitrw(OBD_BRW_READ, req->rq_export, &body->oa, 1,
518                           ioo, npages, local_nb, &oti, rc);
519         ost_stime_record(req, &start, 0, 2);
520
521         if (rc == 0) {
522                 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
523                 memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
524
525 #if CHECKSUM_BULK
526                 repbody->oa.o_cksum = ost_checksum_bulk(desc);
527                 repbody->oa.o_valid |= OBD_MD_FLCKSUM;
528 #endif
529         }
530
531  out_bulk:
532         ptlrpc_free_bulk(desc);
533  out_local:
534         OBD_FREE(local_nb, sizeof(*local_nb) * npages);
535  out_pp_rnb:
536         free_per_page_niobufs(npages, pp_rnb, remote_nb);
537  out:
538         LASSERT(rc <= 0);
539         if (rc == 0) {
540                 req->rq_status = nob;
541                 ptlrpc_reply(req);
542         } else if (!comms_error) {
543                 /* only reply if comms OK */
544                 req->rq_status = rc;
545                 ptlrpc_error(req);
546         } else {
547                 if (req->rq_reply_state != NULL) {
548                         /* reply out callback would free */
549                         lustre_free_reply_state (req->rq_reply_state);
550                 }
551                 if (req->rq_reqmsg->conn_cnt == req->rq_export->exp_conn_cnt) {
552                         CERROR("bulk IO comms error: "
553                                "evicting %s@%s id %s\n",
554                                req->rq_export->exp_client_uuid.uuid,
555                                req->rq_export->exp_connection->c_remote_uuid.uuid,
556                                req->rq_peerstr);
557                         ptlrpc_fail_export(req->rq_export);
558                 } else {
559                         CERROR("ignoring bulk IO comms error: "
560                                "client reconnected %s@%s id %s\n",  
561                                req->rq_export->exp_client_uuid.uuid,
562                                req->rq_export->exp_connection->c_remote_uuid.uuid,
563                                req->rq_peerstr);
564                 }
565         }
566
567         RETURN(rc);
568 }
569
570 int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
571 {
572         struct ptlrpc_bulk_desc *desc;
573         struct niobuf_remote    *remote_nb;
574         struct niobuf_remote    *pp_rnb;
575         struct niobuf_local     *local_nb;
576         struct obd_ioobj        *ioo;
577         struct ost_body         *body, *repbody;
578         struct l_wait_info       lwi;
579         __u32                   *rcs;
580         int                      size[2] = { sizeof(*body) };
581         int                      objcount, niocount, npages;
582         int                      comms_error = 0;
583         int                      rc, swab, i, j;
584         struct timeval           start;        
585         ENTRY;
586
587         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
588                 GOTO(out, rc = -EIO);
589
590         /* pause before transaction has been started */
591         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK | OBD_FAIL_ONCE,
592                          (obd_timeout + 1) / 4);
593
594         swab = lustre_msg_swabbed(req->rq_reqmsg);
595         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
596         if (body == NULL) {
597                 CERROR("Missing/short ost_body\n");
598                 GOTO(out, rc = -EFAULT);
599         }
600
601         LASSERT_REQSWAB(req, 1);
602         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
603         if (objcount == 0) {
604                 CERROR("Missing/short ioobj\n");
605                 GOTO(out, rc = -EFAULT);
606         }
607         ioo = lustre_msg_buf (req->rq_reqmsg, 1, objcount * sizeof(*ioo));
608         LASSERT (ioo != NULL);
609         for (niocount = i = 0; i < objcount; i++) {
610                 if (swab)
611                         lustre_swab_obd_ioobj (&ioo[i]);
612                 if (ioo[i].ioo_bufcnt == 0) {
613                         CERROR("ioo[%d] has zero bufcnt\n", i);
614                         GOTO(out, rc = -EFAULT);
615                 }
616                 niocount += ioo[i].ioo_bufcnt;
617         }
618
619         remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof(*remote_nb),
620                                        lustre_swab_niobuf_remote);
621         if (remote_nb == NULL) {
622                 CERROR("Missing/short niobuf\n");
623                 GOTO(out, rc = -EFAULT);
624         }
625         if (swab) {                             /* swab the remaining niobufs */
626                 for (i = 1; i < niocount; i++)
627                         lustre_swab_niobuf_remote (&remote_nb[i]);
628         }
629
630         size[1] = niocount * sizeof(*rcs);
631         rc = lustre_pack_reply(req, 2, size, NULL);
632         if (rc != 0)
633                 GOTO(out, rc);
634         rcs = lustre_msg_buf(req->rq_repmsg, 1, niocount * sizeof(*rcs));
635
636 #if 0
637         /* Do snap options here*/
638         rc = obd_do_cow(req->rq_export, ioo, objcount, remote_nb);
639         if (rc)
640                 GOTO(out, rc);
641 #endif
642
643         /* FIXME all niobuf splitting should be done in obdfilter if needed */
644         /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
645         npages = get_per_page_niobufs(ioo, objcount,remote_nb,niocount,&pp_rnb);
646         if (npages < 0)
647                 GOTO(out, rc = npages);
648
649         OBD_ALLOC(local_nb, sizeof(*local_nb) * npages);
650         if (local_nb == NULL)
651                 GOTO(out_pp_rnb, rc = -ENOMEM);
652
653         desc = ptlrpc_prep_bulk_exp (req, npages, 
654                                      BULK_GET_SINK, OST_BULK_PORTAL);
655         if (desc == NULL)
656                 GOTO(out_local, rc = -ENOMEM);
657
658         do_gettimeofday(&start);
659         rc = obd_preprw(OBD_BRW_WRITE, req->rq_export, &body->oa, objcount,
660                         ioo, npages, pp_rnb, local_nb, oti);
661         ost_stime_record(req, &start, 1, 0);
662         if (rc != 0)
663                 GOTO(out_bulk, rc);
664
665         /* NB Having prepped, we must commit... */
666
667         for (i = 0; i < npages; i++)
668                 ptlrpc_prep_bulk_page(desc, local_nb[i].page, 
669                                       pp_rnb[i].offset & (PAGE_SIZE - 1),
670                                       pp_rnb[i].len);
671
672         rc = ptlrpc_start_bulk_transfer (desc);
673         if (rc == 0) {
674                 lwi = LWI_TIMEOUT(obd_timeout * HZ / 4,
675                                   ost_bulk_timeout, desc);
676                 rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), 
677                                   &lwi);
678                 LASSERT(rc == 0 || rc == -ETIMEDOUT);
679                 if (rc == -ETIMEDOUT) {
680                         DEBUG_REQ(D_ERROR, req, "timeout on bulk GET");
681                         ptlrpc_abort_bulk(desc);
682                 } else if (!desc->bd_success ||
683                            desc->bd_nob_transferred != desc->bd_nob) {
684                         DEBUG_REQ(D_ERROR, req, "%s bulk GET %d(%d)",
685                                   desc->bd_success ? 
686                                   "truncated" : "network error on",
687                                   desc->bd_nob_transferred, desc->bd_nob);
688                         /* XXX should this be a different errno? */
689                         rc = -ETIMEDOUT;
690                 }
691         } else {
692                 DEBUG_REQ(D_ERROR, req, "ptlrpc_bulk_get failed: rc %d\n", rc);
693         }
694         comms_error = rc != 0;
695
696         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
697         memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
698
699 #if CHECKSUM_BULK
700         if (rc == 0 && (body->oa.o_valid & OBD_MD_FLCKSUM) != 0) {
701                 static int cksum_counter;
702                 obd_count client_cksum = body->oa.o_cksum;
703                 obd_count cksum = ost_checksum_bulk(desc);
704
705                 if (client_cksum != cksum) {
706                         CERROR("Bad checksum: client %x, server %x id %s\n",
707                                client_cksum, cksum,
708                                req->rq_peerstr);
709                         cksum_counter = 1;
710                         repbody->oa.o_cksum = cksum;
711                 } else {
712                         cksum_counter++;
713                         if ((cksum_counter & (-cksum_counter)) == cksum_counter)
714                                 CWARN("Checksum %u from NID %s: %x OK\n",         
715                                       cksum_counter, req->rq_peerstr, cksum);
716                 }
717         }
718 #endif
719         ost_stime_record(req, &start, 1, 1);
720         /* Must commit after prep above in all cases */
721         rc = obd_commitrw(OBD_BRW_WRITE, req->rq_export, &repbody->oa,
722                           objcount, ioo, npages, local_nb, oti, rc);
723
724         ost_stime_record(req, &start, 1, 2);
725         if (rc == 0) {
726                 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
727                 memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
728
729 #if CHECKSUM_BULK
730                 repbody->oa.o_cksum = ost_checksum_bulk(desc);
731                 repbody->oa.o_valid |= OBD_MD_FLCKSUM;
732 #endif
733                 /* set per-requested niobuf return codes */
734                 for (i = j = 0; i < niocount; i++) {
735                         int nob = remote_nb[i].len;
736
737                         rcs[i] = 0;
738                         do {
739                                 LASSERT(j < npages);
740                                 if (local_nb[j].rc < 0)
741                                         rcs[i] = local_nb[j].rc;
742                                 nob -= pp_rnb[j].len;
743                                 j++;
744                         } while (nob > 0);
745                         LASSERT(nob == 0);
746                 }
747                 LASSERT(j == npages);
748         }
749         /*XXX This write extents only for write-back cache extents*/
750         rc = obd_write_extents(req->rq_export, ioo, objcount, niocount, 
751                                local_nb, rc);
752  out_bulk:
753         ptlrpc_free_bulk(desc);
754  out_local:
755         OBD_FREE(local_nb, sizeof(*local_nb) * npages);
756  out_pp_rnb:
757         free_per_page_niobufs(npages, pp_rnb, remote_nb);
758  out:
759         if (rc == 0) {
760                 oti_to_request(oti, req);
761                 rc = ptlrpc_reply(req);
762         } else if (!comms_error) {
763                 /* Only reply if there was no comms problem with bulk */
764                 req->rq_status = rc;
765                 ptlrpc_error(req);
766         } else {
767                 if (req->rq_reply_state != NULL) {
768                         /* reply out callback would free */
769                         lustre_free_reply_state (req->rq_reply_state);
770                 }
771                 if (req->rq_reqmsg->conn_cnt == req->rq_export->exp_conn_cnt) {
772                         CERROR("%s: bulk IO comm error evicting %s@%s id %s\n",
773                                req->rq_export->exp_obd->obd_name,
774                                req->rq_export->exp_client_uuid.uuid,
775                                req->rq_export->exp_connection->c_remote_uuid.uuid,
776                                req->rq_peerstr);
777                         ptlrpc_fail_export(req->rq_export);
778                 } else {
779                         CERROR("ignoring bulk IO comms error: "
780                                "client reconnected %s@%s id %s\n",
781                                req->rq_export->exp_client_uuid.uuid,
782                                req->rq_export->exp_connection->c_remote_uuid.uuid,
783                                req->rq_peerstr);
784                 }
785         }
786         RETURN(rc);
787 }
788 EXPORT_SYMBOL(ost_brw_write);
789
790 static int ost_san_brw(struct ptlrpc_request *req, int cmd)
791 {
792         struct niobuf_remote *remote_nb, *res_nb, *pp_rnb;
793         struct obd_ioobj *ioo;
794         struct ost_body *body, *repbody;
795         int rc, i, objcount, niocount, size[2] = {sizeof(*body)}, npages;
796         int swab;
797         ENTRY;
798
799         /* XXX not set to use latest protocol */
800
801         swab = lustre_msg_swabbed(req->rq_reqmsg);
802         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
803         if (body == NULL) {
804                 CERROR("Missing/short ost_body\n");
805                 GOTO(out, rc = -EFAULT);
806         }
807
808         ioo = lustre_swab_reqbuf(req, 1, sizeof(*ioo), lustre_swab_obd_ioobj);
809         if (ioo == NULL) {
810                 CERROR("Missing/short ioobj\n");
811                 GOTO(out, rc = -EFAULT);
812         }
813         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
814         niocount = ioo[0].ioo_bufcnt;
815         for (i = 1; i < objcount; i++) {
816                 if (swab)
817                         lustre_swab_obd_ioobj (&ioo[i]);
818                 niocount += ioo[i].ioo_bufcnt;
819         }
820
821         remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof(*remote_nb),
822                                        lustre_swab_niobuf_remote);
823         if (remote_nb == NULL) {
824                 CERROR("Missing/short niobuf\n");
825                 GOTO(out, rc = -EFAULT);
826         }
827         if (swab) {                             /* swab the remaining niobufs */
828                 for (i = 1; i < niocount; i++)
829                         lustre_swab_niobuf_remote (&remote_nb[i]);
830         }
831
832         /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
833         npages = get_per_page_niobufs(ioo, objcount,remote_nb,niocount,&pp_rnb);
834         if (npages < 0)
835                 GOTO (out, rc = npages);
836  
837         size[1] = npages * sizeof(*pp_rnb);
838         rc = lustre_pack_reply(req, 2, size, NULL);
839         if (rc)
840                 GOTO(out_pp_rnb, rc);
841
842         req->rq_status = obd_san_preprw(cmd, req->rq_export, &body->oa,
843                                         objcount, ioo, npages, pp_rnb);
844
845         if (req->rq_status)
846                 GOTO(out_pp_rnb, rc = 0);
847
848         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
849         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
850
851         res_nb = lustre_msg_buf(req->rq_repmsg, 1, size[1]);
852         memcpy(res_nb, remote_nb, size[1]);
853         rc = 0;
854 out_pp_rnb:
855         free_per_page_niobufs(npages, pp_rnb, remote_nb);
856 out:
857         if (rc) {
858                 req->rq_status = rc;
859                 ptlrpc_error(req);
860         } else
861                 ptlrpc_reply(req);
862
863         return rc;
864 }
865
866 static int ost_set_info(struct obd_export *exp, struct ptlrpc_request *req)
867 {
868         char *key, *val;
869         int keylen, rc = 0;
870         ENTRY;
871
872         key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
873         if (key == NULL) {
874                 DEBUG_REQ(D_HA, req, "no set_info key");
875                 RETURN(-EFAULT);
876         }
877         keylen = req->rq_reqmsg->buflens[0];
878
879         rc = lustre_pack_reply(req, 0, NULL, NULL);
880         if (rc)
881                 RETURN(rc);
882
883         val = lustre_msg_buf(req->rq_reqmsg, 1, 0);
884
885         rc = obd_set_info(exp, keylen, key, req->rq_reqmsg->buflens[1], val);
886         req->rq_repmsg->status = 0;
887         RETURN(rc);
888 }
889
890 static int ost_get_info(struct obd_export *exp, struct ptlrpc_request *req)
891 {
892         char *key;
893         int keylen, rc = 0, size = sizeof(obd_id);
894         obd_id *reply;
895         ENTRY;
896
897         key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
898         if (key == NULL) {
899                 DEBUG_REQ(D_HA, req, "no get_info key");
900                 RETURN(-EFAULT);
901         }
902         keylen = req->rq_reqmsg->buflens[0];
903
904         if (keylen < strlen("last_id") || memcmp(key, "last_id", 7) != 0)
905                 RETURN(-EPROTO);
906
907         rc = lustre_pack_reply(req, 1, &size, NULL);
908         if (rc)
909                 RETURN(rc);
910
911         reply = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*reply));
912         rc = obd_get_info(exp, keylen, key, (__u32 *)&size, reply);
913         req->rq_repmsg->status = 0;
914         RETURN(rc);
915 }
916
917 static int ost_llog_handle_connect(struct obd_export *exp,
918                                    struct ptlrpc_request *req)
919 {
920         struct llogd_conn_body *body;
921         int rc;
922         ENTRY;
923
924         body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
925         rc = obd_llog_connect(exp, body);
926         RETURN(rc);
927 }
928
929 static int ost_filter_recovery_request(struct ptlrpc_request *req,
930                                        struct obd_device *obd, int *process)
931 {
932         switch (req->rq_reqmsg->opc) {
933         case OST_CONNECT: /* This will never get here, but for completeness. */
934         case OST_DISCONNECT:
935                *process = 1;
936                RETURN(0);
937
938         case OBD_PING:
939         case OST_CREATE:
940         case OST_DESTROY:
941         case OST_PUNCH:
942         case OST_SETATTR:
943         case OST_SYNC:
944         case OST_WRITE:
945         case OBD_LOG_CANCEL:
946         case LDLM_ENQUEUE:
947                 *process = target_queue_recovery_request(req, obd);
948                 RETURN(0);
949
950         default:
951                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
952                 *process = 0;
953                 /* XXX what should we set rq_status to here? */
954                 req->rq_status = -EAGAIN;
955                 RETURN(ptlrpc_error(req));
956         }
957 }
958
959 int ost_msg_check_version(struct lustre_msg *msg)
960 {
961         int rc;
962
963         switch(msg->opc) {
964         case OST_CONNECT:
965         case OST_DISCONNECT:
966         case OBD_PING:
967         case OST_CREATE:
968         case OST_DESTROY:
969         case OST_GETATTR:
970         case OST_SETATTR:
971         case OST_WRITE:
972         case OST_READ:
973         case OST_SAN_READ:
974         case OST_SAN_WRITE:
975         case OST_PUNCH:
976         case OST_STATFS:
977         case OST_SYNC:
978         case OST_SET_INFO:
979         case OST_GET_INFO:
980                 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
981                 if (rc)
982                         CERROR("bad opc %u version %08x, expecting %08x\n",
983                                msg->opc, msg->version, LUSTRE_OBD_VERSION);
984                 break;
985         case LDLM_ENQUEUE:
986         case LDLM_CONVERT:
987         case LDLM_CANCEL:
988         case LDLM_BL_CALLBACK:
989         case LDLM_CP_CALLBACK:
990                 rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
991                 if (rc)
992                         CERROR("bad opc %u version %08x, expecting %08x\n",
993                                msg->opc, msg->version, LUSTRE_DLM_VERSION);
994                 break;
995         case OBD_LOG_CANCEL:
996         case LLOG_ORIGIN_CONNECT:
997                 rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
998                 if (rc)
999                         CERROR("bad opc %u version %08x, expecting %08x\n",
1000                                msg->opc, msg->version, LUSTRE_LOG_VERSION);
1001                 break;
1002         case SEC_INIT:
1003         case SEC_INIT_CONTINUE:
1004         case SEC_FINI:
1005                 rc = 0;
1006                 break;
1007         default:
1008                 CERROR("OST unexpected opcode %d\n", msg->opc);
1009                 rc = -ENOTSUPP;
1010                 break;
1011         }
1012         return rc;
1013 }
1014
1015 int ost_handle(struct ptlrpc_request *req)
1016 {
1017         int should_process, fail = OBD_FAIL_OST_ALL_REPLY_NET, rc = 0;
1018         struct obd_trans_info *oti = NULL;
1019         struct obd_device *obd = NULL;
1020         ENTRY;
1021
1022         LASSERT(current->journal_info == NULL);
1023
1024         rc = ost_msg_check_version(req->rq_reqmsg);
1025         if (rc) {
1026                 CERROR("OST drop mal-formed request\n");
1027                 RETURN(rc);
1028         }
1029
1030         /* Security opc should NOT trigger any recovery events */
1031         if (req->rq_reqmsg->opc == SEC_INIT ||
1032             req->rq_reqmsg->opc == SEC_INIT_CONTINUE ||
1033             req->rq_reqmsg->opc == SEC_FINI) {
1034                 GOTO(out_check_req, rc = 0);
1035         }
1036
1037         /* XXX identical to MDS */
1038         if (req->rq_reqmsg->opc != OST_CONNECT) {
1039                 int recovering;
1040
1041                 if (req->rq_export == NULL) {
1042                         CDEBUG(D_HA,"operation %d on unconnected OST from %s\n",
1043                                req->rq_reqmsg->opc,
1044                                req->rq_peerstr);
1045                         req->rq_status = -ENOTCONN;
1046                         GOTO(out_check_req, rc = -ENOTCONN);
1047                 }
1048
1049                 obd = req->rq_export->exp_obd;
1050
1051                 /* Check for aborted recovery. */
1052                 spin_lock_bh(&obd->obd_processing_task_lock);
1053                 recovering = obd->obd_recovering;
1054                 spin_unlock_bh(&obd->obd_processing_task_lock);
1055                 if (recovering) {
1056                         rc = ost_filter_recovery_request(req, obd,
1057                                                          &should_process);
1058                         if (rc || !should_process)
1059                                 RETURN(rc);
1060                         if (should_process < 0) {
1061                                 req->rq_status = should_process;
1062                                 rc = ptlrpc_error(req);
1063                                 RETURN(rc);
1064                         }
1065                 }
1066         }
1067
1068         OBD_ALLOC(oti, sizeof(*oti));
1069         if (oti == NULL)
1070                 RETURN(-ENOMEM);
1071                 
1072         oti_init(oti, req);
1073
1074         switch (req->rq_reqmsg->opc) {
1075         case OST_CONNECT: {
1076                 CDEBUG(D_INODE, "connect\n");
1077                 OBD_FAIL_GOTO(OBD_FAIL_OST_CONNECT_NET, out_free_oti, rc = 0);
1078                 rc = target_handle_connect(req);
1079                 if (!rc)
1080                         obd = req->rq_export->exp_obd;
1081                 break;
1082         }
1083         case OST_DISCONNECT:
1084                 CDEBUG(D_INODE, "disconnect\n");
1085                 OBD_FAIL_GOTO(OBD_FAIL_OST_DISCONNECT_NET, out_free_oti, rc = 0);
1086                 rc = target_handle_disconnect(req);
1087                 break;
1088         case OST_CREATE:
1089                 CDEBUG(D_INODE, "create\n");
1090                 OBD_FAIL_GOTO(OBD_FAIL_OST_ENOSPC, out_check_req, rc = -ENOSPC);
1091                 OBD_FAIL_GOTO(OBD_FAIL_OST_EROFS, out_check_req, rc = -EROFS);
1092                 OBD_FAIL_GOTO(OBD_FAIL_OST_CREATE_NET, out_free_oti, rc = 0);
1093                 rc = ost_create(req->rq_export, req, oti);
1094                 break;
1095         case OST_DESTROY:
1096                 CDEBUG(D_INODE, "destroy\n");
1097                 OBD_FAIL_GOTO(OBD_FAIL_OST_DESTROY_NET, out_free_oti, rc = 0);
1098                 OBD_FAIL_GOTO(OBD_FAIL_OST_EROFS, out_check_req, rc = -EROFS);
1099                 rc = ost_destroy(req->rq_export, req, oti);
1100                 break;
1101         case OST_GETATTR:
1102                 CDEBUG(D_INODE, "getattr\n");
1103                 OBD_FAIL_GOTO(OBD_FAIL_OST_GETATTR_NET, out_free_oti, rc = 0);
1104                 rc = ost_getattr(req->rq_export, req);
1105                 break;
1106         case OST_SETATTR:
1107                 CDEBUG(D_INODE, "setattr\n");
1108                 OBD_FAIL_GOTO(OBD_FAIL_OST_SETATTR_NET, out_free_oti, rc = 0);
1109                 rc = ost_setattr(req->rq_export, req, oti);
1110                 break;
1111         case OST_WRITE:
1112                 CDEBUG(D_INODE, "write\n");
1113                 OBD_FAIL_GOTO(OBD_FAIL_OST_BRW_NET, out_free_oti, rc = 0);
1114                 OBD_FAIL_GOTO(OBD_FAIL_OST_ENOSPC, out_check_req, rc = -ENOSPC);
1115                 OBD_FAIL_GOTO(OBD_FAIL_OST_EROFS, out_check_req, rc = -EROFS);
1116                 rc = ost_brw_write(req, oti);
1117                 LASSERT(current->journal_info == NULL);
1118                 /* ost_brw sends its own replies */
1119                 GOTO(out_free_oti, rc);
1120         case OST_READ:
1121                 CDEBUG(D_INODE, "read\n");
1122                 OBD_FAIL_GOTO(OBD_FAIL_OST_BRW_NET, out_free_oti, rc = 0);
1123                 rc = ost_brw_read(req);
1124                 LASSERT(current->journal_info == NULL);
1125                 /* ost_brw sends its own replies */
1126                 GOTO(out_free_oti, rc);
1127         case OST_SAN_READ:
1128                 CDEBUG(D_INODE, "san read\n");
1129                 OBD_FAIL_GOTO(OBD_FAIL_OST_BRW_NET, out_free_oti, rc = 0);
1130                 rc = ost_san_brw(req, OBD_BRW_READ);
1131                 /* ost_san_brw sends its own replies */
1132                 GOTO(out_free_oti, rc);
1133         case OST_SAN_WRITE:
1134                 CDEBUG(D_INODE, "san write\n");
1135                 OBD_FAIL_GOTO(OBD_FAIL_OST_BRW_NET, out_free_oti, rc = 0);
1136                 rc = ost_san_brw(req, OBD_BRW_WRITE);
1137                 /* ost_san_brw sends its own replies */
1138                 GOTO(out_free_oti, rc);
1139         case OST_PUNCH:
1140                 CDEBUG(D_INODE, "punch\n");
1141                 OBD_FAIL_GOTO(OBD_FAIL_OST_PUNCH_NET, out_free_oti, rc = 0);
1142                 OBD_FAIL_GOTO(OBD_FAIL_OST_EROFS, out_check_req, rc = -EROFS);
1143                 rc = ost_punch(req->rq_export, req, oti);
1144                 break;
1145         case OST_STATFS:
1146                 CDEBUG(D_INODE, "statfs\n");
1147                 OBD_FAIL_GOTO(OBD_FAIL_OST_STATFS_NET, out_free_oti, rc = 0);
1148                 rc = ost_statfs(req);
1149                 break;
1150         case OST_SYNC:
1151                 CDEBUG(D_INODE, "sync\n");
1152                 OBD_FAIL_GOTO(OBD_FAIL_OST_SYNC_NET, out_free_oti, rc = 0);
1153                 rc = ost_sync(req->rq_export, req);
1154                 break;
1155         case OST_SET_INFO:
1156                 DEBUG_REQ(D_INODE, req, "set_info");
1157                 rc = ost_set_info(req->rq_export, req);
1158                 break;
1159         case OST_GET_INFO:
1160                 DEBUG_REQ(D_INODE, req, "get_info");
1161                 rc = ost_get_info(req->rq_export, req);
1162                 break;
1163         case OBD_PING:
1164                 DEBUG_REQ(D_INODE, req, "ping");
1165                 rc = target_handle_ping(req);
1166                 break;
1167         /* FIXME - just reply status */
1168         case LLOG_ORIGIN_CONNECT:
1169                 DEBUG_REQ(D_INODE, req, "log connect\n");
1170                 rc = ost_llog_handle_connect(req->rq_export, req); 
1171                 req->rq_status = rc;
1172                 rc = lustre_pack_reply(req, 0, NULL, NULL);
1173                 if (rc)
1174                         GOTO(out_free_oti, rc);
1175                 GOTO(out_free_oti, rc = ptlrpc_reply(req));
1176         case OBD_LOG_CANCEL:
1177                 CDEBUG(D_INODE, "log cancel\n");
1178                 OBD_FAIL_GOTO(OBD_FAIL_OBD_LOG_CANCEL_NET, out_free_oti, rc = 0);
1179                 rc = llog_origin_handle_cancel(req);
1180                 req->rq_status = rc;
1181                 rc = lustre_pack_reply(req, 0, NULL, NULL);
1182                 if (rc)
1183                         GOTO(out_free_oti, rc);
1184                 GOTO(out_free_oti, rc = ptlrpc_reply(req));
1185         case LDLM_ENQUEUE:
1186                 CDEBUG(D_INODE, "enqueue\n");
1187                 OBD_FAIL_GOTO(OBD_FAIL_LDLM_ENQUEUE, out_free_oti, rc = 0);
1188                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
1189                                          ldlm_server_blocking_ast,
1190                                          ldlm_server_glimpse_ast);
1191                 fail = OBD_FAIL_OST_LDLM_REPLY_NET;
1192                 break;
1193         case LDLM_CONVERT:
1194                 CDEBUG(D_INODE, "convert\n");
1195                 OBD_FAIL_GOTO(OBD_FAIL_LDLM_CONVERT, out_free_oti, rc = 0);
1196                 rc = ldlm_handle_convert(req);
1197                 break;
1198         case LDLM_CANCEL:
1199                 CDEBUG(D_INODE, "cancel\n");
1200                 OBD_FAIL_GOTO(OBD_FAIL_LDLM_CANCEL, out_free_oti, rc = 0);
1201                 rc = ldlm_handle_cancel(req);
1202                 break;
1203         case LDLM_BL_CALLBACK:
1204         case LDLM_CP_CALLBACK:
1205                 CDEBUG(D_INODE, "callback\n");
1206                 CERROR("callbacks should not happen on OST\n");
1207                 /* fall through */
1208         default:
1209                 CERROR("Unexpected opcode %d\n", req->rq_reqmsg->opc);
1210                 req->rq_status = -ENOTSUPP;
1211                 rc = ptlrpc_error(req);
1212                 GOTO(out_free_oti, rc);
1213         }
1214
1215         LASSERT(current->journal_info == NULL);
1216
1217         EXIT;
1218         /* If we're DISCONNECTing, the export_data is already freed */
1219         if (!rc && req->rq_reqmsg->opc != OST_DISCONNECT) {
1220                 if (!obd->obd_no_transno) {
1221                         req->rq_repmsg->last_committed =
1222                                 obd->obd_last_committed;
1223                 } else {
1224                         DEBUG_REQ(D_IOCTL, req,
1225                                   "not sending last_committed update");
1226                 }
1227                 CDEBUG(D_INFO, "last_committed "LPU64", xid "LPX64"\n",
1228                        obd->obd_last_committed, req->rq_xid);
1229         }
1230
1231 out_check_req:
1232
1233         if (!rc)
1234                 oti_to_request(oti, req);
1235         target_send_reply(req, rc, fail);
1236         rc = 0;
1237         
1238 out_free_oti:
1239         if (oti)
1240                 OBD_FREE(oti, sizeof(*oti));
1241         return rc;
1242 }
1243 EXPORT_SYMBOL(ost_handle);
1244
1245 int ost_attach(struct obd_device *dev, obd_count len, void *data)
1246 {
1247         struct lprocfs_static_vars lvars;
1248
1249         lprocfs_init_vars(ost,&lvars);
1250         return lprocfs_obd_attach(dev, lvars.obd_vars);
1251 }
1252
1253 int ost_detach(struct obd_device *dev)
1254 {
1255         return lprocfs_obd_detach(dev);
1256 }
1257
1258 extern struct file_operations ost_stimes_fops;
1259
1260 static int ost_setup(struct obd_device *obd, obd_count len, void *buf)
1261 {
1262         struct ost_obd *ost = &obd->u.ost;
1263         int rc;
1264         ENTRY;
1265
1266         rc = cleanup_group_info();
1267         if (rc)
1268                 RETURN(rc);
1269
1270         rc = llog_start_commit_thread();
1271         if (rc < 0)
1272                 RETURN(rc);
1273
1274         lprocfs_obd_seq_create(obd, "service_times", 0444, &ost_stimes_fops,
1275                                obd);
1276
1277         ost->ost_service =
1278                 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
1279                                 OST_REQUEST_PORTAL, OSC_REPLY_PORTAL, 30000,
1280                                 ost_handle, "ost",
1281                                 obd->obd_proc_entry);
1282         if (ost->ost_service == NULL) {
1283                 CERROR("failed to start service\n");
1284                 RETURN(-ENOMEM);
1285         }
1286
1287         rc = ptlrpc_start_n_threads(obd, ost->ost_service, OST_NUM_THREADS,
1288                                     "ll_ost");
1289         if (rc)
1290                 GOTO(out_service, rc = -EINVAL);
1291
1292         ost->ost_create_service =
1293                 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
1294                                 OST_CREATE_PORTAL, OSC_REPLY_PORTAL, 30000,
1295                                 ost_handle, "ost_create",
1296                                 obd->obd_proc_entry);
1297         if (ost->ost_create_service == NULL) {
1298                 CERROR("failed to start OST create service\n");
1299                 GOTO(out_service, rc = -ENOMEM);
1300         }
1301
1302
1303         spin_lock_init(&ost->ost_lock);
1304         ost->ost_service->srv_obddev = obd;
1305         
1306         rc = ptlrpc_start_n_threads(obd, ost->ost_create_service, 1,
1307                                     "ll_ost_creat");
1308         if (rc)
1309                 GOTO(out_create, rc = -EINVAL);
1310
1311         RETURN(0);
1312
1313 out_create:
1314         ptlrpc_unregister_service(ost->ost_create_service);
1315 out_service:
1316         ptlrpc_unregister_service(ost->ost_service);
1317         RETURN(rc);
1318 }
1319
1320 extern void lgss_svc_cache_purge_all(void);
1321 static int ost_cleanup(struct obd_device *obd, int flags)
1322 {
1323         struct ost_obd *ost = &obd->u.ost;
1324         int err = 0;
1325         ENTRY;
1326
1327         spin_lock_bh(&obd->obd_processing_task_lock);
1328         if (obd->obd_recovering) {
1329                 target_cancel_recovery_timer(obd);
1330                 obd->obd_recovering = 0;
1331         }
1332         spin_unlock_bh(&obd->obd_processing_task_lock);
1333
1334         ptlrpc_stop_all_threads(ost->ost_service);
1335         ptlrpc_unregister_service(ost->ost_service);
1336
1337         ptlrpc_stop_all_threads(ost->ost_create_service);
1338         ptlrpc_unregister_service(ost->ost_create_service);
1339
1340 #ifdef ENABLE_GSS
1341         /* XXX */
1342         lgss_svc_cache_purge_all();
1343 #endif
1344         RETURN(err);
1345 }
1346
1347 /* use obd ops to offer management infrastructure */
1348 static struct obd_ops ost_obd_ops = {
1349         .o_owner        = THIS_MODULE,
1350         .o_attach       = ost_attach,
1351         .o_detach       = ost_detach,
1352         .o_setup        = ost_setup,
1353         .o_cleanup      = ost_cleanup,
1354 };
1355
1356 static int __init ost_init(void)
1357 {
1358         struct lprocfs_static_vars lvars;
1359         ENTRY;
1360
1361         lprocfs_init_vars(ost,&lvars);
1362         RETURN(class_register_type(&ost_obd_ops, NULL, lvars.module_vars,
1363                                    OBD_OST_DEVICENAME));
1364 }
1365
1366 static void /*__exit*/ ost_exit(void)
1367 {
1368         class_unregister_type(OBD_OST_DEVICENAME);
1369 }
1370
1371 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1372 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
1373 MODULE_LICENSE("GPL");
1374
1375 module_init(ost_init);
1376 module_exit(ost_exit);