Whamcloud - gitweb
9e469525cfc806efc617221b118c43cb027a53e9
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author Peter Braam <braam@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  *  For testing and management it is treated as an obd_device,
23  *  although it does not export a full OBD method table (the
24  *  requests are coming in over the wire, so object target modules
25  *  do not have a full method table.)
26  *
27  */
28
29 #define EXPORT_SYMTAB
30 #define DEBUG_SUBSYSTEM S_OSC
31
32 #ifdef __KERNEL__
33 #include <linux/version.h>
34 #include <linux/module.h>
35 #include <linux/mm.h>
36 #include <linux/highmem.h>
37 #include <linux/lustre_dlm.h>
38 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
39 #include <linux/workqueue.h>
40 #include <linux/smp_lock.h>
41 #else
42 #include <linux/locks.h>
43 #endif
44 #else
45 #include <liblustre.h>
46 #endif
47
48 #include <linux/kp30.h>
49 #include <linux/lustre_mds.h> /* for mds_objid */
50 #include <linux/obd_ost.h>
51
52 #ifndef  __CYGWIN__
53 #include <linux/ctype.h>
54 #include <linux/init.h>
55 #else
56 #include <ctype.h>
57 #endif
58
59 #include <linux/lustre_ha.h>
60 #include <linux/obd_support.h> /* for OBD_FAIL_CHECK */
61 #include <linux/lustre_lite.h> /* for ll_i2info */
62 #include <portals/lib-types.h> /* for PTL_MD_MAX_IOV */
63 #include <linux/lprocfs_status.h>
64
65 static int osc_attach(struct obd_device *dev, obd_count len, void *data)
66 {
67         struct lprocfs_static_vars lvars;
68
69         lprocfs_init_vars(&lvars);
70         return lprocfs_obd_attach(dev, lvars.obd_vars);
71 }
72
/* Detach hook: delegates to lprocfs_obd_detach() and returns its result. */
static int osc_detach(struct obd_device *dev)
{
        int rc = lprocfs_obd_detach(dev);

        return rc;
}
77
78 /* Pack OSC object metadata for disk storage (LE byte order). */
79 static int osc_packmd(struct lustre_handle *conn, struct lov_mds_md **lmmp,
80                       struct lov_stripe_md *lsm)
81 {
82         int lmm_size;
83         ENTRY;
84
85         lmm_size = sizeof(**lmmp);
86         if (!lmmp)
87                 RETURN(lmm_size);
88
89         if (*lmmp && !lsm) {
90                 OBD_FREE(*lmmp, lmm_size);
91                 *lmmp = NULL;
92                 RETURN(0);
93         }
94
95         if (!*lmmp) {
96                 OBD_ALLOC(*lmmp, lmm_size);
97                 if (!*lmmp)
98                         RETURN(-ENOMEM);
99         }
100
101         if (lsm) {
102                 LASSERT(lsm->lsm_object_id);
103                 (*lmmp)->lmm_object_id = cpu_to_le64 (lsm->lsm_object_id);
104         }
105
106         RETURN(lmm_size);
107 }
108
109 /* Unpack OSC object metadata from disk storage (LE byte order). */
110 static int osc_unpackmd(struct lustre_handle *conn, struct lov_stripe_md **lsmp,
111                         struct lov_mds_md *lmm, int lmm_bytes)
112 {
113         int lsm_size;
114         ENTRY;
115
116         if (lmm != NULL) {
117                 if (lmm_bytes < sizeof (*lmm)) {
118                         CERROR("lov_mds_md too small: %d, need %d\n",
119                                lmm_bytes, (int)sizeof(*lmm));
120                         RETURN (-EINVAL);
121                 }
122                 /* XXX LOV_MAGIC etc check? */
123
124                 if (lmm->lmm_object_id == cpu_to_le64 (0)) {
125                         CERROR ("lov_mds_md: zero lmm_object_id\n");
126                         RETURN (-EINVAL);
127                 }
128         }
129
130         lsm_size = sizeof(**lsmp);
131         if (!lsmp)
132                 RETURN(lsm_size);
133
134         if (*lsmp && !lmm) {
135                 OBD_FREE(*lsmp, lsm_size);
136                 *lsmp = NULL;
137                 RETURN(0);
138         }
139
140         if (!*lsmp) {
141                 OBD_ALLOC(*lsmp, lsm_size);
142                 if (!*lsmp)
143                         RETURN(-ENOMEM);
144         }
145
146         if (lmm) {
147                 /* XXX zero *lsmp? */
148                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
149                 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
150                 LASSERT((*lsmp)->lsm_object_id);
151         }
152
153         RETURN(lsm_size);
154 }
155
#warning "FIXME: make this be sent from OST"
/* Byte size this file reports as o_blksize in the obdos it returns. */
#define OSC_BRW_MAX_SIZE 65536
/* The smaller of PTL_MD_MAX_IOV and the number of PAGE_SIZE pages that
 * fit in OSC_BRW_MAX_SIZE bytes. */
#define OSC_BRW_MAX_IOV \
        min_t(int, PTL_MD_MAX_IOV, OSC_BRW_MAX_SIZE / PAGE_SIZE)
159
160 static int osc_getattr_interpret(struct ptlrpc_request *req,
161                                  struct osc_getattr_async_args *aa, int rc)
162 {
163         struct obdo     *oa = aa->aa_oa;
164         struct ost_body *body;
165         ENTRY;
166
167         if (rc != 0) {
168                 CERROR("failed: rc = %d\n", rc);
169                 RETURN (rc);
170         }
171
172         body = lustre_swab_repbuf (req, 0, sizeof (*body),
173                                    lustre_swab_ost_body);
174         if (body == NULL) {
175                 CERROR ("can't unpack ost_body\n");
176                 RETURN (-EPROTO);
177         }
178
179         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
180         memcpy(oa, &body->oa, sizeof(*oa));
181
182         /* This should really be sent by the OST */
183         oa->o_blksize = OSC_BRW_MAX_SIZE;
184         oa->o_valid |= OBD_MD_FLBLKSZ;
185
186         RETURN (0);
187 }
188
189 static int osc_getattr_async(struct lustre_handle *conn, struct obdo *oa,
190                              struct lov_stripe_md *md,
191                              struct ptlrpc_request_set *set)
192 {
193         struct ptlrpc_request *request;
194         struct ost_body *body;
195         int size = sizeof(*body);
196         struct osc_getattr_async_args *aa;
197         ENTRY;
198
199         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_GETATTR, 1,
200                                   &size, NULL);
201         if (!request)
202                 RETURN(-ENOMEM);
203
204         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
205         memcpy(&body->oa, oa, sizeof(*oa));
206
207         request->rq_replen = lustre_msg_size(1, &size);
208         request->rq_interpret_reply = osc_getattr_interpret;
209
210         LASSERT (sizeof (*aa) <= sizeof (request->rq_async_args));
211         aa = (struct osc_getattr_async_args *)&request->rq_async_args;
212         aa->aa_oa = oa;
213
214         ptlrpc_set_add_req (set, request);
215         RETURN (0);
216 }
217
218 static int osc_getattr(struct lustre_handle *conn, struct obdo *oa,
219                        struct lov_stripe_md *md)
220 {
221         struct ptlrpc_request *request;
222         struct ost_body *body;
223         int rc, size = sizeof(*body);
224         ENTRY;
225
226         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_GETATTR, 1,
227                                   &size, NULL);
228         if (!request)
229                 RETURN(-ENOMEM);
230
231         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
232         memcpy(&body->oa, oa, sizeof(*oa));
233
234         request->rq_replen = lustre_msg_size(1, &size);
235
236         rc = ptlrpc_queue_wait(request);
237         if (rc) {
238                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
239                 GOTO(out, rc);
240         }
241
242         body = lustre_swab_repbuf(request, 0, sizeof (*body),
243                                   lustre_swab_ost_body);
244         if (body == NULL) {
245                 CERROR ("can't unpack ost_body\n");
246                 GOTO (out, rc = -EPROTO);
247         }
248
249         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
250         memcpy(oa, &body->oa, sizeof(*oa));
251
252         /* This should really be sent by the OST */
253         oa->o_blksize = OSC_BRW_MAX_SIZE;
254         oa->o_valid |= OBD_MD_FLBLKSZ;
255
256         EXIT;
257  out:
258         ptlrpc_req_finished(request);
259         return rc;
260 }
261
262 /* The import lock must already be held. */
263 static inline void osc_update_body_handle(struct list_head *head,
264                                           struct lustre_handle *old,
265                                           struct lustre_handle *new, int op)
266 {
267         struct list_head *tmp;
268         struct ost_body *body;
269         struct ptlrpc_request *req;
270         struct ptlrpc_request *last_req = NULL; /* temporary fire escape */
271
272         list_for_each(tmp, head) {
273                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
274
275                 /* XXX ok to remove when bug 1303 resolved - rread 05/27/03  */
276                 LASSERT (req != last_req);
277                 last_req = req;
278
279                 if (req->rq_reqmsg->opc != op)
280                         continue;
281                 body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
282                 if (memcmp(obdo_handle(&body->oa), old, sizeof(*old)))
283                         continue;
284
285                 DEBUG_REQ(D_HA, req, "updating close body with new fh");
286                 memcpy(obdo_handle(&body->oa), new, sizeof(*new));
287         }
288 }
289
290 static void osc_replay_open(struct ptlrpc_request *req)
291 {
292         struct lustre_handle old;
293         struct ost_body *body;
294         struct obd_client_handle *och = req->rq_replay_data;
295         struct lustre_handle *oa_handle;
296         ENTRY;
297
298         body = lustre_swab_repbuf (req, 0, sizeof (*body),
299                                    lustre_swab_ost_body);
300         LASSERT (body != NULL);
301
302         oa_handle = obdo_handle(&body->oa);
303
304         memcpy(&old, &och->och_fh, sizeof(old));
305         CDEBUG(D_HA, "updating cookie from "LPD64" to "LPD64"\n",
306                och->och_fh.cookie, oa_handle->cookie);
307         memcpy(&och->och_fh, oa_handle, sizeof(och->och_fh));
308
309         /* A few frames up, ptlrpc_replay holds the lock, so this is safe. */
310         osc_update_body_handle(&req->rq_import->imp_sending_list, &old,
311                               &och->och_fh, OST_CLOSE);
312         osc_update_body_handle(&req->rq_import->imp_delayed_list, &old,
313                               &och->och_fh, OST_CLOSE);
314         EXIT;
315 }
316
317
318 static int osc_open(struct lustre_handle *conn, struct obdo *oa,
319                     struct lov_stripe_md *md, struct obd_trans_info *oti,
320                     struct obd_client_handle *och)
321 {
322         struct ptlrpc_request *request;
323         struct ost_body *body;
324         unsigned long flags;
325         int rc, size = sizeof(*body);
326         ENTRY;
327         LASSERT(och != NULL);
328
329         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_OPEN, 1, &size,
330                                   NULL);
331         if (!request)
332                 RETURN(-ENOMEM);
333
334         spin_lock_irqsave (&request->rq_lock, flags);
335         request->rq_replay = 1;
336         spin_unlock_irqrestore (&request->rq_lock, flags);
337
338         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
339         memcpy(&body->oa, oa, sizeof(*oa));
340
341         request->rq_replen = lustre_msg_size(1, &size);
342
343         rc = ptlrpc_queue_wait(request);
344         if (rc)
345                 GOTO(out, rc);
346
347         body = lustre_swab_repbuf (request, 0, sizeof (*body),
348                                    lustre_swab_ost_body);
349         if (body == NULL) {
350                 CERROR ("Can't unpack ost_body\n");
351                 GOTO (out, rc = -EPROTO);
352         }
353
354         memcpy(oa, &body->oa, sizeof(*oa));
355
356         /* If the open succeeded, we better have a handle */
357         /* BlueArc OSTs don't send back (o_valid | FLHANDLE).  sigh.
358          * Temporary workaround until fixed. -phil 24 Feb 03 */
359         // if ((oa->o_valid & OBD_MD_FLHANDLE) == 0) {
360         //         CERROR ("No file handle\n");
361         //         GOTO (out, rc = -EPROTO);
362         // }
363         oa->o_valid |= OBD_MD_FLHANDLE;
364
365         /* This should really be sent by the OST */
366         oa->o_blksize = OSC_BRW_MAX_SIZE;
367         oa->o_valid |= OBD_MD_FLBLKSZ;
368
369         memcpy(&och->och_fh, obdo_handle(oa), sizeof(och->och_fh));
370         request->rq_replay_cb = osc_replay_open;
371         request->rq_replay_data = och;
372         och->och_req = ptlrpc_request_addref(request);
373         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
374
375         EXIT;
376  out:
377         ptlrpc_req_finished(request);
378         return rc;
379 }
380
381 static int osc_close(struct lustre_handle *conn, struct obdo *oa,
382                      struct lov_stripe_md *md, struct obd_trans_info *oti)
383 {
384         struct obd_import *import = class_conn2cliimp(conn);
385         struct ptlrpc_request *request;
386         struct ost_body *body;
387         struct obd_client_handle *och;
388         unsigned long flags;
389         int rc, size = sizeof(*body);
390         ENTRY;
391
392         LASSERT(oa != NULL);
393         och = (struct obd_client_handle *)&oa->o_inline;
394         if (och->och_magic == 0) {
395                 /* Zero magic means that this file was never opened on this
396                  * OST--almost certainly because the OST was inactive at
397                  * open-time */
398                 RETURN(0);
399         }
400         LASSERT(och->och_magic == OBD_CLIENT_HANDLE_MAGIC);
401
402         request = ptlrpc_prep_req(import, OST_CLOSE, 1, &size, NULL);
403         if (!request)
404                 RETURN(-ENOMEM);
405
406         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
407         memcpy(&body->oa, oa, sizeof(*oa));
408
409         request->rq_replen = lustre_msg_size(1, &size);
410
411         rc = ptlrpc_queue_wait(request);
412         if (rc)
413                 CDEBUG(D_HA, "Suppressing close error %d\n", rc); // bug 1036
414
415         /* och_req == NULL can't happen any more, right? --phik */
416         if (och->och_req != NULL) {
417                 spin_lock_irqsave(&import->imp_lock, flags);
418                 spin_lock (&och->och_req->rq_lock);
419                 och->och_req->rq_replay = 0;
420                 spin_unlock (&och->och_req->rq_lock);
421                 /* see comments in llite/file.c:ll_mdc_close() */
422                 if (och->och_req->rq_transno) {
423                         /* this can't happen yet, because the OSTs don't yet
424                          * issue transnos for OPEN requests -phik 21 Apr 2003 */
425                         LBUG();
426                         if (!request->rq_transno && import->imp_replayable) {
427                                 request->rq_transno = och->och_req->rq_transno;
428                                 ptlrpc_retain_replayable_request(request,
429                                                                  import);
430                         }
431                         spin_unlock_irqrestore(&import->imp_lock, flags);
432                 } else {
433                         spin_unlock_irqrestore(&import->imp_lock, flags);
434                 }
435
436                 ptlrpc_req_finished(och->och_req);
437         }
438
439         if (!rc) {
440                 body = lustre_swab_repbuf (request, 0, sizeof (*body),
441                                            lustre_swab_ost_body);
442                 if (body == NULL) {
443                         rc = -EPROTO;
444                         CDEBUG(D_HA, "Suppressing close error %d\n", rc); // bug 1036
445                 } else
446                         memcpy(oa, &body->oa, sizeof(*oa));
447         }
448
449         ptlrpc_req_finished(request);
450         RETURN(0);
451 }
452
453 static int osc_setattr(struct lustre_handle *conn, struct obdo *oa,
454                        struct lov_stripe_md *md, struct obd_trans_info *oti)
455 {
456         struct ptlrpc_request *request;
457         struct ost_body *body;
458         int rc, size = sizeof(*body);
459         ENTRY;
460
461         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SETATTR, 1,
462                                   &size, NULL);
463         if (!request)
464                 RETURN(-ENOMEM);
465
466         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
467         memcpy(&body->oa, oa, sizeof(*oa));
468
469         request->rq_replen = lustre_msg_size(1, &size);
470
471         rc = ptlrpc_queue_wait(request);
472
473         ptlrpc_req_finished(request);
474         return rc;
475 }
476
477 static int osc_create(struct lustre_handle *conn, struct obdo *oa,
478                       struct lov_stripe_md **ea, struct obd_trans_info *oti)
479 {
480         struct ptlrpc_request *request;
481         struct ost_body *body;
482         struct lov_stripe_md *lsm;
483         int rc, size = sizeof(*body);
484         ENTRY;
485
486         LASSERT(oa);
487         LASSERT(ea);
488
489         lsm = *ea;
490         if (!lsm) {
491                 rc = obd_alloc_memmd(conn, &lsm);
492                 if (rc < 0)
493                         RETURN(rc);
494         }
495
496         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CREATE, 1, &size,
497                                   NULL);
498         if (!request)
499                 GOTO(out, rc = -ENOMEM);
500
501         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
502         memcpy(&body->oa, oa, sizeof(*oa));
503
504         request->rq_replen = lustre_msg_size(1, &size);
505
506         rc = ptlrpc_queue_wait(request);
507         if (rc)
508                 GOTO(out_req, rc);
509
510         body = lustre_swab_repbuf (request, 0, sizeof (*body),
511                                    lustre_swab_ost_body);
512         if (body == NULL) {
513                 CERROR ("can't unpack ost_body\n");
514                 GOTO (out_req, rc = -EPROTO);
515         }
516
517         memcpy(oa, &body->oa, sizeof(*oa));
518
519         /* This should really be sent by the OST */
520         oa->o_blksize = OSC_BRW_MAX_SIZE;
521         oa->o_valid |= OBD_MD_FLBLKSZ;
522
523         lsm->lsm_object_id = oa->o_id;
524         lsm->lsm_stripe_count = 0;
525         lsm->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
526         *ea = lsm;
527
528         if (oti != NULL)
529                 oti->oti_transno = request->rq_repmsg->transno;
530
531         CDEBUG(D_HA, "transno: "LPD64"\n", request->rq_repmsg->transno);
532         EXIT;
533 out_req:
534         ptlrpc_req_finished(request);
535 out:
536         if (rc && !*ea)
537                 obd_free_memmd(conn, &lsm);
538         return rc;
539 }
540
541 static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
542                      struct lov_stripe_md *md, obd_size start,
543                      obd_size end, struct obd_trans_info *oti)
544 {
545         struct ptlrpc_request *request;
546         struct ost_body *body;
547         int rc, size = sizeof(*body);
548         ENTRY;
549
550         if (!oa) {
551                 CERROR("oa NULL\n");
552                 RETURN(-EINVAL);
553         }
554
555         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_PUNCH, 1, &size,
556                                   NULL);
557         if (!request)
558                 RETURN(-ENOMEM);
559
560         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
561         memcpy(&body->oa, oa, sizeof(*oa));
562
563         /* overload the size and blocks fields in the oa with start/end */
564         body->oa.o_size = start;
565         body->oa.o_blocks = end;
566         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
567
568         request->rq_replen = lustre_msg_size(1, &size);
569
570         rc = ptlrpc_queue_wait(request);
571         if (rc)
572                 GOTO(out, rc);
573
574         body = lustre_swab_repbuf (request, 0, sizeof (*body),
575                                    lustre_swab_ost_body);
576         if (body == NULL) {
577                 CERROR ("can't unpack ost_body\n");
578                 GOTO (out, rc = -EPROTO);
579         }
580
581         memcpy(oa, &body->oa, sizeof(*oa));
582
583         EXIT;
584  out:
585         ptlrpc_req_finished(request);
586         return rc;
587 }
588
589 static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
590                        struct lov_stripe_md *ea, struct obd_trans_info *oti)
591 {
592         struct ptlrpc_request *request;
593         struct ost_body *body;
594         int rc, size = sizeof(*body);
595         ENTRY;
596
597         if (!oa) {
598                 CERROR("oa NULL\n");
599                 RETURN(-EINVAL);
600         }
601         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_DESTROY, 1,
602                                   &size, NULL);
603         if (!request)
604                 RETURN(-ENOMEM);
605
606         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
607         memcpy(&body->oa, oa, sizeof(*oa));
608
609         request->rq_replen = lustre_msg_size(1, &size);
610
611         rc = ptlrpc_queue_wait(request);
612         if (rc)
613                 GOTO(out, rc);
614
615         body = lustre_swab_repbuf (request, 0, sizeof (*body),
616                                    lustre_swab_ost_body);
617         if (body == NULL) {
618                 CERROR ("Can't unpack body\n");
619                 GOTO (out, rc = -EPROTO);
620         }
621
622         memcpy(oa, &body->oa, sizeof(*oa));
623
624         EXIT;
625  out:
626         ptlrpc_req_finished(request);
627         return rc;
628 }
629
630 /* We assume that the reason this OSC got a short read is because it read
631  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
632  * via the LOV, and it _knows_ it's reading inside the file, it's just that
633  * this stripe never got written at or beyond this stripe offset yet. */
634 static void handle_short_read(int nob_read, obd_count page_count,
635                               struct brw_page *pga)
636 {
637         char *ptr;
638
639         /* skip bytes read OK */
640         while (nob_read > 0) {
641                 LASSERT (page_count > 0);
642
643                 if (pga->count > nob_read) {
644                         /* EOF inside this page */
645                         ptr = kmap(pga->pg) + (pga->off & ~PAGE_MASK);
646                         memset(ptr + nob_read, 0, pga->count - nob_read);
647                         kunmap(pga->pg);
648                         page_count--;
649                         pga++;
650                         break;
651                 }
652
653                 nob_read -= pga->count;
654                 page_count--;
655                 pga++;
656         }
657
658         /* zero remaining pages */
659         while (page_count-- > 0) {
660                 ptr = kmap(pga->pg) + (pga->off & ~PAGE_MASK);
661                 memset(ptr, 0, pga->count);
662                 kunmap(pga->pg);
663                 pga++;
664         }
665 }
666
667 static int check_write_rcs (struct ptlrpc_request *request,
668                             int niocount, obd_count page_count,
669                             struct brw_page *pga)
670 {
671         int    i;
672         __u32 *remote_rcs;
673
674         /* return error if any niobuf was in error */
675         remote_rcs = lustre_swab_repbuf(request, 1,
676                                         sizeof(*remote_rcs) * niocount, NULL);
677         if (remote_rcs == NULL) {
678                 CERROR ("Missing/short RC vector on BRW_WRITE reply\n");
679                 return (-EPROTO);
680         }
681         if (lustre_msg_swabbed (request->rq_repmsg))
682                 for (i = 0; i < niocount; i++)
683                         __swab32s (&remote_rcs[i]);
684
685         for (i = 0; i < niocount; i++) {
686                 if (remote_rcs[i] < 0)
687                         return (remote_rcs[i]);
688
689                 if (remote_rcs[i] != 0) {
690                         CERROR ("rc[%d] invalid (%d) req %p\n",
691                                 i, remote_rcs[i], request);
692                         return (-EPROTO);
693                 }
694         }
695
696         return (0);
697 }
698
699 static inline int can_merge_pages (struct brw_page *p1, struct brw_page *p2)
700 {
701         if (p1->flag != p2->flag) {
702                 /* XXX we don't make much use of 'flag' right now
703                  * but this will warn about usage when we do */
704                 CERROR ("different flags set %d, %d\n",
705                         p1->flag, p2->flag);
706                 return (0);
707         }
708
709         return (p1->off + p1->count == p2->off);
710 }
711
#if CHECKSUM_BULK
/* Feed the first 'nob' bytes described by 'pga' to ost_checksum() and
 * return the accumulated value.  Each page is mapped only while it is
 * being summed; the last page contributes only the bytes that remain. */
static __u64 cksum_pages(int nob, obd_count page_count, struct brw_page *pga)
{
        __u64 cksum = 0;
        char *ptr;

        while (nob > 0) {
                LASSERT (page_count > 0);

                ptr = kmap (pga->pg);
                ost_checksum (&cksum, ptr + (pga->off & (PAGE_SIZE - 1)),
                              pga->count > nob ? nob : pga->count);
                kunmap (pga->pg);

                nob -= pga->count;
                page_count--;
                pga++;
        }

        return (cksum);
}
#endif
735
736 static int osc_brw_prep_request(struct obd_import *imp,
737                                 struct lov_stripe_md *lsm, obd_count page_count,
738                                 struct brw_page *pga, int cmd,
739                                 int *requested_nobp, int *niocountp,
740                                 struct ptlrpc_request **reqp)
741 {
742         struct ptlrpc_request   *req;
743         struct ptlrpc_bulk_desc *desc;
744         struct ost_body         *body;
745         struct obd_ioobj        *ioobj;
746         struct niobuf_remote    *niobuf;
747         unsigned long            flags;
748         int                      niocount;
749         int                      size[3];
750         int                      i;
751         int                      requested_nob;
752         int                      opc;
753         int                      rc;
754
755         opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
756
757         for (niocount = i = 1; i < page_count; i++)
758                 if (!can_merge_pages (&pga[i - 1], &pga[i]))
759                         niocount++;
760
761         size[0] = sizeof (*body);
762         size[1] = sizeof (*ioobj);
763         size[2] = niocount * sizeof (*niobuf);
764
765         req = ptlrpc_prep_req (imp, opc, 3, size, NULL);
766         if (req == NULL)
767                 return (-ENOMEM);
768
769         if (opc == OST_WRITE)
770                 desc = ptlrpc_prep_bulk_imp(req, BULK_GET_SOURCE,
771                                             OST_BULK_PORTAL);
772         else
773                 desc = ptlrpc_prep_bulk_imp(req, BULK_PUT_SINK,
774                                             OST_BULK_PORTAL);
775         if (desc == NULL)
776                 GOTO (out, rc = -ENOMEM);
777         /* NB request now owns desc and will free it when it gets freed */
778
779         body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
780         ioobj = lustre_msg_buf(req->rq_reqmsg, 1, sizeof(*ioobj));
781         niobuf = lustre_msg_buf(req->rq_reqmsg, 2, niocount * sizeof(*niobuf));
782
783         ioobj->ioo_id = lsm->lsm_object_id;
784         ioobj->ioo_gr = 0;
785         ioobj->ioo_type = S_IFREG;
786         ioobj->ioo_bufcnt = niocount;
787
788         LASSERT (page_count > 0);
789         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
790                 struct brw_page *pg = &pga[i];
791                 struct brw_page *pg_prev = pg - 1;
792
793                 LASSERT (pg->count > 0);
794                 LASSERT ((pg->off & (PAGE_SIZE - 1)) + pg->count <= PAGE_SIZE);
795                 LASSERT (i == 0 || pg->off > pg_prev->off);
796
797                 rc = ptlrpc_prep_bulk_page (desc, pg->pg,
798                                             pg->off & (PAGE_SIZE - 1),
799                                             pg->count);
800                 if (rc != 0)
801                         GOTO (out, rc);
802
803                 requested_nob += pg->count;
804
805                 if (i > 0 &&
806                     can_merge_pages (pg_prev, pg)) {
807                         niobuf--;
808                         niobuf->len += pg->count;
809                 } else {
810                         niobuf->offset = pg->off;
811                         niobuf->len    = pg->count;
812                         niobuf->flags  = pg->flag;
813                 }
814         }
815
816         LASSERT ((void *)(niobuf - niocount) ==
817                  lustre_msg_buf(req->rq_reqmsg, 2, niocount * sizeof(*niobuf)));
818 #if CHECKSUM_BULK
819         body->oa.o_valid |= OBD_MD_FLCKSUM;
820         if (opc == OST_BRW_WRITE)
821                 body->oa.o_rdev = cksum_pages (requested_nob, page_count, pga);
822 #endif
823         spin_lock_irqsave (&req->rq_lock, flags);
824         req->rq_no_resend = 1;
825         spin_unlock_irqrestore (&req->rq_lock, flags);
826
827         /* size[0] still sizeof (*body) */
828         if (opc == OST_WRITE) {
829                 /* 1 RC per niobuf */
830                 size[1] = sizeof(__u32) * niocount;
831                 req->rq_replen = lustre_msg_size(2, size);
832         } else {
833                 /* 1 RC for the whole I/O */
834                 req->rq_replen = lustre_msg_size(1, size);
835         }
836
837         *niocountp = niocount;
838         *requested_nobp = requested_nob;
839         *reqp = req;
840         return (0);
841
842  out:
843         ptlrpc_req_finished (req);
844         return (rc);
845 }
846
847 static int osc_brw_fini_request (struct ptlrpc_request *req,
848                                  int requested_nob, int niocount,
849                                  obd_count page_count, struct brw_page *pga,
850                                  int rc)
851 {
852         if (rc < 0)
853                 return (rc);
854
855         if (req->rq_reqmsg->opc == OST_WRITE) {
856                 if (rc > 0) {
857                         CERROR ("Unexpected +ve rc %d\n", rc);
858                         return (-EPROTO);
859                 }
860
861                 return (check_write_rcs(req, niocount, page_count, pga));
862         }
863
864         if (rc > requested_nob) {
865                 CERROR ("Unexpected rc %d (%d requested)\n",
866                         rc, requested_nob);
867                 return (-EPROTO);
868         }
869
870         if (rc < requested_nob)
871                 handle_short_read (rc, page_count, pga);
872
873 #if CHECKSUM_BULK
874         imp = req->rq_import;
875         body = lustre_swab_repmsg (req, 0, sizeof (*body),
876                                    lustre_swab_ost_body);
877         if (body == NULL) {
878                 CERROR ("Can't unpack body\n");
879         } else if (body->oa.o_valid & OBD_MD_FLCKSUM) {
880                 static int cksum_counter;
881                 __u64 server_cksum = body->oa.o_rdev;
882                 __u64 cksum = cksum_pages (rc, page_count, pga);
883
884                 cksum_counter++;
885                 if (server_cksum != cksum) {
886                         CERROR("Bad checksum: server "LPX64", client "LPX64
887                                ", server NID "LPX64"\n", server_cksum, cksum,
888                                imp->imp_connection->c_peer.peer_nid);
889                         cksum_counter = 0;
890                 } else if ((cksum_counter & (-cksum_counter)) == cksum_counter)
891                         CERROR("Checksum %u from "LPX64" OK: "LPX64"\n",
892                                cksum_counter,
893                                imp->imp_connection->c_peer.peer_nid, cksum);
894         } else {
895                 static int cksum_missed;
896                 cksum_missed++;
897                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
898                         CERROR("Request checksum %u from "LPX64", no reply\n",
899                                cksum_missed,
900                                imp->imp_connection->c_peer.peer_nid);
901         }
902 #endif
903         return (0);
904 }
905
906 static int osc_brw_internal(struct lustre_handle *conn,
907                             struct lov_stripe_md *lsm,
908                             obd_count page_count, struct brw_page *pga, int cmd)
909 {
910         int                    requested_nob;
911         int                    niocount;
912         struct ptlrpc_request *request;
913         int                    rc;
914         ENTRY;
915
916 restart_bulk:
917         rc = osc_brw_prep_request(class_conn2cliimp(conn), lsm, page_count, pga,
918                                   cmd, &requested_nob, &niocount, &request);
919         /* NB ^ sets rq_no_resend */
920
921         if (rc != 0)
922                 return (rc);
923
924         rc = ptlrpc_queue_wait(request);
925
926         if (rc == -ETIMEDOUT && request->rq_resend) {
927                 DEBUG_REQ(D_HA, request,  "BULK TIMEOUT");
928                 ptlrpc_req_finished(request);
929                 goto restart_bulk;
930         }
931
932         rc = osc_brw_fini_request (request, requested_nob, niocount,
933                                    page_count, pga, rc);
934
935         ptlrpc_req_finished(request);
936         RETURN (rc);
937 }
938
939 static int brw_interpret(struct ptlrpc_request *request,
940                          struct osc_brw_async_args *aa, int rc)
941 {
942         int requested_nob    = aa->aa_requested_nob;
943         int niocount         = aa->aa_nio_count;
944         obd_count page_count = aa->aa_page_count;
945         struct brw_page *pga = aa->aa_pga;
946         ENTRY;
947
948         /* XXX bug 937 here */
949         if (rc == -ETIMEDOUT && request->rq_resend) {
950                 DEBUG_REQ(D_HA, request,  "BULK TIMEOUT");
951                 LBUG(); /* re-send.  later. */
952                 //goto restart_bulk;
953         }
954
955         rc = osc_brw_fini_request (request, requested_nob, niocount,
956                                    page_count, pga, rc);
957         RETURN (rc);
958 }
959
960 static int async_internal(struct lustre_handle *conn, struct lov_stripe_md *lsm,
961                           obd_count page_count, struct brw_page *pga,
962                           struct ptlrpc_request_set *set, int cmd)
963 {
964         struct ptlrpc_request     *request;
965         int                        requested_nob;
966         int                        nio_count;
967         struct osc_brw_async_args *aa;
968         int                        rc;
969         ENTRY;
970
971         rc = osc_brw_prep_request (class_conn2cliimp(conn),
972                                    lsm, page_count, pga, cmd,
973                                    &requested_nob, &nio_count, &request);
974         /* NB ^ sets rq_no_resend */
975
976         if (rc == 0) {
977                 LASSERT (sizeof (*aa) <= sizeof (request->rq_async_args));
978                 aa = (struct osc_brw_async_args *)&request->rq_async_args;
979                 aa->aa_requested_nob = requested_nob;
980                 aa->aa_nio_count = nio_count;
981                 aa->aa_page_count = page_count;
982                 aa->aa_pga = pga;
983
984                 request->rq_interpret_reply = brw_interpret;
985                 ptlrpc_set_add_req(set, request);
986         }
987         RETURN (rc);
988 }
989
/* Fallback min_t(): convert both operands to 'type' first, then yield the
 * smaller one.  Each operand is evaluated exactly once. */
#ifndef min_t
#define min_t(type,x,y)                                 \
        ({                                              \
                type __min_a = (x);                     \
                type __min_b = (y);                     \
                __min_a < __min_b ? __min_a : __min_b;  \
        })
#endif
994
995 /*
996  * ugh, we want disk allocation on the target to happen in offset order.  we'll
997  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
998  * fine for our small page arrays and doesn't require allocation.  its an
999  * insertion sort that swaps elements that are strides apart, shrinking the
1000  * stride down until its '1' and the array is sorted.
1001  */
1002 static void sort_brw_pages(struct brw_page *array, int num)
1003 {
1004         int stride, i, j;
1005         struct brw_page tmp;
1006
1007         if (num == 1)
1008                 return;
1009         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1010                 ;
1011
1012         do {
1013                 stride /= 3;
1014                 for (i = stride ; i < num ; i++) {
1015                         tmp = array[i];
1016                         j = i;
1017                         while (j >= stride && array[j - stride].off > tmp.off) {
1018                                 array[j] = array[j - stride];
1019                                 j -= stride;
1020                         }
1021                         array[j] = tmp;
1022                 }
1023         } while (stride > 1);
1024 }
1025
1026 /* make sure we the regions we're passing to elan don't violate its '4
1027  * fragments' constraint.  portal headers are a fragment, all full
1028  * PAGE_SIZE long pages count as 1 fragment, and each partial page
1029  * counts as a fragment.  I think.  see bug 934. */
1030 static obd_count check_elan_limit(struct brw_page *pg, obd_count pages)
1031 {
1032         int frags_left = 3;
1033         int saw_whole_frag = 0;
1034         int i;
1035
1036         for (i = 0 ; frags_left && i < pages ; pg++, i++) {
1037                 if (pg->count == PAGE_SIZE) {
1038                         if (!saw_whole_frag) {
1039                                 saw_whole_frag = 1;
1040                                 frags_left--;
1041                         }
1042                 } else {
1043                         frags_left--;
1044                 }
1045         }
1046         return i;
1047 }
1048
1049 static int osc_brw(int cmd, struct lustre_handle *conn,
1050                    struct lov_stripe_md *md, obd_count page_count,
1051                    struct brw_page *pga, struct obd_trans_info *oti)
1052 {
1053         ENTRY;
1054
1055         if (cmd == OBD_BRW_CHECK) {
1056                 /* The caller just wants to know if there's a chance that this
1057                  * I/O can succeed */
1058                 struct obd_import *imp = class_conn2cliimp(conn);
1059
1060                 if (imp == NULL || imp->imp_invalid)
1061                         RETURN(-EIO);
1062                 RETURN(0);
1063         }
1064
1065         while (page_count) {
1066                 obd_count pages_per_brw;
1067                 int rc;
1068
1069                 if (page_count > OSC_BRW_MAX_IOV)
1070                         pages_per_brw = OSC_BRW_MAX_IOV;
1071                 else
1072                         pages_per_brw = page_count;
1073
1074                 sort_brw_pages(pga, pages_per_brw);
1075                 pages_per_brw = check_elan_limit(pga, pages_per_brw);
1076
1077                 rc = osc_brw_internal(conn, md, pages_per_brw, pga, cmd);
1078
1079                 if (rc != 0)
1080                         RETURN(rc);
1081
1082                 page_count -= pages_per_brw;
1083                 pga += pages_per_brw;
1084         }
1085         RETURN(0);
1086 }
1087
1088 static int osc_brw_async(int cmd, struct lustre_handle *conn,
1089                          struct lov_stripe_md *md, obd_count page_count,
1090                          struct brw_page *pga, struct ptlrpc_request_set *set,
1091                          struct obd_trans_info *oti)
1092 {
1093         ENTRY;
1094
1095         if (cmd == OBD_BRW_CHECK) {
1096                 /* The caller just wants to know if there's a chance that this
1097                  * I/O can succeed */
1098                 struct obd_import *imp = class_conn2cliimp(conn);
1099
1100                 if (imp == NULL || imp->imp_invalid)
1101                         RETURN(-EIO);
1102                 RETURN(0);
1103         }
1104
1105         while (page_count) {
1106                 obd_count pages_per_brw;
1107                 int rc;
1108
1109                 if (page_count > OSC_BRW_MAX_IOV)
1110                         pages_per_brw = OSC_BRW_MAX_IOV;
1111                 else
1112                         pages_per_brw = page_count;
1113
1114                 sort_brw_pages(pga, pages_per_brw);
1115                 pages_per_brw = check_elan_limit(pga, pages_per_brw);
1116
1117                 rc = async_internal(conn, md, pages_per_brw, pga, set, cmd);
1118
1119                 if (rc != 0)
1120                         RETURN(rc);
1121
1122                 page_count -= pages_per_brw;
1123                 pga += pages_per_brw;
1124         }
1125         RETURN(0);
1126 }
1127
1128 #ifdef __KERNEL__
1129 /* Note: caller will lock/unlock, and set uptodate on the pages */
1130 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1131 static int sanosc_brw_read(struct lustre_handle *conn,
1132                            struct lov_stripe_md *lsm,
1133                            obd_count page_count,
1134                            struct brw_page *pga)
1135 {
1136         struct ptlrpc_request *request = NULL;
1137         struct ost_body *body;
1138         struct niobuf_remote *nioptr;
1139         struct obd_ioobj *iooptr;
1140         int rc, size[3] = {sizeof(*body)}, mapped = 0;
1141         int swab;
1142         ENTRY;
1143
1144         /* XXX does not handle 'new' brw protocol */
1145
1146         size[1] = sizeof(struct obd_ioobj);
1147         size[2] = page_count * sizeof(*nioptr);
1148
1149         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SAN_READ, 3,
1150                                   size, NULL);
1151         if (!request)
1152                 RETURN(-ENOMEM);
1153
1154         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
1155         iooptr = lustre_msg_buf(request->rq_reqmsg, 1, sizeof (*iooptr));
1156         nioptr = lustre_msg_buf(request->rq_reqmsg, 2,
1157                                 sizeof (*nioptr) * page_count);
1158
1159         iooptr->ioo_id = lsm->lsm_object_id;
1160         iooptr->ioo_gr = 0;
1161         iooptr->ioo_type = S_IFREG;
1162         iooptr->ioo_bufcnt = page_count;
1163
1164         for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
1165                 LASSERT(PageLocked(pga[mapped].pg));
1166                 LASSERT(mapped == 0 || pga[mapped].off > pga[mapped - 1].off);
1167
1168                 nioptr->offset = pga[mapped].off;
1169                 nioptr->len    = pga[mapped].count;
1170                 nioptr->flags  = pga[mapped].flag;
1171         }
1172
1173         size[1] = page_count * sizeof(*nioptr);
1174         request->rq_replen = lustre_msg_size(2, size);
1175
1176         rc = ptlrpc_queue_wait(request);
1177         if (rc)
1178                 GOTO(out_req, rc);
1179
1180         swab = lustre_msg_swabbed (request->rq_repmsg);
1181         LASSERT_REPSWAB (request, 1);
1182         nioptr = lustre_msg_buf(request->rq_repmsg, 1, size[1]);
1183         if (!nioptr) {
1184                 /* nioptr missing or short */
1185                 GOTO(out_req, rc = -EPROTO);
1186         }
1187
1188         /* actual read */
1189         for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
1190                 struct page *page = pga[mapped].pg;
1191                 struct buffer_head *bh;
1192                 kdev_t dev;
1193
1194                 if (swab)
1195                         lustre_swab_niobuf_remote (nioptr);
1196
1197                 /* got san device associated */
1198                 LASSERT(class_conn2obd(conn));
1199                 dev = class_conn2obd(conn)->u.cli.cl_sandev;
1200
1201                 /* hole */
1202                 if (!nioptr->offset) {
1203                         CDEBUG(D_PAGE, "hole at ino %lu; index %ld\n",
1204                                         page->mapping->host->i_ino,
1205                                         page->index);
1206                         memset(page_address(page), 0, PAGE_SIZE);
1207                         continue;
1208                 }
1209
1210                 if (!page->buffers) {
1211                         create_empty_buffers(page, dev, PAGE_SIZE);
1212                         bh = page->buffers;
1213
1214                         clear_bit(BH_New, &bh->b_state);
1215                         set_bit(BH_Mapped, &bh->b_state);
1216                         bh->b_blocknr = (unsigned long)nioptr->offset;
1217
1218                         clear_bit(BH_Uptodate, &bh->b_state);
1219
1220                         ll_rw_block(READ, 1, &bh);
1221                 } else {
1222                         bh = page->buffers;
1223
1224                         /* if buffer already existed, it must be the
1225                          * one we mapped before, check it */
1226                         LASSERT(!test_bit(BH_New, &bh->b_state));
1227                         LASSERT(test_bit(BH_Mapped, &bh->b_state));
1228                         LASSERT(bh->b_blocknr == (unsigned long)nioptr->offset);
1229
1230                         /* wait it's io completion */
1231                         if (test_bit(BH_Lock, &bh->b_state))
1232                                 wait_on_buffer(bh);
1233
1234                         if (!test_bit(BH_Uptodate, &bh->b_state))
1235                                 ll_rw_block(READ, 1, &bh);
1236                 }
1237
1238
1239                 /* must do syncronous write here */
1240                 wait_on_buffer(bh);
1241                 if (!buffer_uptodate(bh)) {
1242                         /* I/O error */
1243                         rc = -EIO;
1244                         goto out_req;
1245                 }
1246         }
1247
1248 out_req:
1249         ptlrpc_req_finished(request);
1250         RETURN(rc);
1251 }
1252
1253 static int sanosc_brw_write(struct lustre_handle *conn,
1254                             struct lov_stripe_md *lsm,
1255                             obd_count page_count,
1256                             struct brw_page *pga)
1257 {
1258         struct ptlrpc_request *request = NULL;
1259         struct ost_body *body;
1260         struct niobuf_remote *nioptr;
1261         struct obd_ioobj *iooptr;
1262         int rc, size[3] = {sizeof(*body)}, mapped = 0;
1263         int swab;
1264         ENTRY;
1265
1266         size[1] = sizeof(struct obd_ioobj);
1267         size[2] = page_count * sizeof(*nioptr);
1268
1269         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SAN_WRITE,
1270                                   3, size, NULL);
1271         if (!request)
1272                 RETURN(-ENOMEM);
1273
1274         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
1275         iooptr = lustre_msg_buf(request->rq_reqmsg, 1, sizeof (*iooptr));
1276         nioptr = lustre_msg_buf(request->rq_reqmsg, 2,
1277                                 sizeof (*nioptr) * page_count);
1278
1279         iooptr->ioo_id = lsm->lsm_object_id;
1280         iooptr->ioo_gr = 0;
1281         iooptr->ioo_type = S_IFREG;
1282         iooptr->ioo_bufcnt = page_count;
1283
1284         /* pack request */
1285         for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
1286                 LASSERT(PageLocked(pga[mapped].pg));
1287                 LASSERT(mapped == 0 || pga[mapped].off > pga[mapped - 1].off);
1288
1289                 nioptr->offset = pga[mapped].off;
1290                 nioptr->len    = pga[mapped].count;
1291                 nioptr->flags  = pga[mapped].flag;
1292         }
1293
1294         size[1] = page_count * sizeof(*nioptr);
1295         request->rq_replen = lustre_msg_size(2, size);
1296
1297         rc = ptlrpc_queue_wait(request);
1298         if (rc)
1299                 GOTO(out_req, rc);
1300
1301         swab = lustre_msg_swabbed (request->rq_repmsg);
1302         LASSERT_REPSWAB (request, 1);
1303         nioptr = lustre_msg_buf(request->rq_repmsg, 1, size[1]);
1304         if (!nioptr) {
1305                 CERROR("absent/short niobuf array\n");
1306                 GOTO(out_req, rc = -EPROTO);
1307         }
1308
1309         /* actual write */
1310         for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
1311                 struct page *page = pga[mapped].pg;
1312                 struct buffer_head *bh;
1313                 kdev_t dev;
1314
1315                 if (swab)
1316                         lustre_swab_niobuf_remote (nioptr);
1317
1318                 /* got san device associated */
1319                 LASSERT(class_conn2obd(conn));
1320                 dev = class_conn2obd(conn)->u.cli.cl_sandev;
1321
1322                 if (!page->buffers) {
1323                         create_empty_buffers(page, dev, PAGE_SIZE);
1324                 } else {
1325                         /* checking */
1326                         LASSERT(!test_bit(BH_New, &page->buffers->b_state));
1327                         LASSERT(test_bit(BH_Mapped, &page->buffers->b_state));
1328                         LASSERT(page->buffers->b_blocknr ==
1329                                 (unsigned long)nioptr->offset);
1330                 }
1331                 bh = page->buffers;
1332
1333                 LASSERT(bh);
1334
1335                 /* if buffer locked, wait it's io completion */
1336                 if (test_bit(BH_Lock, &bh->b_state))
1337                         wait_on_buffer(bh);
1338
1339                 clear_bit(BH_New, &bh->b_state);
1340                 set_bit(BH_Mapped, &bh->b_state);
1341
1342                 /* override the block nr */
1343                 bh->b_blocknr = (unsigned long)nioptr->offset;
1344
1345                 /* we are about to write it, so set it
1346                  * uptodate/dirty
1347                  * page lock should garentee no race condition here */
1348                 set_bit(BH_Uptodate, &bh->b_state);
1349                 set_bit(BH_Dirty, &bh->b_state);
1350
1351                 ll_rw_block(WRITE, 1, &bh);
1352
1353                 /* must do syncronous write here */
1354                 wait_on_buffer(bh);
1355                 if (!buffer_uptodate(bh) || test_bit(BH_Dirty, &bh->b_state)) {
1356                         /* I/O error */
1357                         rc = -EIO;
1358                         goto out_req;
1359                 }
1360         }
1361
1362 out_req:
1363         ptlrpc_req_finished(request);
1364         RETURN(rc);
1365 }
1366
1367 static int sanosc_brw(int cmd, struct lustre_handle *conn,
1368                       struct lov_stripe_md *lsm, obd_count page_count,
1369                       struct brw_page *pga, struct obd_trans_info *oti)
1370 {
1371         ENTRY;
1372
1373         while (page_count) {
1374                 obd_count pages_per_brw;
1375                 int rc;
1376
1377                 if (page_count > OSC_BRW_MAX_IOV)
1378                         pages_per_brw = OSC_BRW_MAX_IOV;
1379                 else
1380                         pages_per_brw = page_count;
1381
1382                 if (cmd & OBD_BRW_WRITE)
1383                         rc = sanosc_brw_write(conn, lsm, pages_per_brw, pga);
1384                 else
1385                         rc = sanosc_brw_read(conn, lsm, pages_per_brw, pga);
1386
1387                 if (rc != 0)
1388                         RETURN(rc);
1389
1390                 page_count -= pages_per_brw;
1391                 pga += pages_per_brw;
1392         }
1393         RETURN(0);
1394 }
1395 #endif
1396 #endif
1397
1398 static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *lsm,
1399                        struct lustre_handle *parent_lock,
1400                        __u32 type, void *extentp, int extent_len, __u32 mode,
1401                        int *flags, void *callback, void *data,
1402                        struct lustre_handle *lockh)
1403 {
1404         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
1405         struct obd_device *obddev = class_conn2obd(connh);
1406         struct ldlm_extent *extent = extentp;
1407         int rc;
1408         ENTRY;
1409
1410         /* Filesystem lock extents are extended to page boundaries so that
1411          * dealing with the page cache is a little smoother.  */
1412         extent->start -= extent->start & ~PAGE_MASK;
1413         extent->end |= ~PAGE_MASK;
1414
1415         /* Next, search for already existing extent locks that will cover us */
1416         rc = ldlm_lock_match(obddev->obd_namespace, LDLM_FL_MATCH_DATA, &res_id,
1417                              type, extent, sizeof(extent), mode, data, lockh);
1418         if (rc == 1)
1419                 /* We already have a lock, and it's referenced */
1420                 RETURN(ELDLM_OK);
1421
1422         /* If we're trying to read, we also search for an existing PW lock.  The
1423          * VFS and page cache already protect us locally, so lots of readers/
1424          * writers can share a single PW lock.
1425          *
1426          * There are problems with conversion deadlocks, so instead of
1427          * converting a read lock to a write lock, we'll just enqueue a new
1428          * one.
1429          *
1430          * At some point we should cancel the read lock instead of making them
1431          * send us a blocking callback, but there are problems with canceling
1432          * locks out from other users right now, too. */
1433
1434         if (mode == LCK_PR) {
1435                 rc = ldlm_lock_match(obddev->obd_namespace, LDLM_FL_MATCH_DATA,
1436                                      &res_id, type, extent, sizeof(extent),
1437                                      LCK_PW, data, lockh);
1438                 if (rc == 1) {
1439                         /* FIXME: This is not incredibly elegant, but it might
1440                          * be more elegant than adding another parameter to
1441                          * lock_match.  I want a second opinion. */
1442                         ldlm_lock_addref(lockh, LCK_PR);
1443                         ldlm_lock_decref(lockh, LCK_PW);
1444
1445                         RETURN(ELDLM_OK);
1446                 }
1447         }
1448
1449         rc = ldlm_cli_enqueue(connh, NULL, obddev->obd_namespace, parent_lock,
1450                               res_id, type, extent, sizeof(extent), mode, flags,
1451                               ldlm_completion_ast, callback, data, lockh);
1452         RETURN(rc);
1453 }
1454
1455 static int osc_match(struct lustre_handle *connh, struct lov_stripe_md *lsm,
1456                        __u32 type, void *extentp, int extent_len, __u32 mode,
1457                        int *flags, void *data, struct lustre_handle *lockh)
1458 {
1459         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
1460         struct obd_device *obddev = class_conn2obd(connh);
1461         struct ldlm_extent *extent = extentp;
1462         int rc;
1463         ENTRY;
1464
1465         /* Filesystem lock extents are extended to page boundaries so that
1466          * dealing with the page cache is a little smoother */
1467         extent->start -= extent->start & ~PAGE_MASK;
1468         extent->end |= ~PAGE_MASK;
1469
1470         /* Next, search for already existing extent locks that will cover us */
1471         rc = ldlm_lock_match(obddev->obd_namespace, *flags, &res_id, type,
1472                              extent, sizeof(extent), mode, data, lockh);
1473         if (rc)
1474                 RETURN(rc);
1475
1476         /* If we're trying to read, we also search for an existing PW lock.  The
1477          * VFS and page cache already protect us locally, so lots of readers/
1478          * writers can share a single PW lock. */
1479         if (mode == LCK_PR) {
1480                 rc = ldlm_lock_match(obddev->obd_namespace, *flags, &res_id,
1481                                      type, extent, sizeof(extent), LCK_PW,
1482                                      data, lockh);
1483                 if (rc == 1) {
1484                         /* FIXME: This is not incredibly elegant, but it might
1485                          * be more elegant than adding another parameter to
1486                          * lock_match.  I want a second opinion. */
1487                         ldlm_lock_addref(lockh, LCK_PR);
1488                         ldlm_lock_decref(lockh, LCK_PW);
1489                 }
1490         }
1491         RETURN(rc);
1492 }
1493
1494 static int osc_cancel(struct lustre_handle *oconn, struct lov_stripe_md *md,
1495                       __u32 mode, struct lustre_handle *lockh)
1496 {
1497         ENTRY;
1498
1499         ldlm_lock_decref(lockh, mode);
1500
1501         RETURN(0);
1502 }
1503
1504 static int osc_cancel_unused(struct lustre_handle *connh,
1505                              struct lov_stripe_md *lsm, int flags, void *opaque)
1506 {
1507         struct obd_device *obddev = class_conn2obd(connh);
1508         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
1509
1510         return ldlm_cli_cancel_unused(obddev->obd_namespace, &res_id, flags,
1511                                       opaque);
1512 }
1513
1514 static int osc_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
1515 {
1516         struct obd_statfs *msfs;
1517         struct ptlrpc_request *request;
1518         int rc, size = sizeof(*osfs);
1519         ENTRY;
1520
1521         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_STATFS, 0, NULL,
1522                                   NULL);
1523         if (!request)
1524                 RETURN(-ENOMEM);
1525
1526         request->rq_replen = lustre_msg_size(1, &size);
1527
1528         rc = ptlrpc_queue_wait(request);
1529         if (rc) {
1530                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
1531                 GOTO(out, rc);
1532         }
1533
1534         msfs = lustre_swab_repbuf (request, 0, sizeof (*msfs),
1535                                    lustre_swab_obd_statfs);
1536         if (msfs == NULL) {
1537                 CERROR ("Can't unpack obd_statfs\n");
1538                 GOTO (out, rc = -EPROTO);
1539         }
1540
1541         memcpy (osfs, msfs, sizeof (*msfs));
1542
1543         EXIT;
1544  out:
1545         ptlrpc_req_finished(request);
1546         return rc;
1547 }
1548
1549 /* Retrieve object striping information.
1550  *
1551  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
1552  * the maximum number of OST indices which will fit in the user buffer.
1553  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
1554  */
1555 static int osc_getstripe(struct lustre_handle *conn, struct lov_stripe_md *lsm,
1556                          struct lov_mds_md *lmmu)
1557 {
1558         struct lov_mds_md lmm, *lmmk;
1559         int rc, lmm_size;
1560         ENTRY;
1561
1562         if (!lsm)
1563                 RETURN(-ENODATA);
1564
1565         rc = copy_from_user(&lmm, lmmu, sizeof(lmm));
1566         if (rc)
1567                 RETURN(-EFAULT);
1568
1569         if (lmm.lmm_magic != LOV_MAGIC)
1570                 RETURN(-EINVAL);
1571
1572         if (lmm.lmm_ost_count < 1)
1573                 RETURN(-EOVERFLOW);
1574
1575         lmm_size = sizeof(lmm) + sizeof(lmm.lmm_objects[0]);
1576         OBD_ALLOC(lmmk, lmm_size);
1577         if (rc < 0)
1578                 RETURN(rc);
1579
1580         lmmk->lmm_stripe_count = 1;
1581         lmmk->lmm_ost_count = 1;
1582         lmmk->lmm_object_id = lsm->lsm_object_id;
1583         lmmk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
1584
1585         if (copy_to_user(lmmu, lmmk, lmm_size))
1586                 rc = -EFAULT;
1587
1588         OBD_FREE(lmmk, lmm_size);
1589
1590         RETURN(rc);
1591 }
1592
1593 static int osc_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
1594                          void *karg, void *uarg)
1595 {
1596         struct obd_device *obddev = class_conn2obd(conn);
1597         struct obd_ioctl_data *data = karg;
1598         int err = 0;
1599         ENTRY;
1600
1601         switch (cmd) {
1602         case IOC_OSC_REGISTER_LOV: {
1603                 if (obddev->u.cli.cl_containing_lov)
1604                         GOTO(out, err = -EALREADY);
1605                 obddev->u.cli.cl_containing_lov = (struct obd_device *)karg;
1606                 GOTO(out, err);
1607         }
1608         case OBD_IOC_LOV_GET_CONFIG: {
1609                 char *buf;
1610                 struct lov_desc *desc;
1611                 struct obd_uuid uuid;
1612
1613                 buf = NULL;
1614                 len = 0;
1615                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
1616                         GOTO(out, err = -EINVAL);
1617
1618                 data = (struct obd_ioctl_data *)buf;
1619
1620                 if (sizeof(*desc) > data->ioc_inllen1) {
1621                         OBD_FREE(buf, len);
1622                         GOTO(out, err = -EINVAL);
1623                 }
1624
1625                 if (data->ioc_inllen2 < sizeof(uuid)) {
1626                         OBD_FREE(buf, len);
1627                         GOTO(out, err = -EINVAL);
1628                 }
1629
1630                 desc = (struct lov_desc *)data->ioc_inlbuf1;
1631                 desc->ld_tgt_count = 1;
1632                 desc->ld_active_tgt_count = 1;
1633                 desc->ld_default_stripe_count = 1;
1634                 desc->ld_default_stripe_size = 0;
1635                 desc->ld_default_stripe_offset = 0;
1636                 desc->ld_pattern = 0;
1637                 memcpy(&desc->ld_uuid, &obddev->obd_uuid, sizeof(uuid));
1638
1639                 memcpy(data->ioc_inlbuf2, &obddev->obd_uuid, sizeof(uuid));
1640
1641                 err = copy_to_user((void *)uarg, buf, len);
1642                 if (err)
1643                         err = -EFAULT;
1644                 obd_ioctl_freedata(buf, len);
1645                 GOTO(out, err);
1646         }
1647         case LL_IOC_LOV_SETSTRIPE:
1648                 err = obd_alloc_memmd(conn, karg);
1649                 if (err > 0)
1650                         err = 0;
1651                 GOTO(out, err);
1652         case LL_IOC_LOV_GETSTRIPE:
1653                 err = osc_getstripe(conn, karg, uarg);
1654                 GOTO(out, err);
1655         case OBD_IOC_CLIENT_RECOVER:
1656                 err = ptlrpc_recover_import(obddev->u.cli.cl_import,
1657                                             data->ioc_inlbuf1);
1658                 GOTO(out, err);
1659         case IOC_OSC_SET_ACTIVE:
1660                 err = ptlrpc_set_import_active(obddev->u.cli.cl_import,
1661                                                data->ioc_offset);
1662                 GOTO(out, err);
1663         default:
1664                 CERROR ("osc_ioctl(): unrecognised ioctl %#x\n", cmd);
1665                 GOTO(out, err = -ENOTTY);
1666         }
1667 out:
1668         return err;
1669 }
1670
1671 static int osc_get_info(struct lustre_handle *conn, obd_count keylen,
1672                         void *key, __u32 *vallen, void *val)
1673 {
1674         ENTRY;
1675         if (!vallen || !val)
1676                 RETURN(-EFAULT);
1677
1678         if (keylen > strlen("lock_to_stripe") &&
1679             strcmp(key, "lock_to_stripe") == 0) {
1680                 __u32 *stripe = val;
1681                 *vallen = sizeof(*stripe);
1682                 *stripe = 0;
1683                 RETURN(0);
1684         }
1685         RETURN(-EINVAL);
1686 }
1687
1688 struct obd_ops osc_obd_ops = {
1689         o_owner:        THIS_MODULE,
1690         o_attach:       osc_attach,
1691         o_detach:       osc_detach,
1692         o_setup:        client_obd_setup,
1693         o_cleanup:      client_obd_cleanup,
1694         o_connect:      client_import_connect,
1695         o_disconnect:   client_import_disconnect,
1696         o_statfs:       osc_statfs,
1697         o_packmd:       osc_packmd,
1698         o_unpackmd:     osc_unpackmd,
1699         o_create:       osc_create,
1700         o_destroy:      osc_destroy,
1701         o_getattr:      osc_getattr,
1702         o_getattr_async: osc_getattr_async,
1703         o_setattr:      osc_setattr,
1704         o_open:         osc_open,
1705         o_close:        osc_close,
1706         o_brw:          osc_brw,
1707         o_brw_async:    osc_brw_async,
1708         o_punch:        osc_punch,
1709         o_enqueue:      osc_enqueue,
1710         o_match:        osc_match,
1711         o_cancel:       osc_cancel,
1712         o_cancel_unused: osc_cancel_unused,
1713         o_iocontrol:    osc_iocontrol,
1714         o_get_info:     osc_get_info
1715 };
1716
1717 struct obd_ops sanosc_obd_ops = {
1718         o_owner:        THIS_MODULE,
1719         o_attach:       osc_attach,
1720         o_detach:       osc_detach,
1721         o_cleanup:      client_obd_cleanup,
1722         o_connect:      client_import_connect,
1723         o_disconnect:   client_import_disconnect,
1724         o_statfs:       osc_statfs,
1725         o_packmd:       osc_packmd,
1726         o_unpackmd:     osc_unpackmd,
1727         o_create:       osc_create,
1728         o_destroy:      osc_destroy,
1729         o_getattr:      osc_getattr,
1730         o_getattr_async: osc_getattr_async,
1731         o_setattr:      osc_setattr,
1732         o_open:         osc_open,
1733         o_close:        osc_close,
1734 #ifdef __KERNEL__
1735         o_setup:        client_sanobd_setup,
1736         o_brw:          sanosc_brw,
1737 #endif
1738         o_punch:        osc_punch,
1739         o_enqueue:      osc_enqueue,
1740         o_match:        osc_match,
1741         o_cancel:       osc_cancel,
1742         o_cancel_unused: osc_cancel_unused,
1743         o_iocontrol:    osc_iocontrol,
1744 };
1745
1746 int __init osc_init(void)
1747 {
1748         struct lprocfs_static_vars lvars;
1749         int rc;
1750         ENTRY;
1751
1752         LASSERT(sizeof(struct obd_client_handle) <= FD_OSTDATA_SIZE);
1753         LASSERT(sizeof(struct obd_client_handle) <= OBD_INLINESZ);
1754
1755         lprocfs_init_vars(&lvars);
1756
1757         rc = class_register_type(&osc_obd_ops, lvars.module_vars,
1758                                  LUSTRE_OSC_NAME);
1759         if (rc)
1760                 RETURN(rc);
1761
1762         rc = class_register_type(&sanosc_obd_ops, lvars.module_vars,
1763                                  LUSTRE_SANOSC_NAME);
1764         if (rc)
1765                 class_unregister_type(LUSTRE_OSC_NAME);
1766
1767         RETURN(rc);
1768 }
1769
1770 static void __exit osc_exit(void)
1771 {
1772         class_unregister_type(LUSTRE_SANOSC_NAME);
1773         class_unregister_type(LUSTRE_OSC_NAME);
1774 }
1775
#ifdef __KERNEL__
/* Module metadata. */
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

/* Module entry points: osc_init() on load, osc_exit() on unload. */
module_init(osc_init);
module_exit(osc_exit);
#endif /* __KERNEL__ */