Whamcloud - gitweb
merge b_devel into HEAD (20030703)
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author Peter Braam <braam@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  *  For testing and management it is treated as an obd_device,
23  *  although it does not export a full OBD method table (the
24  *  requests are coming in over the wire, so object target modules
25  *  do not have a full method table.)
26  *
27  */
28
29 #define EXPORT_SYMTAB
30 #define DEBUG_SUBSYSTEM S_OSC
31
32 #ifdef __KERNEL__
33 #include <linux/version.h>
34 #include <linux/module.h>
35 #include <linux/mm.h>
36 #include <linux/highmem.h>
37 #include <linux/lustre_dlm.h>
38 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
39 #include <linux/workqueue.h>
40 #include <linux/smp_lock.h>
41 #else
42 #include <linux/locks.h>
43 #endif
44 #else
45 #include <liblustre.h>
46 #endif
47
48 #include <linux/kp30.h>
49 #include <linux/lustre_mds.h> /* for mds_objid */
50 #include <linux/lustre_otree.h>
51 #include <linux/obd_ost.h>
52 #include <linux/obd_lov.h>
53
54 #ifndef  __CYGWIN__
55 #include <linux/ctype.h>
56 #include <linux/init.h>
57 #else
58 #include <ctype.h>
59 #endif
60
61 #include <linux/lustre_ha.h>
62 #include <linux/obd_support.h> /* for OBD_FAIL_CHECK */
63 #include <linux/lustre_lite.h> /* for ll_i2info */
64 #include <portals/lib-types.h> /* for PTL_MD_MAX_IOV */
65 #include <linux/lprocfs_status.h>
66
67 static int osc_attach(struct obd_device *dev, obd_count len, void *data)
68 {
69         struct lprocfs_static_vars lvars;
70
71         lprocfs_init_vars(&lvars);
72         return lprocfs_obd_attach(dev, lvars.obd_vars);
73 }
74
/* Detach hook: hand the device to lprocfs_obd_detach() and pass its
 * result straight back to the caller. */
static int osc_detach(struct obd_device *dev)
{
        int rc = lprocfs_obd_detach(dev);

        return rc;
}
79
80 /* Pack OSC object metadata for disk storage (LE byte order). */
81 static int osc_packmd(struct lustre_handle *conn, struct lov_mds_md **lmmp,
82                       struct lov_stripe_md *lsm)
83 {
84         int lmm_size;
85         ENTRY;
86
87         lmm_size = sizeof(**lmmp);
88         if (!lmmp)
89                 RETURN(lmm_size);
90
91         if (*lmmp && !lsm) {
92                 OBD_FREE(*lmmp, lmm_size);
93                 *lmmp = NULL;
94                 RETURN(0);
95         }
96
97         if (!*lmmp) {
98                 OBD_ALLOC(*lmmp, lmm_size);
99                 if (!*lmmp)
100                         RETURN(-ENOMEM);
101         }
102
103         if (lsm) {
104                 LASSERT(lsm->lsm_object_id);
105                 (*lmmp)->lmm_object_id = cpu_to_le64 (lsm->lsm_object_id);
106         }
107
108         RETURN(lmm_size);
109 }
110
111 /* Unpack OSC object metadata from disk storage (LE byte order). */
112 static int osc_unpackmd(struct lustre_handle *conn, struct lov_stripe_md **lsmp,
113                         struct lov_mds_md *lmm, int lmm_bytes)
114 {
115         int lsm_size;
116         ENTRY;
117
118         if (lmm != NULL) {
119                 if (lmm_bytes < sizeof (*lmm)) {
120                         CERROR("lov_mds_md too small: %d, need %d\n",
121                                lmm_bytes, (int)sizeof(*lmm));
122                         RETURN (-EINVAL);
123                 }
124                 /* XXX LOV_MAGIC etc check? */
125
126                 if (lmm->lmm_object_id == cpu_to_le64 (0)) {
127                         CERROR ("lov_mds_md: zero lmm_object_id\n");
128                         RETURN (-EINVAL);
129                 }
130         }
131
132         lsm_size = lov_stripe_md_size(1);
133         if (!lsmp)
134                 RETURN(lsm_size);
135
136         if (*lsmp && !lmm) {
137                 OBD_FREE(*lsmp, lsm_size);
138                 *lsmp = NULL;
139                 RETURN(0);
140         }
141
142         if (!*lsmp) {
143                 OBD_ALLOC(*lsmp, lsm_size);
144                 if (!*lsmp)
145                         RETURN(-ENOMEM);
146
147                 (*lsmp)->lsm_oinfo[0].loi_dirty_ot =
148                         &(*lsmp)->lsm_oinfo[0].loi_dirty_ot_inline;
149                 ot_init((*lsmp)->lsm_oinfo[0].loi_dirty_ot);
150         }
151
152         if (lmm) {
153                 /* XXX zero *lsmp? */
154                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
155                 LASSERT((*lsmp)->lsm_object_id);
156         }
157
158         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
159
160         RETURN(lsm_size);
161 }
162
#warning "FIXME: make this be sent from OST"
/* Byte limit used for bulk read/write sizing; this file also reports it
 * to callers as o_blksize. */
#define OSC_BRW_MAX_SIZE 65536
/* Page-vector limit: the smaller of the portals iov limit and the number
 * of pages that fit in OSC_BRW_MAX_SIZE. */
#define OSC_BRW_MAX_IOV \
        min_t(int, PTL_MD_MAX_IOV, OSC_BRW_MAX_SIZE / PAGE_SIZE)
166
167 static int osc_getattr_interpret(struct ptlrpc_request *req,
168                                  struct osc_getattr_async_args *aa, int rc)
169 {
170         struct obdo     *oa = aa->aa_oa;
171         struct ost_body *body;
172         ENTRY;
173
174         if (rc != 0) {
175                 CERROR("failed: rc = %d\n", rc);
176                 RETURN (rc);
177         }
178
179         body = lustre_swab_repbuf(req, 0, sizeof (*body), lustre_swab_ost_body);
180         if (body == NULL) {
181                 CERROR ("can't unpack ost_body\n");
182                 RETURN (-EPROTO);
183         }
184
185         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
186         memcpy(oa, &body->oa, sizeof(*oa));
187
188         /* This should really be sent by the OST */
189         oa->o_blksize = OSC_BRW_MAX_SIZE;
190         oa->o_valid |= OBD_MD_FLBLKSZ;
191
192         RETURN (0);
193 }
194
195 static int osc_getattr_async(struct lustre_handle *conn, struct obdo *oa,
196                              struct lov_stripe_md *md,
197                              struct ptlrpc_request_set *set)
198 {
199         struct ptlrpc_request *request;
200         struct ost_body *body;
201         int size = sizeof(*body);
202         struct osc_getattr_async_args *aa;
203         ENTRY;
204
205         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_GETATTR, 1,
206                                   &size, NULL);
207         if (!request)
208                 RETURN(-ENOMEM);
209
210         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
211         memcpy(&body->oa, oa, sizeof(*oa));
212
213         request->rq_replen = lustre_msg_size(1, &size);
214         request->rq_interpret_reply = osc_getattr_interpret;
215
216         LASSERT (sizeof (*aa) <= sizeof (request->rq_async_args));
217         aa = (struct osc_getattr_async_args *)&request->rq_async_args;
218         aa->aa_oa = oa;
219
220         ptlrpc_set_add_req (set, request);
221         RETURN (0);
222 }
223
224 static int osc_getattr(struct lustre_handle *conn, struct obdo *oa,
225                        struct lov_stripe_md *md)
226 {
227         struct ptlrpc_request *request;
228         struct ost_body *body;
229         int rc, size = sizeof(*body);
230         ENTRY;
231
232         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_GETATTR, 1,
233                                   &size, NULL);
234         if (!request)
235                 RETURN(-ENOMEM);
236
237         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
238         memcpy(&body->oa, oa, sizeof(*oa));
239
240         request->rq_replen = lustre_msg_size(1, &size);
241
242         rc = ptlrpc_queue_wait(request);
243         if (rc) {
244                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
245                 GOTO(out, rc);
246         }
247
248         body = lustre_swab_repbuf(request, 0, sizeof (*body),
249                                   lustre_swab_ost_body);
250         if (body == NULL) {
251                 CERROR ("can't unpack ost_body\n");
252                 GOTO (out, rc = -EPROTO);
253         }
254
255         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
256         memcpy(oa, &body->oa, sizeof(*oa));
257
258         /* This should really be sent by the OST */
259         oa->o_blksize = OSC_BRW_MAX_SIZE;
260         oa->o_valid |= OBD_MD_FLBLKSZ;
261
262         EXIT;
263  out:
264         ptlrpc_req_finished(request);
265         return rc;
266 }
267
268 /* The import lock must already be held. */
269 static inline void osc_update_body_handle(struct list_head *head,
270                                           struct lustre_handle *old,
271                                           struct lustre_handle *new, int op)
272 {
273         struct list_head *tmp;
274         struct ost_body *body;
275         struct ptlrpc_request *req;
276         struct ptlrpc_request *last_req = NULL; /* temporary fire escape */
277
278         list_for_each(tmp, head) {
279                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
280
281                 /* XXX ok to remove when bug 1303 resolved - rread 05/27/03  */
282                 LASSERT (req != last_req);
283                 last_req = req;
284
285                 if (req->rq_reqmsg->opc != op)
286                         continue;
287                 body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
288                 if (memcmp(obdo_handle(&body->oa), old, sizeof(*old)))
289                         continue;
290
291                 DEBUG_REQ(D_HA, req, "updating close body with new fh");
292                 memcpy(obdo_handle(&body->oa), new, sizeof(*new));
293         }
294 }
295
296 static void osc_replay_open(struct ptlrpc_request *req)
297 {
298         struct lustre_handle old;
299         struct ost_body *body;
300         struct obd_client_handle *och = req->rq_replay_data;
301         struct lustre_handle *oa_handle;
302         ENTRY;
303
304         body = lustre_swab_repbuf (req, 0, sizeof (*body),
305                                    lustre_swab_ost_body);
306         LASSERT (body != NULL);
307
308         oa_handle = obdo_handle(&body->oa);
309
310         memcpy(&old, &och->och_fh, sizeof(old));
311         CDEBUG(D_HA, "updating cookie from "LPD64" to "LPD64"\n",
312                och->och_fh.cookie, oa_handle->cookie);
313         memcpy(&och->och_fh, oa_handle, sizeof(och->och_fh));
314
315         /* A few frames up, ptlrpc_replay holds the lock, so this is safe. */
316         osc_update_body_handle(&req->rq_import->imp_sending_list, &old,
317                               &och->och_fh, OST_CLOSE);
318         osc_update_body_handle(&req->rq_import->imp_delayed_list, &old,
319                               &och->och_fh, OST_CLOSE);
320         EXIT;
321 }
322
323
324 static int osc_open(struct lustre_handle *conn, struct obdo *oa,
325                     struct lov_stripe_md *md, struct obd_trans_info *oti,
326                     struct obd_client_handle *och)
327 {
328         struct ptlrpc_request *request;
329         struct ost_body *body;
330         unsigned long flags;
331         int rc, size = sizeof(*body);
332         ENTRY;
333         LASSERT(och != NULL);
334
335         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_OPEN, 1, &size,
336                                   NULL);
337         if (!request)
338                 RETURN(-ENOMEM);
339
340         spin_lock_irqsave (&request->rq_lock, flags);
341         request->rq_replay = 1;
342         spin_unlock_irqrestore (&request->rq_lock, flags);
343
344         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
345         memcpy(&body->oa, oa, sizeof(*oa));
346
347         request->rq_replen = lustre_msg_size(1, &size);
348
349         rc = ptlrpc_queue_wait(request);
350         if (rc)
351                 GOTO(out, rc);
352
353         body = lustre_swab_repbuf (request, 0, sizeof (*body),
354                                    lustre_swab_ost_body);
355         if (body == NULL) {
356                 CERROR ("Can't unpack ost_body\n");
357                 GOTO (out, rc = -EPROTO);
358         }
359
360         memcpy(oa, &body->oa, sizeof(*oa));
361
362         /* If the open succeeded, we better have a handle */
363         /* BlueArc OSTs don't send back (o_valid | FLHANDLE).  sigh.
364          * Temporary workaround until fixed. -phil 24 Feb 03 */
365         // if ((oa->o_valid & OBD_MD_FLHANDLE) == 0) {
366         //         CERROR ("No file handle\n");
367         //         GOTO (out, rc = -EPROTO);
368         // }
369         oa->o_valid |= OBD_MD_FLHANDLE;
370
371         /* This should really be sent by the OST */
372         oa->o_blksize = OSC_BRW_MAX_SIZE;
373         oa->o_valid |= OBD_MD_FLBLKSZ;
374
375         memcpy(&och->och_fh, obdo_handle(oa), sizeof(och->och_fh));
376         request->rq_replay_cb = osc_replay_open;
377         request->rq_replay_data = och;
378         och->och_req = ptlrpc_request_addref(request);
379         och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
380
381         EXIT;
382  out:
383         ptlrpc_req_finished(request);
384         return rc;
385 }
386
387 static int osc_close(struct lustre_handle *conn, struct obdo *oa,
388                      struct lov_stripe_md *md, struct obd_trans_info *oti)
389 {
390         struct obd_import *import = class_conn2cliimp(conn);
391         struct ptlrpc_request *request;
392         struct ost_body *body;
393         struct obd_client_handle *och;
394         unsigned long flags;
395         int rc, size = sizeof(*body);
396         ENTRY;
397
398         LASSERT(oa != NULL);
399         och = (struct obd_client_handle *)&oa->o_inline;
400         if (och->och_magic == 0) {
401                 /* Zero magic means that this file was never opened on this
402                  * OST--almost certainly because the OST was inactive at
403                  * open-time */
404                 RETURN(0);
405         }
406         LASSERT(och->och_magic == OBD_CLIENT_HANDLE_MAGIC);
407
408         request = ptlrpc_prep_req(import, OST_CLOSE, 1, &size, NULL);
409         if (!request)
410                 RETURN(-ENOMEM);
411
412         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
413         memcpy(&body->oa, oa, sizeof(*oa));
414
415         request->rq_replen = lustre_msg_size(1, &size);
416
417         rc = ptlrpc_queue_wait(request);
418         if (rc)
419                 CDEBUG(D_HA, "Suppressing close error %d\n", rc); // bug 1036
420
421         /* och_req == NULL can't happen any more, right? --phik */
422         if (och->och_req != NULL) {
423                 spin_lock_irqsave(&import->imp_lock, flags);
424                 spin_lock (&och->och_req->rq_lock);
425                 och->och_req->rq_replay = 0;
426                 spin_unlock (&och->och_req->rq_lock);
427                 /* see comments in llite/file.c:ll_mdc_close() */
428                 if (och->och_req->rq_transno) {
429                         /* this can't happen yet, because the OSTs don't yet
430                          * issue transnos for OPEN requests -phik 21 Apr 2003 */
431                         LBUG();
432                         if (!request->rq_transno && import->imp_replayable) {
433                                 request->rq_transno = och->och_req->rq_transno;
434                                 ptlrpc_retain_replayable_request(request,
435                                                                  import);
436                         }
437                         spin_unlock_irqrestore(&import->imp_lock, flags);
438                 } else {
439                         spin_unlock_irqrestore(&import->imp_lock, flags);
440                 }
441
442                 ptlrpc_req_finished(och->och_req);
443         }
444
445         if (!rc) {
446                 body = lustre_swab_repbuf (request, 0, sizeof (*body),
447                                            lustre_swab_ost_body);
448                 if (body == NULL) {
449                         rc = -EPROTO;
450                         CDEBUG(D_HA, "Suppressing close error %d\n", rc); // bug 1036
451                 } else
452                         memcpy(oa, &body->oa, sizeof(*oa));
453         }
454
455         ptlrpc_req_finished(request);
456         RETURN(0);
457 }
458
459 static int osc_setattr(struct lustre_handle *conn, struct obdo *oa,
460                        struct lov_stripe_md *md, struct obd_trans_info *oti)
461 {
462         struct ptlrpc_request *request;
463         struct ost_body *body;
464         int rc, size = sizeof(*body);
465         ENTRY;
466
467         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SETATTR, 1,
468                                   &size, NULL);
469         if (!request)
470                 RETURN(-ENOMEM);
471
472         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
473         memcpy(&body->oa, oa, sizeof(*oa));
474
475         request->rq_replen = lustre_msg_size(1, &size);
476
477         rc = ptlrpc_queue_wait(request);
478
479         ptlrpc_req_finished(request);
480         return rc;
481 }
482
483 static int osc_create(struct lustre_handle *conn, struct obdo *oa,
484                       struct lov_stripe_md **ea, struct obd_trans_info *oti)
485 {
486         struct ptlrpc_request *request;
487         struct ost_body *body;
488         struct lov_stripe_md *lsm;
489         int rc, size = sizeof(*body);
490         ENTRY;
491
492         LASSERT(oa);
493         LASSERT(ea);
494
495         lsm = *ea;
496         if (!lsm) {
497                 rc = obd_alloc_memmd(conn, &lsm);
498                 if (rc < 0)
499                         RETURN(rc);
500         }
501
502         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CREATE, 1, &size,
503                                   NULL);
504         if (!request)
505                 GOTO(out, rc = -ENOMEM);
506
507         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
508         memcpy(&body->oa, oa, sizeof(*oa));
509
510         request->rq_replen = lustre_msg_size(1, &size);
511
512         rc = ptlrpc_queue_wait(request);
513         if (rc)
514                 GOTO(out_req, rc);
515
516         body = lustre_swab_repbuf (request, 0, sizeof (*body),
517                                    lustre_swab_ost_body);
518         if (body == NULL) {
519                 CERROR ("can't unpack ost_body\n");
520                 GOTO (out_req, rc = -EPROTO);
521         }
522
523         memcpy(oa, &body->oa, sizeof(*oa));
524
525         /* This should really be sent by the OST */
526         oa->o_blksize = OSC_BRW_MAX_SIZE;
527         oa->o_valid |= OBD_MD_FLBLKSZ;
528
529         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
530          * have valid lsm_oinfo data structs, so don't go touching that.
531          * This needs to be fixed in a big way.
532          */
533         lsm->lsm_object_id = oa->o_id;
534         lsm->lsm_stripe_count = 0;
535         lsm->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
536         *ea = lsm;
537
538         if (oti != NULL)
539                 oti->oti_transno = request->rq_repmsg->transno;
540
541         CDEBUG(D_HA, "transno: "LPD64"\n", request->rq_repmsg->transno);
542         EXIT;
543 out_req:
544         ptlrpc_req_finished(request);
545 out:
546         if (rc && !*ea)
547                 obd_free_memmd(conn, &lsm);
548         return rc;
549 }
550
551 static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
552                      struct lov_stripe_md *md, obd_size start,
553                      obd_size end, struct obd_trans_info *oti)
554 {
555         struct ptlrpc_request *request;
556         struct ost_body *body;
557         int rc, size = sizeof(*body);
558         ENTRY;
559
560         if (!oa) {
561                 CERROR("oa NULL\n");
562                 RETURN(-EINVAL);
563         }
564
565         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_PUNCH, 1, &size,
566                                   NULL);
567         if (!request)
568                 RETURN(-ENOMEM);
569
570         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
571         memcpy(&body->oa, oa, sizeof(*oa));
572
573         /* overload the size and blocks fields in the oa with start/end */
574         body->oa.o_size = start;
575         body->oa.o_blocks = end;
576         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
577
578         request->rq_replen = lustre_msg_size(1, &size);
579
580         rc = ptlrpc_queue_wait(request);
581         if (rc)
582                 GOTO(out, rc);
583
584         body = lustre_swab_repbuf (request, 0, sizeof (*body),
585                                    lustre_swab_ost_body);
586         if (body == NULL) {
587                 CERROR ("can't unpack ost_body\n");
588                 GOTO (out, rc = -EPROTO);
589         }
590
591         memcpy(oa, &body->oa, sizeof(*oa));
592
593         EXIT;
594  out:
595         ptlrpc_req_finished(request);
596         return rc;
597 }
598
599 static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
600                        struct lov_stripe_md *ea, struct obd_trans_info *oti)
601 {
602         struct ptlrpc_request *request;
603         struct ost_body *body;
604         int rc, size = sizeof(*body);
605         ENTRY;
606
607         if (!oa) {
608                 CERROR("oa NULL\n");
609                 RETURN(-EINVAL);
610         }
611         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_DESTROY, 1,
612                                   &size, NULL);
613         if (!request)
614                 RETURN(-ENOMEM);
615
616         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
617         memcpy(&body->oa, oa, sizeof(*oa));
618
619         request->rq_replen = lustre_msg_size(1, &size);
620
621         rc = ptlrpc_queue_wait(request);
622         if (rc)
623                 GOTO(out, rc);
624
625         body = lustre_swab_repbuf (request, 0, sizeof (*body),
626                                    lustre_swab_ost_body);
627         if (body == NULL) {
628                 CERROR ("Can't unpack body\n");
629                 GOTO (out, rc = -EPROTO);
630         }
631
632         memcpy(oa, &body->oa, sizeof(*oa));
633
634         EXIT;
635  out:
636         ptlrpc_req_finished(request);
637         return rc;
638 }
639
640 static void osc_announce_cached(struct client_obd *cli, struct ost_body *body)
641 {
642         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLRDEV;
643
644         LASSERT(!(body->oa.o_valid & bits));
645
646         body->oa.o_valid |= bits;
647         down(&cli->cl_dirty_sem);
648         body->oa.o_blocks = cli->cl_dirty;
649         body->oa.o_rdev = cli->cl_dirty_granted;
650         up(&cli->cl_dirty_sem);
651         CDEBUG(D_INODE, "announcing "LPU64" dirty "LPU64" granted\n",
652                cli->cl_dirty, cli->cl_dirty_granted);
653 }
654
655 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
656 {
657         if(!(body->oa.o_valid & OBD_MD_FLRDEV)) {
658                 if (cli->cl_ost_can_grant) {
659                         CDEBUG(D_INODE, "%s can't grant\n",
660                                cli->cl_import->imp_target_uuid.uuid);
661                 }
662                 cli->cl_ost_can_grant = 0;
663                 return;
664         }
665
666         CDEBUG(D_INODE, "got "LPU64" grant\n", body->oa.o_rdev);
667         down(&cli->cl_dirty_sem);
668         cli->cl_dirty_granted = body->oa.o_rdev;
669         /* XXX check for over-run and wake up the io thread that
670          * doesn't exist yet */
671         up(&cli->cl_dirty_sem);
672 }
673
674 /* We assume that the reason this OSC got a short read is because it read
675  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
676  * via the LOV, and it _knows_ it's reading inside the file, it's just that
677  * this stripe never got written at or beyond this stripe offset yet. */
678 static void handle_short_read(int nob_read, obd_count page_count,
679                               struct brw_page *pga)
680 {
681         char *ptr;
682
683         /* skip bytes read OK */
684         while (nob_read > 0) {
685                 LASSERT (page_count > 0);
686
687                 if (pga->count > nob_read) {
688                         /* EOF inside this page */
689                         ptr = kmap(pga->pg) + (pga->off & ~PAGE_MASK);
690                         memset(ptr + nob_read, 0, pga->count - nob_read);
691                         kunmap(pga->pg);
692                         page_count--;
693                         pga++;
694                         break;
695                 }
696
697                 nob_read -= pga->count;
698                 page_count--;
699                 pga++;
700         }
701
702         /* zero remaining pages */
703         while (page_count-- > 0) {
704                 ptr = kmap(pga->pg) + (pga->off & ~PAGE_MASK);
705                 memset(ptr, 0, pga->count);
706                 kunmap(pga->pg);
707                 pga++;
708         }
709 }
710
711 static int check_write_rcs (struct ptlrpc_request *request,
712                             int niocount, obd_count page_count,
713                             struct brw_page *pga)
714 {
715         int    i;
716         __u32 *remote_rcs;
717
718         /* return error if any niobuf was in error */
719         remote_rcs = lustre_swab_repbuf(request, 1,
720                                         sizeof(*remote_rcs) * niocount, NULL);
721         if (remote_rcs == NULL) {
722                 CERROR ("Missing/short RC vector on BRW_WRITE reply\n");
723                 return (-EPROTO);
724         }
725         if (lustre_msg_swabbed (request->rq_repmsg))
726                 for (i = 0; i < niocount; i++)
727                         __swab32s (&remote_rcs[i]);
728
729         for (i = 0; i < niocount; i++) {
730                 if (remote_rcs[i] < 0)
731                         return (remote_rcs[i]);
732
733                 if (remote_rcs[i] != 0) {
734                         CERROR ("rc[%d] invalid (%d) req %p\n",
735                                 i, remote_rcs[i], request);
736                         return (-EPROTO);
737                 }
738         }
739
740         return (0);
741 }
742
743 static inline int can_merge_pages (struct brw_page *p1, struct brw_page *p2)
744 {
745         if (p1->flag != p2->flag) {
746                 /* XXX we don't make much use of 'flag' right now
747                  * but this will warn about usage when we do */
748                 CERROR ("different flags set %d, %d\n",
749                         p1->flag, p2->flag);
750                 return (0);
751         }
752
753         return (p1->off + p1->count == p2->off);
754 }
755
#if CHECKSUM_BULK
/* Fold the first 'nob' bytes described by the page array into a
 * checksum using ost_checksum().  Each page contributes pga->count
 * bytes starting at its in-page offset, except the last one touched,
 * which contributes only the bytes still outstanding. */
static obd_count cksum_pages(int nob, obd_count page_count,
                             struct brw_page *pga)
{
        obd_count cksum = 0;
        char *ptr;

        while (nob > 0) {
                LASSERT (page_count > 0);

                ptr = kmap (pga->pg);
                ost_checksum (&cksum, ptr + (pga->off & (PAGE_SIZE - 1)),
                              pga->count > nob ? nob : pga->count);
                kunmap (pga->pg);

                nob -= pga->count;
                page_count--;
                pga++;
        }

        return (cksum);
}
#endif
780
781 static int osc_brw_prep_request(struct obd_import *imp,
782                                 struct lov_stripe_md *lsm, obd_count page_count,
783                                 struct brw_page *pga, int cmd,
784                                 int *requested_nobp, int *niocountp,
785                                 struct ptlrpc_request **reqp)
786 {
787         struct ptlrpc_request   *req;
788         struct ptlrpc_bulk_desc *desc;
789         struct client_obd       *cli = &imp->imp_obd->u.cli;
790         struct ost_body         *body;
791         struct obd_ioobj        *ioobj;
792         struct niobuf_remote    *niobuf;
793         unsigned long            flags;
794         int                      niocount;
795         int                      size[3];
796         int                      i;
797         int                      requested_nob;
798         int                      opc;
799         int                      rc;
800
801         opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
802
803         for (niocount = i = 1; i < page_count; i++)
804                 if (!can_merge_pages (&pga[i - 1], &pga[i]))
805                         niocount++;
806
807         size[0] = sizeof (*body);
808         size[1] = sizeof (*ioobj);
809         size[2] = niocount * sizeof (*niobuf);
810
811         req = ptlrpc_prep_req (imp, opc, 3, size, NULL);
812         if (req == NULL)
813                 return (-ENOMEM);
814
815         if (opc == OST_WRITE)
816                 desc = ptlrpc_prep_bulk_imp(req, BULK_GET_SOURCE,
817                                             OST_BULK_PORTAL);
818         else
819                 desc = ptlrpc_prep_bulk_imp(req, BULK_PUT_SINK,
820                                             OST_BULK_PORTAL);
821         if (desc == NULL)
822                 GOTO (out, rc = -ENOMEM);
823         /* NB request now owns desc and will free it when it gets freed */
824
825         body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
826         ioobj = lustre_msg_buf(req->rq_reqmsg, 1, sizeof(*ioobj));
827         niobuf = lustre_msg_buf(req->rq_reqmsg, 2, niocount * sizeof(*niobuf));
828
829         ioobj->ioo_id = lsm->lsm_object_id;
830         ioobj->ioo_gr = 0;
831         ioobj->ioo_type = S_IFREG;
832         ioobj->ioo_bufcnt = niocount;
833
834         LASSERT (page_count > 0);
835         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
836                 struct brw_page *pg = &pga[i];
837                 struct brw_page *pg_prev = pg - 1;
838
839                 LASSERT (pg->count > 0);
840                 LASSERT ((pg->off & (PAGE_SIZE - 1)) + pg->count <= PAGE_SIZE);
841                 LASSERT (i == 0 || pg->off > pg_prev->off);
842
843                 rc = ptlrpc_prep_bulk_page (desc, pg->pg,
844                                             pg->off & (PAGE_SIZE - 1),
845                                             pg->count);
846                 if (rc != 0)
847                         GOTO (out, rc);
848
849                 requested_nob += pg->count;
850
851                 if (i > 0 && can_merge_pages (pg_prev, pg)) {
852                         niobuf--;
853                         niobuf->len += pg->count;
854                 } else {
855                         niobuf->offset = pg->off;
856                         niobuf->len    = pg->count;
857                         niobuf->flags  = pg->flag;
858                 }
859         }
860
861         LASSERT ((void *)(niobuf - niocount) ==
862                  lustre_msg_buf(req->rq_reqmsg, 2, niocount * sizeof(*niobuf)));
863 #if CHECKSUM_BULK
864         body->oa.o_valid |= OBD_MD_FLCKSUM;
865         if (opc == OST_BRW_WRITE)
866                 body->oa.o_nlink = cksum_pages (requested_nob, page_count, pga);
867 #endif
868         osc_announce_cached(cli, body);
869         spin_lock_irqsave (&req->rq_lock, flags);
870         req->rq_no_resend = 1;
871         spin_unlock_irqrestore (&req->rq_lock, flags);
872
873         /* size[0] still sizeof (*body) */
874         if (opc == OST_WRITE) {
875                 /* 1 RC per niobuf */
876                 size[1] = sizeof(__u32) * niocount;
877                 req->rq_replen = lustre_msg_size(2, size);
878         } else {
879                 /* 1 RC for the whole I/O */
880                 req->rq_replen = lustre_msg_size(1, size);
881         }
882
883         *niocountp = niocount;
884         *requested_nobp = requested_nob;
885         *reqp = req;
886         return (0);
887
888  out:
889         ptlrpc_req_finished (req);
890         return (rc);
891 }
892
893 static int osc_brw_fini_request (struct ptlrpc_request *req,
894                                  int requested_nob, int niocount,
895                                  obd_count page_count, struct brw_page *pga,
896                                  int rc)
897 {
898         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
899         struct ost_body *body;
900         if (rc < 0)
901                 return (rc);
902
903         body = lustre_swab_repbuf(req, 0, sizeof (*body), lustre_swab_ost_body);
904         if (body == NULL) {
905                 CERROR ("Can't unpack body\n");
906                 RETURN(-EPROTO);
907         }
908         osc_update_grant(cli, body);
909
910         if (req->rq_reqmsg->opc == OST_WRITE) {
911                 if (rc > 0) {
912                         CERROR ("Unexpected +ve rc %d\n", rc);
913                         return (-EPROTO);
914                 }
915
916                 return (check_write_rcs(req, niocount, page_count, pga));
917         }
918
919         if (rc > requested_nob) {
920                 CERROR ("Unexpected rc %d (%d requested)\n",
921                         rc, requested_nob);
922                 return (-EPROTO);
923         }
924
925         if (rc < requested_nob)
926                 handle_short_read(rc, page_count, pga);
927
928 #if CHECKSUM_BULK
929         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
930                 static int cksum_counter;
931                 obd_count server_cksum = body->oa.o_nlink;
932                 obd_count cksum = cksum_pages(rc, page_count, pga);
933
934                 cksum_counter++;
935                 if (server_cksum != cksum) {
936                         CERROR("Bad checksum: server "LPX64", client "LPX64
937                                ", server NID "LPX64"\n", server_cksum, cksum,
938                                imp->imp_connection->c_peer.peer_nid);
939                         cksum_counter = 0;
940                 } else if ((cksum_counter & (-cksum_counter)) == cksum_counter)
941                         CERROR("Checksum %u from "LPX64" OK: %x\n",
942                                cksum_counter,
943                                imp->imp_connection->c_peer.peer_nid, cksum);
944         } else {
945                 static int cksum_missed;
946                 cksum_missed++;
947                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
948                         CERROR("Request checksum %u from "LPX64", no reply\n",
949                                cksum_missed,
950                                imp->imp_connection->c_peer.peer_nid);
951         }
952 #endif
953         return (0);
954 }
955
956 static int osc_brw_internal(struct lustre_handle *conn,
957                             struct lov_stripe_md *lsm,
958                             obd_count page_count, struct brw_page *pga, int cmd)
959 {
960         int                    requested_nob;
961         int                    niocount;
962         struct ptlrpc_request *request;
963         int                    rc;
964         ENTRY;
965
966 restart_bulk:
967         rc = osc_brw_prep_request(class_conn2cliimp(conn), lsm, page_count, pga,
968                                   cmd, &requested_nob, &niocount, &request);
969         /* NB ^ sets rq_no_resend */
970
971         if (rc != 0)
972                 return (rc);
973
974         rc = ptlrpc_queue_wait(request);
975
976         if (rc == -ETIMEDOUT && request->rq_resend) {
977                 DEBUG_REQ(D_HA, request,  "BULK TIMEOUT");
978                 ptlrpc_req_finished(request);
979                 goto restart_bulk;
980         }
981
982         rc = osc_brw_fini_request (request, requested_nob, niocount,
983                                    page_count, pga, rc);
984
985         ptlrpc_req_finished(request);
986         RETURN (rc);
987 }
988
989 static int brw_interpret(struct ptlrpc_request *request,
990                          struct osc_brw_async_args *aa, int rc)
991 {
992         int requested_nob    = aa->aa_requested_nob;
993         int niocount         = aa->aa_nio_count;
994         obd_count page_count = aa->aa_page_count;
995         struct brw_page *pga = aa->aa_pga;
996         ENTRY;
997
998         /* XXX bug 937 here */
999         if (rc == -ETIMEDOUT && request->rq_resend) {
1000                 DEBUG_REQ(D_HA, request,  "BULK TIMEOUT");
1001                 LBUG(); /* re-send.  later. */
1002                 //goto restart_bulk;
1003         }
1004
1005         rc = osc_brw_fini_request (request, requested_nob, niocount,
1006                                    page_count, pga, rc);
1007         RETURN (rc);
1008 }
1009
1010 static int async_internal(struct lustre_handle *conn, struct lov_stripe_md *lsm,
1011                           obd_count page_count, struct brw_page *pga,
1012                           struct ptlrpc_request_set *set, int cmd)
1013 {
1014         struct ptlrpc_request     *request;
1015         int                        requested_nob;
1016         int                        nio_count;
1017         struct osc_brw_async_args *aa;
1018         int                        rc;
1019         ENTRY;
1020
1021         rc = osc_brw_prep_request (class_conn2cliimp(conn),
1022                                    lsm, page_count, pga, cmd,
1023                                    &requested_nob, &nio_count, &request);
1024         /* NB ^ sets rq_no_resend */
1025
1026         if (rc == 0) {
1027                 LASSERT (sizeof (*aa) <= sizeof (request->rq_async_args));
1028                 aa = (struct osc_brw_async_args *)&request->rq_async_args;
1029                 aa->aa_requested_nob = requested_nob;
1030                 aa->aa_nio_count = nio_count;
1031                 aa->aa_page_count = page_count;
1032                 aa->aa_pga = pga;
1033
1034                 request->rq_interpret_reply = brw_interpret;
1035                 ptlrpc_set_add_req(set, request);
1036         }
1037         RETURN (rc);
1038 }
1039
/* Fallback for environments that do not provide min_t(): the smaller of
 * x and y after both have been converted to 'type'.  Each argument is
 * evaluated exactly once. */
#ifndef min_t
#define min_t(type, x, y)                                       \
({                                                              \
        type min_t_lhs_ = (x);                                  \
        type min_t_rhs_ = (y);                                  \
        min_t_lhs_ < min_t_rhs_ ? min_t_lhs_ : min_t_rhs_;      \
})
#endif
1044
1045 /*
1046  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1047  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1048  * fine for our small page arrays and doesn't require allocation.  its an
1049  * insertion sort that swaps elements that are strides apart, shrinking the
1050  * stride down until its '1' and the array is sorted.
1051  */
1052 static void sort_brw_pages(struct brw_page *array, int num)
1053 {
1054         int stride, i, j;
1055         struct brw_page tmp;
1056
1057         if (num == 1)
1058                 return;
1059         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1060                 ;
1061
1062         do {
1063                 stride /= 3;
1064                 for (i = stride ; i < num ; i++) {
1065                         tmp = array[i];
1066                         j = i;
1067                         while (j >= stride && array[j - stride].off > tmp.off) {
1068                                 array[j] = array[j - stride];
1069                                 j -= stride;
1070                         }
1071                         array[j] = tmp;
1072                 }
1073         } while (stride > 1);
1074 }
1075
1076 /* make sure we the regions we're passing to elan don't violate its '4
1077  * fragments' constraint.  portal headers are a fragment, all full
1078  * PAGE_SIZE long pages count as 1 fragment, and each partial page
1079  * counts as a fragment.  I think.  see bug 934. */
1080 static obd_count check_elan_limit(struct brw_page *pg, obd_count pages)
1081 {
1082         int frags_left = 3;
1083         int saw_whole_frag = 0;
1084         int i;
1085
1086         for (i = 0 ; frags_left && i < pages ; pg++, i++) {
1087                 if (pg->count == PAGE_SIZE) {
1088                         if (!saw_whole_frag) {
1089                                 saw_whole_frag = 1;
1090                                 frags_left--;
1091                         }
1092                 } else {
1093                         frags_left--;
1094                 }
1095         }
1096         return i;
1097 }
1098
1099 static int osc_brw(int cmd, struct lustre_handle *conn,
1100                    struct lov_stripe_md *md, obd_count page_count,
1101                    struct brw_page *pga, struct obd_trans_info *oti)
1102 {
1103         ENTRY;
1104
1105         if (cmd == OBD_BRW_CHECK) {
1106                 /* The caller just wants to know if there's a chance that this
1107                  * I/O can succeed */
1108                 struct obd_import *imp = class_conn2cliimp(conn);
1109
1110                 if (imp == NULL || imp->imp_invalid)
1111                         RETURN(-EIO);
1112                 RETURN(0);
1113         }
1114
1115         while (page_count) {
1116                 obd_count pages_per_brw;
1117                 int rc;
1118
1119                 if (page_count > OSC_BRW_MAX_IOV)
1120                         pages_per_brw = OSC_BRW_MAX_IOV;
1121                 else
1122                         pages_per_brw = page_count;
1123
1124                 sort_brw_pages(pga, pages_per_brw);
1125                 pages_per_brw = check_elan_limit(pga, pages_per_brw);
1126
1127                 rc = osc_brw_internal(conn, md, pages_per_brw, pga, cmd);
1128
1129                 if (rc != 0)
1130                         RETURN(rc);
1131
1132                 page_count -= pages_per_brw;
1133                 pga += pages_per_brw;
1134         }
1135         RETURN(0);
1136 }
1137
1138 static int osc_brw_async(int cmd, struct lustre_handle *conn,
1139                          struct lov_stripe_md *md, obd_count page_count,
1140                          struct brw_page *pga, struct ptlrpc_request_set *set,
1141                          struct obd_trans_info *oti)
1142 {
1143         ENTRY;
1144
1145         if (cmd == OBD_BRW_CHECK) {
1146                 /* The caller just wants to know if there's a chance that this
1147                  * I/O can succeed */
1148                 struct obd_import *imp = class_conn2cliimp(conn);
1149
1150                 if (imp == NULL || imp->imp_invalid)
1151                         RETURN(-EIO);
1152                 RETURN(0);
1153         }
1154
1155         while (page_count) {
1156                 obd_count pages_per_brw;
1157                 int rc;
1158
1159                 if (page_count > OSC_BRW_MAX_IOV)
1160                         pages_per_brw = OSC_BRW_MAX_IOV;
1161                 else
1162                         pages_per_brw = page_count;
1163
1164                 sort_brw_pages(pga, pages_per_brw);
1165                 pages_per_brw = check_elan_limit(pga, pages_per_brw);
1166
1167                 rc = async_internal(conn, md, pages_per_brw, pga, set, cmd);
1168
1169                 if (rc != 0)
1170                         RETURN(rc);
1171
1172                 page_count -= pages_per_brw;
1173                 pga += pages_per_brw;
1174         }
1175         RETURN(0);
1176 }
1177
1178 #ifdef __KERNEL__
1179 /* Note: caller will lock/unlock, and set uptodate on the pages */
1180 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1181 static int sanosc_brw_read(struct lustre_handle *conn,
1182                            struct lov_stripe_md *lsm,
1183                            obd_count page_count,
1184                            struct brw_page *pga)
1185 {
1186         struct ptlrpc_request *request = NULL;
1187         struct ost_body *body;
1188         struct niobuf_remote *nioptr;
1189         struct obd_ioobj *iooptr;
1190         int rc, size[3] = {sizeof(*body)}, mapped = 0;
1191         int swab;
1192         ENTRY;
1193
1194         /* XXX does not handle 'new' brw protocol */
1195
1196         size[1] = sizeof(struct obd_ioobj);
1197         size[2] = page_count * sizeof(*nioptr);
1198
1199         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SAN_READ, 3,
1200                                   size, NULL);
1201         if (!request)
1202                 RETURN(-ENOMEM);
1203
1204         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
1205         iooptr = lustre_msg_buf(request->rq_reqmsg, 1, sizeof (*iooptr));
1206         nioptr = lustre_msg_buf(request->rq_reqmsg, 2,
1207                                 sizeof (*nioptr) * page_count);
1208
1209         iooptr->ioo_id = lsm->lsm_object_id;
1210         iooptr->ioo_gr = 0;
1211         iooptr->ioo_type = S_IFREG;
1212         iooptr->ioo_bufcnt = page_count;
1213
1214         for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
1215                 LASSERT(PageLocked(pga[mapped].pg));
1216                 LASSERT(mapped == 0 || pga[mapped].off > pga[mapped - 1].off);
1217
1218                 nioptr->offset = pga[mapped].off;
1219                 nioptr->len    = pga[mapped].count;
1220                 nioptr->flags  = pga[mapped].flag;
1221         }
1222
1223         size[1] = page_count * sizeof(*nioptr);
1224         request->rq_replen = lustre_msg_size(2, size);
1225
1226         rc = ptlrpc_queue_wait(request);
1227         if (rc)
1228                 GOTO(out_req, rc);
1229
1230         swab = lustre_msg_swabbed (request->rq_repmsg);
1231         LASSERT_REPSWAB (request, 1);
1232         nioptr = lustre_msg_buf(request->rq_repmsg, 1, size[1]);
1233         if (!nioptr) {
1234                 /* nioptr missing or short */
1235                 GOTO(out_req, rc = -EPROTO);
1236         }
1237
1238         /* actual read */
1239         for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
1240                 struct page *page = pga[mapped].pg;
1241                 struct buffer_head *bh;
1242                 kdev_t dev;
1243
1244                 if (swab)
1245                         lustre_swab_niobuf_remote (nioptr);
1246
1247                 /* got san device associated */
1248                 LASSERT(class_conn2obd(conn));
1249                 dev = class_conn2obd(conn)->u.cli.cl_sandev;
1250
1251                 /* hole */
1252                 if (!nioptr->offset) {
1253                         CDEBUG(D_PAGE, "hole at ino %lu; index %ld\n",
1254                                         page->mapping->host->i_ino,
1255                                         page->index);
1256                         memset(page_address(page), 0, PAGE_SIZE);
1257                         continue;
1258                 }
1259
1260                 if (!page->buffers) {
1261                         create_empty_buffers(page, dev, PAGE_SIZE);
1262                         bh = page->buffers;
1263
1264                         clear_bit(BH_New, &bh->b_state);
1265                         set_bit(BH_Mapped, &bh->b_state);
1266                         bh->b_blocknr = (unsigned long)nioptr->offset;
1267
1268                         clear_bit(BH_Uptodate, &bh->b_state);
1269
1270                         ll_rw_block(READ, 1, &bh);
1271                 } else {
1272                         bh = page->buffers;
1273
1274                         /* if buffer already existed, it must be the
1275                          * one we mapped before, check it */
1276                         LASSERT(!test_bit(BH_New, &bh->b_state));
1277                         LASSERT(test_bit(BH_Mapped, &bh->b_state));
1278                         LASSERT(bh->b_blocknr == (unsigned long)nioptr->offset);
1279
1280                         /* wait it's io completion */
1281                         if (test_bit(BH_Lock, &bh->b_state))
1282                                 wait_on_buffer(bh);
1283
1284                         if (!test_bit(BH_Uptodate, &bh->b_state))
1285                                 ll_rw_block(READ, 1, &bh);
1286                 }
1287
1288
1289                 /* must do syncronous write here */
1290                 wait_on_buffer(bh);
1291                 if (!buffer_uptodate(bh)) {
1292                         /* I/O error */
1293                         rc = -EIO;
1294                         goto out_req;
1295                 }
1296         }
1297
1298 out_req:
1299         ptlrpc_req_finished(request);
1300         RETURN(rc);
1301 }
1302
1303 static int sanosc_brw_write(struct lustre_handle *conn,
1304                             struct lov_stripe_md *lsm,
1305                             obd_count page_count,
1306                             struct brw_page *pga)
1307 {
1308         struct ptlrpc_request *request = NULL;
1309         struct ost_body *body;
1310         struct niobuf_remote *nioptr;
1311         struct obd_ioobj *iooptr;
1312         int rc, size[3] = {sizeof(*body)}, mapped = 0;
1313         int swab;
1314         ENTRY;
1315
1316         size[1] = sizeof(struct obd_ioobj);
1317         size[2] = page_count * sizeof(*nioptr);
1318
1319         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SAN_WRITE,
1320                                   3, size, NULL);
1321         if (!request)
1322                 RETURN(-ENOMEM);
1323
1324         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
1325         iooptr = lustre_msg_buf(request->rq_reqmsg, 1, sizeof (*iooptr));
1326         nioptr = lustre_msg_buf(request->rq_reqmsg, 2,
1327                                 sizeof (*nioptr) * page_count);
1328
1329         iooptr->ioo_id = lsm->lsm_object_id;
1330         iooptr->ioo_gr = 0;
1331         iooptr->ioo_type = S_IFREG;
1332         iooptr->ioo_bufcnt = page_count;
1333
1334         /* pack request */
1335         for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
1336                 LASSERT(PageLocked(pga[mapped].pg));
1337                 LASSERT(mapped == 0 || pga[mapped].off > pga[mapped - 1].off);
1338
1339                 nioptr->offset = pga[mapped].off;
1340                 nioptr->len    = pga[mapped].count;
1341                 nioptr->flags  = pga[mapped].flag;
1342         }
1343
1344         size[1] = page_count * sizeof(*nioptr);
1345         request->rq_replen = lustre_msg_size(2, size);
1346
1347         rc = ptlrpc_queue_wait(request);
1348         if (rc)
1349                 GOTO(out_req, rc);
1350
1351         swab = lustre_msg_swabbed (request->rq_repmsg);
1352         LASSERT_REPSWAB (request, 1);
1353         nioptr = lustre_msg_buf(request->rq_repmsg, 1, size[1]);
1354         if (!nioptr) {
1355                 CERROR("absent/short niobuf array\n");
1356                 GOTO(out_req, rc = -EPROTO);
1357         }
1358
1359         /* actual write */
1360         for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
1361                 struct page *page = pga[mapped].pg;
1362                 struct buffer_head *bh;
1363                 kdev_t dev;
1364
1365                 if (swab)
1366                         lustre_swab_niobuf_remote (nioptr);
1367
1368                 /* got san device associated */
1369                 LASSERT(class_conn2obd(conn));
1370                 dev = class_conn2obd(conn)->u.cli.cl_sandev;
1371
1372                 if (!page->buffers) {
1373                         create_empty_buffers(page, dev, PAGE_SIZE);
1374                 } else {
1375                         /* checking */
1376                         LASSERT(!test_bit(BH_New, &page->buffers->b_state));
1377                         LASSERT(test_bit(BH_Mapped, &page->buffers->b_state));
1378                         LASSERT(page->buffers->b_blocknr ==
1379                                 (unsigned long)nioptr->offset);
1380                 }
1381                 bh = page->buffers;
1382
1383                 LASSERT(bh);
1384
1385                 /* if buffer locked, wait it's io completion */
1386                 if (test_bit(BH_Lock, &bh->b_state))
1387                         wait_on_buffer(bh);
1388
1389                 clear_bit(BH_New, &bh->b_state);
1390                 set_bit(BH_Mapped, &bh->b_state);
1391
1392                 /* override the block nr */
1393                 bh->b_blocknr = (unsigned long)nioptr->offset;
1394
1395                 /* we are about to write it, so set it
1396                  * uptodate/dirty
1397                  * page lock should garentee no race condition here */
1398                 set_bit(BH_Uptodate, &bh->b_state);
1399                 set_bit(BH_Dirty, &bh->b_state);
1400
1401                 ll_rw_block(WRITE, 1, &bh);
1402
1403                 /* must do syncronous write here */
1404                 wait_on_buffer(bh);
1405                 if (!buffer_uptodate(bh) || test_bit(BH_Dirty, &bh->b_state)) {
1406                         /* I/O error */
1407                         rc = -EIO;
1408                         goto out_req;
1409                 }
1410         }
1411
1412 out_req:
1413         ptlrpc_req_finished(request);
1414         RETURN(rc);
1415 }
1416
1417 static int sanosc_brw(int cmd, struct lustre_handle *conn,
1418                       struct lov_stripe_md *lsm, obd_count page_count,
1419                       struct brw_page *pga, struct obd_trans_info *oti)
1420 {
1421         ENTRY;
1422
1423         while (page_count) {
1424                 obd_count pages_per_brw;
1425                 int rc;
1426
1427                 if (page_count > OSC_BRW_MAX_IOV)
1428                         pages_per_brw = OSC_BRW_MAX_IOV;
1429                 else
1430                         pages_per_brw = page_count;
1431
1432                 if (cmd & OBD_BRW_WRITE)
1433                         rc = sanosc_brw_write(conn, lsm, pages_per_brw, pga);
1434                 else
1435                         rc = sanosc_brw_read(conn, lsm, pages_per_brw, pga);
1436
1437                 if (rc != 0)
1438                         RETURN(rc);
1439
1440                 page_count -= pages_per_brw;
1441                 pga += pages_per_brw;
1442         }
1443         RETURN(0);
1444 }
1445 #endif
1446 #endif
1447
1448 static int osc_mark_page_dirty(struct lustre_handle *conn, 
1449                                struct lov_stripe_md *lsm, unsigned long offset)
1450 {
1451         struct client_obd *cli = &class_conn2obd(conn)->u.cli;
1452         struct otree *dirty_ot = lsm->lsm_oinfo[0].loi_dirty_ot;
1453         int rc;
1454         ENTRY;
1455
1456         down(&cli->cl_dirty_sem);
1457
1458         if (cli->cl_ost_can_grant && 
1459             (cli->cl_dirty + PAGE_CACHE_SIZE >= cli->cl_dirty_granted)) {
1460                 CDEBUG(D_INODE, "granted "LPU64" < "LPU64"\n",
1461                        cli->cl_dirty_granted, cli->cl_dirty + PAGE_CACHE_SIZE);
1462                 GOTO(out, rc = -EDQUOT);
1463         }
1464
1465         rc = ot_mark_offset(dirty_ot, offset);
1466         if (rc)
1467                 GOTO(out, rc);
1468
1469         cli->cl_dirty += PAGE_CACHE_SIZE;
1470         CDEBUG(D_INODE, "dirtied off %lu, now "LPU64" bytes dirty\n",
1471                         offset, cli->cl_dirty);
1472 out:
1473         up(&cli->cl_dirty_sem);
1474         RETURN(rc);
1475 }
1476
1477 static int osc_clear_dirty_pages(struct lustre_handle *conn, 
1478                                  struct lov_stripe_md *lsm,
1479                                  unsigned long start, unsigned long end,
1480                                  unsigned long *cleared)
1481 {
1482         struct client_obd *cli = &class_conn2obd(conn)->u.cli;
1483         struct otree *dirty_ot = lsm->lsm_oinfo[0].loi_dirty_ot;
1484         unsigned long old_marked, new_marked;
1485         int rc;
1486         ENTRY;
1487
1488         down(&cli->cl_dirty_sem);
1489
1490         old_marked = ot_num_marked(dirty_ot);
1491
1492         rc = ot_clear_extent(dirty_ot, start, end);
1493         if (rc)
1494                 GOTO(out, rc);
1495
1496         new_marked = ot_num_marked(dirty_ot);
1497
1498         LASSERT(new_marked <= old_marked);
1499         LASSERT(old_marked * PAGE_CACHE_SIZE <= cli->cl_dirty);
1500         *cleared = old_marked - new_marked;
1501         cli->cl_dirty -= (__u64)*cleared << PAGE_CACHE_SHIFT;
1502         CDEBUG(D_INODE, "cleared [%lu,%lu], now "LPU64" bytes dirty\n",
1503                         start, end, cli->cl_dirty);
1504
1505 out:
1506         up(&cli->cl_dirty_sem);
1507         RETURN(rc);
1508 }
1509
1510 static int osc_last_dirty_offset(struct lustre_handle *conn,
1511                                  struct lov_stripe_md *lsm,
1512                                  unsigned long *offset)
1513 {
1514         struct otree *dirty_ot = lsm->lsm_oinfo[0].loi_dirty_ot;
1515         int rc;
1516         ENTRY;
1517
1518         rc = ot_last_marked(dirty_ot, offset);
1519         RETURN(rc);
1520 }
1521
1522 static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *lsm,
1523                        struct lustre_handle *parent_lock,
1524                        __u32 type, void *extentp, int extent_len, __u32 mode,
1525                        int *flags, void *callback, void *data,
1526                        struct lustre_handle *lockh)
1527 {
1528         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
1529         struct obd_device *obddev = class_conn2obd(connh);
1530         struct ldlm_extent *extent = extentp;
1531         int rc;
1532         ENTRY;
1533
1534         /* Filesystem lock extents are extended to page boundaries so that
1535          * dealing with the page cache is a little smoother.  */
1536         extent->start -= extent->start & ~PAGE_MASK;
1537         extent->end |= ~PAGE_MASK;
1538
1539         /* Next, search for already existing extent locks that will cover us */
1540         rc = ldlm_lock_match(obddev->obd_namespace, LDLM_FL_MATCH_DATA, &res_id,
1541                              type, extent, sizeof(extent), mode, data, lockh);
1542         if (rc == 1)
1543                 /* We already have a lock, and it's referenced */
1544                 RETURN(ELDLM_OK);
1545
1546         /* If we're trying to read, we also search for an existing PW lock.  The
1547          * VFS and page cache already protect us locally, so lots of readers/
1548          * writers can share a single PW lock.
1549          *
1550          * There are problems with conversion deadlocks, so instead of
1551          * converting a read lock to a write lock, we'll just enqueue a new
1552          * one.
1553          *
1554          * At some point we should cancel the read lock instead of making them
1555          * send us a blocking callback, but there are problems with canceling
1556          * locks out from other users right now, too. */
1557
1558         if (mode == LCK_PR) {
1559                 rc = ldlm_lock_match(obddev->obd_namespace, LDLM_FL_MATCH_DATA,
1560                                      &res_id, type, extent, sizeof(extent),
1561                                      LCK_PW, data, lockh);
1562                 if (rc == 1) {
1563                         /* FIXME: This is not incredibly elegant, but it might
1564                          * be more elegant than adding another parameter to
1565                          * lock_match.  I want a second opinion. */
1566                         ldlm_lock_addref(lockh, LCK_PR);
1567                         ldlm_lock_decref(lockh, LCK_PW);
1568
1569                         RETURN(ELDLM_OK);
1570                 }
1571         }
1572
1573         rc = ldlm_cli_enqueue(connh, NULL, obddev->obd_namespace, parent_lock,
1574                               res_id, type, extent, sizeof(extent), mode, flags,
1575                               ldlm_completion_ast, callback, data, lockh);
1576         RETURN(rc);
1577 }
1578
1579 static int osc_match(struct lustre_handle *connh, struct lov_stripe_md *lsm,
1580                        __u32 type, void *extentp, int extent_len, __u32 mode,
1581                        int *flags, void *data, struct lustre_handle *lockh)
1582 {
1583         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
1584         struct obd_device *obddev = class_conn2obd(connh);
1585         struct ldlm_extent *extent = extentp;
1586         int rc;
1587         ENTRY;
1588
1589         /* Filesystem lock extents are extended to page boundaries so that
1590          * dealing with the page cache is a little smoother */
1591         extent->start -= extent->start & ~PAGE_MASK;
1592         extent->end |= ~PAGE_MASK;
1593
1594         /* Next, search for already existing extent locks that will cover us */
1595         rc = ldlm_lock_match(obddev->obd_namespace, *flags, &res_id, type,
1596                              extent, sizeof(extent), mode, data, lockh);
1597         if (rc)
1598                 RETURN(rc);
1599
1600         /* If we're trying to read, we also search for an existing PW lock.  The
1601          * VFS and page cache already protect us locally, so lots of readers/
1602          * writers can share a single PW lock. */
1603         if (mode == LCK_PR) {
1604                 rc = ldlm_lock_match(obddev->obd_namespace, *flags, &res_id,
1605                                      type, extent, sizeof(extent), LCK_PW,
1606                                      data, lockh);
1607                 if (rc == 1) {
1608                         /* FIXME: This is not incredibly elegant, but it might
1609                          * be more elegant than adding another parameter to
1610                          * lock_match.  I want a second opinion. */
1611                         ldlm_lock_addref(lockh, LCK_PR);
1612                         ldlm_lock_decref(lockh, LCK_PW);
1613                 }
1614         }
1615         RETURN(rc);
1616 }
1617
1618 static int osc_cancel(struct lustre_handle *oconn, struct lov_stripe_md *md,
1619                       __u32 mode, struct lustre_handle *lockh)
1620 {
1621         ENTRY;
1622
1623         ldlm_lock_decref(lockh, mode);
1624
1625         RETURN(0);
1626 }
1627
1628 static int osc_cancel_unused(struct lustre_handle *connh,
1629                              struct lov_stripe_md *lsm, int flags, void *opaque)
1630 {
1631         struct obd_device *obddev = class_conn2obd(connh);
1632         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
1633
1634         return ldlm_cli_cancel_unused(obddev->obd_namespace, &res_id, flags,
1635                                       opaque);
1636 }
1637
1638 static int osc_statfs(struct obd_export *exp, struct obd_statfs *osfs)
1639 {
1640         struct obd_statfs *msfs;
1641         struct ptlrpc_request *request;
1642         int rc, size = sizeof(*osfs);
1643         ENTRY;
1644
1645         request = ptlrpc_prep_req(exp->exp_obd->u.cli.cl_import, OST_STATFS, 0, 
1646                                   NULL, NULL);
1647         if (!request)
1648                 RETURN(-ENOMEM);
1649
1650         request->rq_replen = lustre_msg_size(1, &size);
1651
1652         rc = ptlrpc_queue_wait(request);
1653         if (rc) {
1654                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
1655                 GOTO(out, rc);
1656         }
1657
1658         msfs = lustre_swab_repbuf (request, 0, sizeof (*msfs),
1659                                    lustre_swab_obd_statfs);
1660         if (msfs == NULL) {
1661                 CERROR ("Can't unpack obd_statfs\n");
1662                 GOTO (out, rc = -EPROTO);
1663         }
1664
1665         memcpy (osfs, msfs, sizeof (*msfs));
1666
1667         EXIT;
1668  out:
1669         ptlrpc_req_finished(request);
1670         return rc;
1671 }
1672
1673 /* Retrieve object striping information.
1674  *
1675  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
1676  * the maximum number of OST indices which will fit in the user buffer.
1677  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
1678  */
1679 static int osc_getstripe(struct lustre_handle *conn, struct lov_stripe_md *lsm,
1680                          struct lov_mds_md *lmmu)
1681 {
1682         struct lov_mds_md lmm, *lmmk;
1683         int rc, lmm_size;
1684         ENTRY;
1685
1686         if (!lsm)
1687                 RETURN(-ENODATA);
1688
1689         rc = copy_from_user(&lmm, lmmu, sizeof(lmm));
1690         if (rc)
1691                 RETURN(-EFAULT);
1692
1693         if (lmm.lmm_magic != LOV_MAGIC)
1694                 RETURN(-EINVAL);
1695
1696         if (lmm.lmm_ost_count < 1)
1697                 RETURN(-EOVERFLOW);
1698
1699         lmm_size = sizeof(lmm) + sizeof(lmm.lmm_objects[0]);
1700         OBD_ALLOC(lmmk, lmm_size);
1701         if (rc < 0)
1702                 RETURN(rc);
1703
1704         lmmk->lmm_stripe_count = 1;
1705         lmmk->lmm_ost_count = 1;
1706         lmmk->lmm_object_id = lsm->lsm_object_id;
1707         lmmk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
1708
1709         if (copy_to_user(lmmu, lmmk, lmm_size))
1710                 rc = -EFAULT;
1711
1712         OBD_FREE(lmmk, lmm_size);
1713
1714         RETURN(rc);
1715 }
1716
1717 static int osc_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
1718                          void *karg, void *uarg)
1719 {
1720         struct obd_device *obddev = class_conn2obd(conn);
1721         struct obd_ioctl_data *data = karg;
1722         int err = 0;
1723         ENTRY;
1724
1725         switch (cmd) {
1726         case IOC_OSC_REGISTER_LOV: {
1727                 if (obddev->u.cli.cl_containing_lov)
1728                         GOTO(out, err = -EALREADY);
1729                 obddev->u.cli.cl_containing_lov = (struct obd_device *)karg;
1730                 GOTO(out, err);
1731         }
1732         case OBD_IOC_LOV_GET_CONFIG: {
1733                 char *buf;
1734                 struct lov_desc *desc;
1735                 struct obd_uuid uuid;
1736
1737                 buf = NULL;
1738                 len = 0;
1739                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
1740                         GOTO(out, err = -EINVAL);
1741
1742                 data = (struct obd_ioctl_data *)buf;
1743
1744                 if (sizeof(*desc) > data->ioc_inllen1) {
1745                         OBD_FREE(buf, len);
1746                         GOTO(out, err = -EINVAL);
1747                 }
1748
1749                 if (data->ioc_inllen2 < sizeof(uuid)) {
1750                         OBD_FREE(buf, len);
1751                         GOTO(out, err = -EINVAL);
1752                 }
1753
1754                 desc = (struct lov_desc *)data->ioc_inlbuf1;
1755                 desc->ld_tgt_count = 1;
1756                 desc->ld_active_tgt_count = 1;
1757                 desc->ld_default_stripe_count = 1;
1758                 desc->ld_default_stripe_size = 0;
1759                 desc->ld_default_stripe_offset = 0;
1760                 desc->ld_pattern = 0;
1761                 memcpy(&desc->ld_uuid, &obddev->obd_uuid, sizeof(uuid));
1762
1763                 memcpy(data->ioc_inlbuf2, &obddev->obd_uuid, sizeof(uuid));
1764
1765                 err = copy_to_user((void *)uarg, buf, len);
1766                 if (err)
1767                         err = -EFAULT;
1768                 obd_ioctl_freedata(buf, len);
1769                 GOTO(out, err);
1770         }
1771         case LL_IOC_LOV_SETSTRIPE:
1772                 err = obd_alloc_memmd(conn, karg);
1773                 if (err > 0)
1774                         err = 0;
1775                 GOTO(out, err);
1776         case LL_IOC_LOV_GETSTRIPE:
1777                 err = osc_getstripe(conn, karg, uarg);
1778                 GOTO(out, err);
1779         case OBD_IOC_CLIENT_RECOVER:
1780                 err = ptlrpc_recover_import(obddev->u.cli.cl_import,
1781                                             data->ioc_inlbuf1);
1782                 GOTO(out, err);
1783         case IOC_OSC_SET_ACTIVE:
1784                 err = ptlrpc_set_import_active(obddev->u.cli.cl_import,
1785                                                data->ioc_offset);
1786                 GOTO(out, err);
1787         default:
1788                 CERROR ("osc_ioctl(): unrecognised ioctl %#x\n", cmd);
1789                 GOTO(out, err = -ENOTTY);
1790         }
1791 out:
1792         return err;
1793 }
1794
1795 static int osc_get_info(struct lustre_handle *conn, obd_count keylen,
1796                         void *key, __u32 *vallen, void *val)
1797 {
1798         ENTRY;
1799         if (!vallen || !val)
1800                 RETURN(-EFAULT);
1801
1802         if (keylen > strlen("lock_to_stripe") &&
1803             strcmp(key, "lock_to_stripe") == 0) {
1804                 __u32 *stripe = val;
1805                 *vallen = sizeof(*stripe);
1806                 *stripe = 0;
1807                 RETURN(0);
1808         }
1809         RETURN(-EINVAL);
1810 }
1811
1812 struct obd_ops osc_obd_ops = {
1813         o_owner:        THIS_MODULE,
1814         o_attach:       osc_attach,
1815         o_detach:       osc_detach,
1816         o_setup:        client_obd_setup,
1817         o_cleanup:      client_obd_cleanup,
1818         o_connect:      client_import_connect,
1819         o_disconnect:   client_import_disconnect,
1820         o_statfs:       osc_statfs,
1821         o_packmd:       osc_packmd,
1822         o_unpackmd:     osc_unpackmd,
1823         o_create:       osc_create,
1824         o_destroy:      osc_destroy,
1825         o_getattr:      osc_getattr,
1826         o_getattr_async: osc_getattr_async,
1827         o_setattr:      osc_setattr,
1828         o_open:         osc_open,
1829         o_close:        osc_close,
1830         o_brw:          osc_brw,
1831         o_brw_async:    osc_brw_async,
1832         o_punch:        osc_punch,
1833         o_enqueue:      osc_enqueue,
1834         o_match:        osc_match,
1835         o_cancel:       osc_cancel,
1836         o_cancel_unused: osc_cancel_unused,
1837         o_iocontrol:    osc_iocontrol,
1838         o_get_info:     osc_get_info,
1839         .o_mark_page_dirty =    osc_mark_page_dirty,
1840         .o_clear_dirty_pages =  osc_clear_dirty_pages,
1841         .o_last_dirty_offset =  osc_last_dirty_offset,
1842 };
1843
1844 struct obd_ops sanosc_obd_ops = {
1845         o_owner:        THIS_MODULE,
1846         o_attach:       osc_attach,
1847         o_detach:       osc_detach,
1848         o_cleanup:      client_obd_cleanup,
1849         o_connect:      client_import_connect,
1850         o_disconnect:   client_import_disconnect,
1851         o_statfs:       osc_statfs,
1852         o_packmd:       osc_packmd,
1853         o_unpackmd:     osc_unpackmd,
1854         o_create:       osc_create,
1855         o_destroy:      osc_destroy,
1856         o_getattr:      osc_getattr,
1857         o_getattr_async: osc_getattr_async,
1858         o_setattr:      osc_setattr,
1859         o_open:         osc_open,
1860         o_close:        osc_close,
1861 #ifdef __KERNEL__
1862         o_setup:        client_sanobd_setup,
1863         o_brw:          sanosc_brw,
1864 #endif
1865         o_punch:        osc_punch,
1866         o_enqueue:      osc_enqueue,
1867         o_match:        osc_match,
1868         o_cancel:       osc_cancel,
1869         o_cancel_unused: osc_cancel_unused,
1870         o_iocontrol:    osc_iocontrol,
1871         .o_mark_page_dirty =    osc_mark_page_dirty,
1872         .o_clear_dirty_pages =  osc_clear_dirty_pages,
1873         .o_last_dirty_offset =  osc_last_dirty_offset,
1874 };
1875
1876 int __init osc_init(void)
1877 {
1878         struct lprocfs_static_vars lvars;
1879         int rc;
1880         ENTRY;
1881
1882         LASSERT(sizeof(struct obd_client_handle) <= FD_OSTDATA_SIZE);
1883         LASSERT(sizeof(struct obd_client_handle) <= OBD_INLINESZ);
1884
1885         lprocfs_init_vars(&lvars);
1886
1887         rc = class_register_type(&osc_obd_ops, lvars.module_vars,
1888                                  LUSTRE_OSC_NAME);
1889         if (rc)
1890                 RETURN(rc);
1891
1892         rc = class_register_type(&sanosc_obd_ops, lvars.module_vars,
1893                                  LUSTRE_SANOSC_NAME);
1894         if (rc)
1895                 class_unregister_type(LUSTRE_OSC_NAME);
1896
1897         RETURN(rc);
1898 }
1899
1900 static void __exit osc_exit(void)
1901 {
1902         class_unregister_type(LUSTRE_SANOSC_NAME);
1903         class_unregister_type(LUSTRE_OSC_NAME);
1904 }
1905
#ifdef __KERNEL__
/* Module metadata. */
MODULE_LICENSE("GPL");
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");

/* Hook the init/exit routines into module load and unload. */
module_init(osc_init);
module_exit(osc_exit);
#endif