/*
 * Whamcloud gitweb extract: lustre/osc/osc_request.c
 * (fs/lustre-release.git, "update from b1_4_mountconf")
 */
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author Peter Braam <braam@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  *
25  *  For testing and management it is treated as an obd_device,
26  *  although * it does not export a full OBD method table (the
27  *  requests are coming * in over the wire, so object target modules
28  *  do not have a full * method table.)
29  *
30  */
31
32 #ifndef EXPORT_SYMTAB
33 # define EXPORT_SYMTAB
34 #endif
35 #define DEBUG_SUBSYSTEM S_OSC
36
37 #ifdef __KERNEL__
38 # include <libcfs/libcfs.h>
39 #else /* __KERNEL__ */
40 # include <liblustre.h>
41 #endif
42
43 # include <lustre_dlm.h>
44 #include <libcfs/kp30.h>
45 #include <lustre_net.h>
46 #include <lustre/lustre_user.h>
47 #include <obd_ost.h>
48 #include <obd_lov.h>
49
50 #ifdef  __CYGWIN__
51 # include <ctype.h>
52 #endif
53
54 #include <lustre_ha.h>
55 #include <lprocfs_status.h>
56 #include <lustre_log.h>
57 #include <lustre_debug.h>
58 #include "osc_internal.h"
59
/* Pluggable quota hooks.  quota_interface stays NULL until assigned
 * (presumably at module init/cleanup — the wiring is not visible in this
 * chunk); osc_quota_interface is the OSC's implementation, defined
 * elsewhere. */
static quota_interface_t *quota_interface = NULL;
extern quota_interface_t osc_quota_interface;
62
63 /* Pack OSC object metadata for disk storage (LE byte order). */
64 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
65                       struct lov_stripe_md *lsm)
66 {
67         int lmm_size;
68         ENTRY;
69
70         lmm_size = sizeof(**lmmp);
71         if (!lmmp)
72                 RETURN(lmm_size);
73
74         if (*lmmp && !lsm) {
75                 OBD_FREE(*lmmp, lmm_size);
76                 *lmmp = NULL;
77                 RETURN(0);
78         }
79
80         if (!*lmmp) {
81                 OBD_ALLOC(*lmmp, lmm_size);
82                 if (!*lmmp)
83                         RETURN(-ENOMEM);
84         }
85
86         if (lsm) {
87                 LASSERT(lsm->lsm_object_id);
88                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
89         }
90
91         RETURN(lmm_size);
92 }
93
94 /* Unpack OSC object metadata from disk storage (LE byte order). */
95 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
96                         struct lov_mds_md *lmm, int lmm_bytes)
97 {
98         int lsm_size;
99         ENTRY;
100
101         if (lmm != NULL) {
102                 if (lmm_bytes < sizeof (*lmm)) {
103                         CERROR("lov_mds_md too small: %d, need %d\n",
104                                lmm_bytes, (int)sizeof(*lmm));
105                         RETURN(-EINVAL);
106                 }
107                 /* XXX LOV_MAGIC etc check? */
108
109                 if (lmm->lmm_object_id == 0) {
110                         CERROR("lov_mds_md: zero lmm_object_id\n");
111                         RETURN(-EINVAL);
112                 }
113         }
114
115         lsm_size = lov_stripe_md_size(1);
116         if (lsmp == NULL)
117                 RETURN(lsm_size);
118
119         if (*lsmp != NULL && lmm == NULL) {
120                 OBD_FREE(*lsmp, lsm_size);
121                 *lsmp = NULL;
122                 RETURN(0);
123         }
124
125         if (*lsmp == NULL) {
126                 OBD_ALLOC(*lsmp, lsm_size);
127                 if (*lsmp == NULL)
128                         RETURN(-ENOMEM);
129                 loi_init((*lsmp)->lsm_oinfo);
130         }
131
132         if (lmm != NULL) {
133                 /* XXX zero *lsmp? */
134                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
135                 LASSERT((*lsmp)->lsm_object_id);
136         }
137
138         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
139
140         RETURN(lsm_size);
141 }
142
143 static int osc_getattr_interpret(struct ptlrpc_request *req,
144                                  struct osc_getattr_async_args *aa, int rc)
145 {
146         struct ost_body *body;
147         ENTRY;
148
149         if (rc != 0)
150                 RETURN(rc);
151
152         body = lustre_swab_repbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
153         if (body) {
154                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
155                 memcpy(aa->aa_oa, &body->oa, sizeof(*aa->aa_oa));
156
157                 /* This should really be sent by the OST */
158                 aa->aa_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
159                 aa->aa_oa->o_valid |= OBD_MD_FLBLKSZ;
160         } else {
161                 CERROR("can't unpack ost_body\n");
162                 rc = -EPROTO;
163                 aa->aa_oa->o_valid = 0;
164         }
165
166         RETURN(rc);
167 }
168
169 static int osc_getattr_async(struct obd_export *exp, struct obdo *oa,
170                              struct lov_stripe_md *md,
171                              struct ptlrpc_request_set *set)
172 {
173         struct ptlrpc_request *request;
174         struct ost_body *body;
175         int size = sizeof(*body);
176         struct osc_getattr_async_args *aa;
177         ENTRY;
178
179         request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
180                                   OST_GETATTR, 1, &size, NULL);
181         if (!request)
182                 RETURN(-ENOMEM);
183
184         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
185         memcpy(&body->oa, oa, sizeof(*oa));
186
187         request->rq_replen = lustre_msg_size(1, &size);
188         request->rq_interpret_reply = osc_getattr_interpret;
189
190         LASSERT (sizeof (*aa) <= sizeof (request->rq_async_args));
191         aa = (struct osc_getattr_async_args *)&request->rq_async_args;
192         aa->aa_oa = oa;
193
194         ptlrpc_set_add_req (set, request);
195         RETURN (0);
196 }
197
198 static int osc_getattr(struct obd_export *exp, struct obdo *oa,
199                        struct lov_stripe_md *md)
200 {
201         struct ptlrpc_request *request;
202         struct ost_body *body;
203         int rc, size = sizeof(*body);
204         ENTRY;
205
206         request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
207                                   OST_GETATTR, 1, &size, NULL);
208         if (!request)
209                 RETURN(-ENOMEM);
210
211         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
212         memcpy(&body->oa, oa, sizeof(*oa));
213
214         request->rq_replen = lustre_msg_size(1, &size);
215
216         rc = ptlrpc_queue_wait(request);
217         if (rc) {
218                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
219                 GOTO(out, rc);
220         }
221
222         body = lustre_swab_repbuf(request, 0, sizeof (*body),
223                                   lustre_swab_ost_body);
224         if (body == NULL) {
225                 CERROR ("can't unpack ost_body\n");
226                 GOTO (out, rc = -EPROTO);
227         }
228
229         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
230         memcpy(oa, &body->oa, sizeof(*oa));
231
232         /* This should really be sent by the OST */
233         oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
234         oa->o_valid |= OBD_MD_FLBLKSZ;
235
236         EXIT;
237  out:
238         ptlrpc_req_finished(request);
239         return rc;
240 }
241
242 static int osc_setattr(struct obd_export *exp, struct obdo *oa,
243                        struct lov_stripe_md *md, struct obd_trans_info *oti)
244 {
245         struct ptlrpc_request *request;
246         struct ost_body *body;
247         int rc, size = sizeof(*body);
248         ENTRY;
249
250         request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
251                                   OST_SETATTR, 1, &size, NULL);
252         if (!request)
253                 RETURN(-ENOMEM);
254
255         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof(*body));
256         memcpy(&body->oa, oa, sizeof(*oa));
257
258         request->rq_replen = lustre_msg_size(1, &size);
259
260         rc = ptlrpc_queue_wait(request);
261         if (rc)
262                 GOTO(out, rc);
263
264         body = lustre_swab_repbuf(request, 0, sizeof(*body),
265                                   lustre_swab_ost_body);
266         if (body == NULL)
267                 GOTO(out, rc = -EPROTO);
268
269         memcpy(oa, &body->oa, sizeof(*oa));
270
271         EXIT;
272 out:
273         ptlrpc_req_finished(request);
274         RETURN(0);
275 }
276
277 static int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
278                              struct lov_stripe_md *md,
279                              struct obd_trans_info *oti)
280 {
281         struct ptlrpc_request *request;
282         struct ost_body *body;
283         int rc = 0, size = sizeof(*body);
284         ENTRY;
285
286         LASSERT(oti);
287
288         request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
289                                   OST_SETATTR, 1, &size, NULL);
290         if (!request)
291                 RETURN(-ENOMEM);
292
293         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof(*body));
294
295         if (oa->o_valid & OBD_MD_FLCOOKIE)
296                 memcpy(obdo_logcookie(oa), oti->oti_logcookies,
297                        sizeof(*oti->oti_logcookies));
298
299         memcpy(&body->oa, oa, sizeof(*oa));
300         request->rq_replen = lustre_msg_size(1, &size);
301         /* do mds to ost setattr asynchronouly */
302         ptlrpcd_add_req(request);
303
304         RETURN(rc);
305 }
306
/* Create an object on the OST.
 *
 * If the caller supplied no striping metadata (*ea == NULL), a
 * single-stripe lsm is allocated here and returned through *ea on
 * success; on failure that locally-allocated lsm is freed again.
 * On success @oa holds the new object's attributes, and when the
 * server sent a log cookie (OBD_MD_FLCOOKIE) it is copied into @oti.
 * Returns 0 or negative errno. */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *request;
        struct ost_body *body;
        struct lov_stripe_md *lsm;
        int rc, size = sizeof(*body);
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                /* caller gave us no lsm; build a default one */
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                                  OST_CREATE, 1, &size, NULL);
        if (!request)
                GOTO(out, rc = -ENOMEM);

        body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
        memcpy(&body->oa, oa, sizeof(body->oa));

        request->rq_replen = lustre_msg_size(1, &size);
        /* OBD_MD_FLINLINE here marks an orphan-deletion create issued
         * during MDS-OST recovery (DELORPHAN); it must go out exactly
         * once, immediately */
        if (oa->o_valid & OBD_MD_FLINLINE) {
                LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
                        oa->o_flags == OBD_FL_DELORPHAN);
                DEBUG_REQ(D_HA, request,
                          "delorphan from OST integration");
                /* Don't resend the delorphan request */
                request->rq_no_resend = request->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(request);
        if (rc)
                GOTO(out_req, rc);

        body = lustre_swab_repbuf(request, 0, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO (out_req, rc = -EPROTO);
        }

        memcpy(oa, &body->oa, sizeof(*oa));

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = request->rq_repmsg->transno;

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        /* hand the unlink llog cookie back to the caller */
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        memcpy(oti->oti_logcookies, obdo_logcookie(oa),
                               sizeof(oti->oti_onecookie));
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n", request->rq_repmsg->transno);
        EXIT;
out_req:
        ptlrpc_req_finished(request);
out:
        /* only free the lsm if we allocated it above (*ea still NULL) */
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        return rc;
}
388
389 static int osc_punch(struct obd_export *exp, struct obdo *oa,
390                      struct lov_stripe_md *md, obd_size start,
391                      obd_size end, struct obd_trans_info *oti)
392 {
393         struct ptlrpc_request *request;
394         struct ost_body *body;
395         int rc, size = sizeof(*body);
396         ENTRY;
397
398         if (!oa) {
399                 CERROR("oa NULL\n");
400                 RETURN(-EINVAL);
401         }
402
403         request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
404                                   OST_PUNCH, 1, &size, NULL);
405         if (!request)
406                 RETURN(-ENOMEM);
407
408         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
409         memcpy(&body->oa, oa, sizeof(*oa));
410
411         /* overload the size and blocks fields in the oa with start/end */
412         body->oa.o_size = start;
413         body->oa.o_blocks = end;
414         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
415
416         request->rq_replen = lustre_msg_size(1, &size);
417
418         rc = ptlrpc_queue_wait(request);
419         if (rc)
420                 GOTO(out, rc);
421
422         body = lustre_swab_repbuf (request, 0, sizeof (*body),
423                                    lustre_swab_ost_body);
424         if (body == NULL) {
425                 CERROR ("can't unpack ost_body\n");
426                 GOTO (out, rc = -EPROTO);
427         }
428
429         memcpy(oa, &body->oa, sizeof(*oa));
430
431         EXIT;
432  out:
433         ptlrpc_req_finished(request);
434         return rc;
435 }
436
437 static int osc_sync(struct obd_export *exp, struct obdo *oa,
438                     struct lov_stripe_md *md, obd_size start, obd_size end)
439 {
440         struct ptlrpc_request *request;
441         struct ost_body *body;
442         int rc, size = sizeof(*body);
443         ENTRY;
444
445         if (!oa) {
446                 CERROR("oa NULL\n");
447                 RETURN(-EINVAL);
448         }
449
450         request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
451                                   OST_SYNC, 1, &size, NULL);
452         if (!request)
453                 RETURN(-ENOMEM);
454
455         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
456         memcpy(&body->oa, oa, sizeof(*oa));
457
458         /* overload the size and blocks fields in the oa with start/end */
459         body->oa.o_size = start;
460         body->oa.o_blocks = end;
461         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
462
463         request->rq_replen = lustre_msg_size(1, &size);
464
465         rc = ptlrpc_queue_wait(request);
466         if (rc)
467                 GOTO(out, rc);
468
469         body = lustre_swab_repbuf(request, 0, sizeof(*body),
470                                   lustre_swab_ost_body);
471         if (body == NULL) {
472                 CERROR ("can't unpack ost_body\n");
473                 GOTO (out, rc = -EPROTO);
474         }
475
476         memcpy(oa, &body->oa, sizeof(*oa));
477
478         EXIT;
479  out:
480         ptlrpc_req_finished(request);
481         return rc;
482 }
483
484 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
485                        struct lov_stripe_md *ea, struct obd_trans_info *oti,
486                        struct obd_export *md_export)
487 {
488         struct ptlrpc_request *request;
489         struct ost_body *body;
490         int rc, size = sizeof(*body);
491         ENTRY;
492
493         if (!oa) {
494                 CERROR("oa NULL\n");
495                 RETURN(-EINVAL);
496         }
497
498         request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
499                                   OST_DESTROY, 1, &size, NULL);
500         if (!request)
501                 RETURN(-ENOMEM);
502
503         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
504
505         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) {
506                 memcpy(obdo_logcookie(oa), oti->oti_logcookies,
507                        sizeof(*oti->oti_logcookies));
508                 oti->oti_logcookies++;
509         }
510
511         memcpy(&body->oa, oa, sizeof(*oa));
512         request->rq_replen = lustre_msg_size(1, &size);
513
514         rc = ptlrpc_queue_wait(request);
515         if (rc == -ENOENT)
516                 rc = 0;
517         if (rc)
518                 GOTO(out, rc);
519
520         body = lustre_swab_repbuf(request, 0, sizeof(*body),
521                                   lustre_swab_ost_body);
522         if (body == NULL) {
523                 CERROR ("Can't unpack body\n");
524                 GOTO (out, rc = -EPROTO);
525         }
526
527         memcpy(oa, &body->oa, sizeof(*oa));
528
529         EXIT;
530  out:
531         ptlrpc_req_finished(request);
532         return rc;
533 }
534
/* Fill the cache-accounting fields of @oa (dirty, undirty, grant,
 * dropped) so each BRW tells the OST the client's current cache state.
 * Takes cl_loi_list_lock to snapshot the counters consistently. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        /* caller must not have set these valid bits already */
        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty > cli->cl_dirty_max) {
                /* accounting went wrong; claim no more room */
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                /* difference would not fit (presumably o_undirty is
                 * 32-bit — confirm against obdo definition) */
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
                                (cli->cl_max_rpcs_in_flight + 1);
                /* how much more this client could still dirty */
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        /* report grant we lost (e.g. over eviction) and reset the counter */
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
565
/* Account one page of dirty cache against the client's grant and mark
 * the page as covered by grant (OBD_BRW_FROM_GRANT).
 * caller must hold loi_list_lock */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct osc_async_page *oap)
{
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        oap->oap_brw_flags |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for oap %p\n", CFS_PAGE_SIZE, oap);
        /* callers must check availability first; going negative is a bug */
        LASSERT(cli->cl_avail_grant >= 0);
}
576
577 static unsigned long rpcs_in_flight(struct client_obd *cli)
578 {
579         return cli->cl_r_in_flight + cli->cl_w_in_flight;
580 }
581
/* Wake cache waiters that can make progress now: if dirty room and grant
 * are available the grant is consumed on the waiter's behalf; if no
 * grant can possibly arrive any more, the waiter is woken with -EDQUOT
 * so it falls back to sync IO.  Stops at the first waiter that cannot
 * proceed (FIFO fairness).
 * caller must hold loi_list_lock */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        struct list_head *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if (cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld max %ld\n",
                               cli->cl_dirty, cli->cl_dirty_max);
                        return;
                }

                /* if still dirty cache but no grant wait for pending RPCs that
                 * may yet return us some grant before doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
                list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        osc_consume_write_grant(cli, ocw->ocw_oap);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}
620
/* Record the initial grant handed to this client in the server's
 * connect reply (ocd_grant). */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
               cli->cl_avail_grant, cli->cl_lost_grant);
        LASSERT(cli->cl_avail_grant >= 0);
}
631
/* Add the extra grant the server returned in a BRW reply (oa.o_grant)
 * to the client's available grant. */
static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
        cli->cl_avail_grant += body->oa.o_grant;
        /* waiters are woken in brw_interpret_oap */
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}
640
641 /* We assume that the reason this OSC got a short read is because it read
642  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
643  * via the LOV, and it _knows_ it's reading inside the file, it's just that
644  * this stripe never got written at or beyond this stripe offset yet. */
645 static void handle_short_read(int nob_read, obd_count page_count,
646                               struct brw_page *pga)
647 {
648         char *ptr;
649
650         /* skip bytes read OK */
651         while (nob_read > 0) {
652                 LASSERT (page_count > 0);
653
654                 if (pga->count > nob_read) {
655                         /* EOF inside this page */
656                         ptr = cfs_kmap(pga->pg) + (pga->off & ~CFS_PAGE_MASK);
657                         memset(ptr + nob_read, 0, pga->count - nob_read);
658                         cfs_kunmap(pga->pg);
659                         page_count--;
660                         pga++;
661                         break;
662                 }
663
664                 nob_read -= pga->count;
665                 page_count--;
666                 pga++;
667         }
668
669         /* zero remaining pages */
670         while (page_count-- > 0) {
671                 ptr = cfs_kmap(pga->pg) + (pga->off & ~CFS_PAGE_MASK);
672                 memset(ptr, 0, pga->count);
673                 cfs_kunmap(pga->pg);
674                 pga++;
675         }
676 }
677
678 static int check_write_rcs(struct ptlrpc_request *request,
679                            int requested_nob, int niocount,
680                            obd_count page_count, struct brw_page *pga)
681 {
682         int    *remote_rcs, i;
683
684         /* return error if any niobuf was in error */
685         remote_rcs = lustre_swab_repbuf(request, 1,
686                                         sizeof(*remote_rcs) * niocount, NULL);
687         if (remote_rcs == NULL) {
688                 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
689                 return(-EPROTO);
690         }
691         if (lustre_msg_swabbed(request->rq_repmsg))
692                 for (i = 0; i < niocount; i++)
693                         __swab32s(&remote_rcs[i]);
694
695         for (i = 0; i < niocount; i++) {
696                 if (remote_rcs[i] < 0)
697                         return(remote_rcs[i]);
698
699                 if (remote_rcs[i] != 0) {
700                         CERROR("rc[%d] invalid (%d) req %p\n",
701                                 i, remote_rcs[i], request);
702                         return(-EPROTO);
703                 }
704         }
705
706         if (request->rq_bulk->bd_nob_transferred != requested_nob) {
707                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
708                        requested_nob, request->rq_bulk->bd_nob_transferred);
709                 return(-EPROTO);
710         }
711
712         return (0);
713 }
714
715 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
716 {
717         if (p1->flag != p2->flag) {
718                 unsigned mask = ~OBD_BRW_FROM_GRANT;
719
720                 /* warn if we try to combine flags that we don't know to be
721                  * safe to combine */
722                 if ((p1->flag & mask) != (p2->flag & mask))
723                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
724                                "same brw?\n", p1->flag, p2->flag);
725                 return 0;
726         }
727
728         return (p1->off + p1->count == p2->off);
729 }
730
731 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
732                                    struct brw_page *pga)
733 {
734         __u32 cksum = ~0;
735
736         LASSERT (pg_count > 0);
737         while (nob > 0 && pg_count > 0) {
738                 char *ptr = cfs_kmap(pga->pg);
739                 int off = pga->off & ~CFS_PAGE_MASK;
740                 int count = pga->count > nob ? nob : pga->count;
741
742                 cksum = crc32_le(cksum, ptr + off, count);
743                 cfs_kunmap(pga->pg);
744                 LL_CDEBUG_PAGE(D_PAGE, pga->pg, "off %d checksum %x\n",
745                                off, cksum);
746
747                 nob -= pga->count;
748                 pg_count--;
749                 pga++;
750         }
751
752         return cksum;
753 }
754
/* Build (but do not send) an OST_READ/OST_WRITE request with its bulk
 * descriptor for @page_count pages.
 *
 * Contiguous pages with identical flags are merged into single remote
 * niobufs (see can_merge_pages).  On success, returns 0 and hands back
 * the total byte count (*requested_nobp), the niobuf count (*niocountp)
 * and the prepared request (*reqp); the caller owns the request.
 * Returns negative errno on failure. */
static int osc_brw_prep_request(int cmd, struct obd_import *imp,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page *pga, int *requested_nobp,
                                int *niocountp, struct ptlrpc_request **reqp)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct client_obd       *cli = &imp->imp_obd->u.cli;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int                      niocount;
        int                      size[3];
        int                      i;
        int                      requested_nob;
        int                      opc;
        int                      rc;
        struct ptlrpc_request_pool *pool;

        ENTRY;
        /* writes draw from the import's preallocated request pool so they
         * can proceed under memory pressure; reads allocate normally */
        opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
        pool = ((cmd & OBD_BRW_WRITE) != 0) ? imp->imp_rq_pool : NULL;

        /* count how many niobufs we need after merging contiguous pages */
        for (niocount = i = 1; i < page_count; i++)
                if (!can_merge_pages(&pga[i - 1], &pga[i]))
                        niocount++;

        size[0] = sizeof(*body);
        size[1] = sizeof(*ioobj);
        size[2] = niocount * sizeof(*niobuf);

        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM);
        req = ptlrpc_prep_req_pool(imp, LUSTRE_OST_VERSION, opc, 3,
                                   size, NULL, pool);
        if (req == NULL)
                RETURN (-ENOMEM);

        /* FIXME bug 249. Also see bug 7198 */
        if (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                req->rq_request_portal = OST_IO_PORTAL;

        /* writes are GETs from the server's viewpoint, reads are PUTs */
        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp (req, page_count,
                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp (req, page_count,
                                             BULK_PUT_SINK, OST_BULK_PORTAL);
        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
        ioobj = lustre_msg_buf(req->rq_reqmsg, 1, sizeof(*ioobj));
        niobuf = lustre_msg_buf(req->rq_reqmsg, 2, niocount * sizeof(*niobuf));

        memcpy(&body->oa, oa, sizeof(*oa));

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;

        /* register each page with the bulk descriptor and fill/merge the
         * remote niobufs; pages must be sorted by ascending offset */
        LASSERT (page_count > 0);
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = &pga[i];
                struct brw_page *pg_prev = pg - 1;

                LASSERT(pg->count > 0);
                LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
                         pg->off, pg->count);
#ifdef __LINUX__
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                /* either all pages use server-side locking or none do */
                LASSERT((pga[0].flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
                                      pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        /* extend the previous niobuf instead of starting
                         * a new one */
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
        }

        LASSERT((void *)(niobuf - niocount) ==
                lustre_msg_buf(req->rq_reqmsg, 2, niocount * sizeof(*niobuf)));
        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);

        /* size[0] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (unlikely(cli->cl_checksum)) {
                        body->oa.o_valid |= OBD_MD_FLCKSUM;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM;
                        oa->o_cksum = body->oa.o_cksum;
                }
                /* 1 RC per niobuf */
                size[1] = sizeof(__u32) * niocount;
                req->rq_replen = lustre_msg_size(2, size);
        } else {
                if (unlikely(cli->cl_checksum))
                        body->oa.o_valid |= OBD_MD_FLCKSUM;
                /* 1 RC for the whole I/O */
                req->rq_replen = lustre_msg_size(1, size);
        }

        *niocountp = niocount;
        *requested_nobp = requested_nob;
        *reqp = req;
        RETURN (0);

 out:
        ptlrpc_req_finished (req);
        RETURN (rc);
}
888
889 static void check_write_csum(__u32 cli, __u32 srv, int requested_nob,
890                              obd_count page_count, struct brw_page *pga)
891 {
892         __u32 new_csum;
893
894         if (srv == cli) {
895                 CDEBUG(D_PAGE, "checksum %x confirmed\n", cli);
896                 return;
897         }
898
899         new_csum = osc_checksum_bulk(requested_nob, page_count, pga);
900
901         if (new_csum == srv) {
902                 CERROR("BAD CHECKSUM (WRITE): pages were mutated on the client"
903                        "after we checksummed them (original client csum:"
904                        " %x; server csum: %x; client csum now: %x)\n",
905                        cli, srv, new_csum);
906                 return;
907         }
908
909         if (new_csum == cli) {
910                 CERROR("BAD CHECKSUM (WRITE): pages were mutated in transit "
911                        "(original client csum: %x; server csum: %x; client "
912                        "csum now: %x)\n", cli, srv, new_csum);
913                 return;
914         }
915
916         CERROR("BAD CHECKSUM (WRITE): pages were mutated in transit, and the "
917                "current page contents don't match the originals OR what the "
918                "server received (original client csum: %x; server csum: %x; "
919                "client csum now: %x)\n", cli, srv, new_csum);
920 }
921
922 static int osc_brw_fini_request(struct ptlrpc_request *req, struct obdo *oa,
923                                 int requested_nob, int niocount,
924                                 obd_count page_count, struct brw_page *pga,
925                                 int rc)
926 {
927         const lnet_process_id_t *peer =
928                         &req->rq_import->imp_connection->c_peer;
929         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
930         struct ost_body *body;
931         __u32 client_cksum = 0;
932         ENTRY;
933
934         if (rc < 0 && rc != -EDQUOT)
935                 RETURN(rc);
936
937         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
938         body = lustre_swab_repbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
939         if (body == NULL) {
940                 CERROR ("Can't unpack body\n");
941                 RETURN(-EPROTO);
942         }
943
944         /* set/clear over quota flag for a uid/gid */
945         if (req->rq_reqmsg->opc == OST_WRITE &&
946             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
947                 lquota_setdq(quota_interface, cli, body->oa.o_uid,
948                              body->oa.o_gid, body->oa.o_valid,
949                              body->oa.o_flags);
950
951         if (rc < 0)
952                 RETURN(rc);
953
954         if (unlikely(oa->o_valid & OBD_MD_FLCKSUM))
955                 client_cksum = oa->o_cksum; /* save for later */
956
957         osc_update_grant(cli, body);
958         memcpy(oa, &body->oa, sizeof(*oa));
959
960         if (req->rq_reqmsg->opc == OST_WRITE) {
961                 if (rc > 0) {
962                         CERROR ("Unexpected +ve rc %d\n", rc);
963                         RETURN(-EPROTO);
964                 }
965                 LASSERT (req->rq_bulk->bd_nob == requested_nob);
966
967                 if (unlikely((oa->o_valid & OBD_MD_FLCKSUM) &&
968                              client_cksum)) {
969                         check_write_csum(client_cksum, oa->o_cksum,
970                                          requested_nob, page_count, pga);
971                 }
972
973                 RETURN(check_write_rcs(req, requested_nob, niocount,
974                                        page_count, pga));
975         }
976
977         /* The rest of this function executes only for OST_READs */
978         if (rc > requested_nob) {
979                 CERROR("Unexpected rc %d (%d requested)\n", rc, requested_nob);
980                 RETURN(-EPROTO);
981         }
982
983         if (rc != req->rq_bulk->bd_nob_transferred) {
984                 CERROR ("Unexpected rc %d (%d transferred)\n",
985                         rc, req->rq_bulk->bd_nob_transferred);
986                 return (-EPROTO);
987         }
988
989         if (rc < requested_nob)
990                 handle_short_read(rc, page_count, pga);
991
992         if (unlikely(oa->o_valid & OBD_MD_FLCKSUM)) {
993                 static int cksum_counter;
994                 __u32 cksum = osc_checksum_bulk(rc, page_count, pga);
995                 __u32 server_cksum = oa->o_cksum;
996
997                 if (server_cksum == ~0 && rc > 0) {
998                         CERROR("Protocol error: server %s set the 'checksum' "
999                                "bit, but didn't send a checksum.  Not fatal, "
1000                                "but please tell CFS.\n",
1001                                libcfs_nid2str(peer->nid));
1002                         RETURN(0);
1003                 }
1004
1005                 cksum_counter++;
1006
1007                 if (server_cksum != cksum) {
1008                         CERROR("Bad checksum from %s: server %x != client %x\n",
1009                                libcfs_nid2str(peer->nid), server_cksum, cksum);
1010                         cksum_counter = 0;
1011                         oa->o_cksum = cksum;
1012                 } else if ((cksum_counter & (-cksum_counter)) == cksum_counter){
1013                         CWARN("Checksum %u from %s OK: %x\n",
1014                               cksum_counter, libcfs_nid2str(peer->nid), cksum);
1015                 }
1016                 CDEBUG(D_PAGE, "checksum %x confirmed\n", cksum);
1017         } else if (unlikely(client_cksum)) {
1018                 static int cksum_missed;
1019
1020                 cksum_missed++;
1021                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1022                         CERROR("Checksum %u requested from %s but not sent\n",
1023                                cksum_missed, libcfs_nid2str(peer->nid));
1024         }
1025
1026         RETURN(0);
1027 }
1028
1029 static int osc_brw_internal(int cmd, struct obd_export *exp,struct obdo *oa,
1030                             struct lov_stripe_md *lsm,
1031                             obd_count page_count, struct brw_page *pga)
1032 {
1033         int                    requested_nob;
1034         int                    niocount;
1035         struct ptlrpc_request *request;
1036         int                    rc;
1037         ENTRY;
1038
1039 restart_bulk:
1040         rc = osc_brw_prep_request(cmd, class_exp2cliimp(exp), oa, lsm,
1041                                   page_count, pga, &requested_nob, &niocount,
1042                                   &request);
1043         if (rc != 0)
1044                 return (rc);
1045
1046         rc = ptlrpc_queue_wait(request);
1047
1048         if (rc == -ETIMEDOUT && request->rq_resend) {
1049                 DEBUG_REQ(D_HA, request,  "BULK TIMEOUT");
1050                 ptlrpc_req_finished(request);
1051                 goto restart_bulk;
1052         }
1053
1054         rc = osc_brw_fini_request(request, oa, requested_nob, niocount,
1055                                   page_count, pga, rc);
1056
1057         ptlrpc_req_finished(request);
1058         RETURN (rc);
1059 }
1060
1061 static int brw_interpret(struct ptlrpc_request *request,
1062                          struct osc_brw_async_args *aa, int rc)
1063 {
1064         struct obdo *oa      = aa->aa_oa;
1065         int requested_nob    = aa->aa_requested_nob;
1066         int niocount         = aa->aa_nio_count;
1067         obd_count page_count = aa->aa_page_count;
1068         struct brw_page *pga = aa->aa_pga;
1069         ENTRY;
1070
1071         rc = osc_brw_fini_request(request, oa, requested_nob, niocount,
1072                                   page_count, pga, rc);
1073         RETURN (rc);
1074 }
1075
1076 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1077                           struct lov_stripe_md *lsm, obd_count page_count,
1078                           struct brw_page *pga, struct ptlrpc_request_set *set)
1079 {
1080         struct ptlrpc_request     *request;
1081         int                        requested_nob;
1082         int                        nio_count;
1083         struct osc_brw_async_args *aa;
1084         int                        rc;
1085         ENTRY;
1086
1087         rc = osc_brw_prep_request(cmd, class_exp2cliimp(exp), oa, lsm,
1088                                   page_count, pga, &requested_nob, &nio_count,
1089                                   &request);
1090
1091         if (rc == 0) {
1092                 LASSERT(sizeof(*aa) <= sizeof(request->rq_async_args));
1093                 aa = (struct osc_brw_async_args *)&request->rq_async_args;
1094                 aa->aa_oa = oa;
1095                 aa->aa_requested_nob = requested_nob;
1096                 aa->aa_nio_count = nio_count;
1097                 aa->aa_page_count = page_count;
1098                 aa->aa_pga = pga;
1099
1100                 request->rq_interpret_reply = brw_interpret;
1101                 ptlrpc_set_add_req(set, request);
1102         }
1103         RETURN (rc);
1104 }
1105
/* fallback definition for builds whose headers don't already supply min_t */
#ifndef min_t
#define min_t(type,x,y) \
        ({ type __x = (x); type __y = (y); __x < __y ? __x: __y; })
#endif
1110
/*
 * ugh, we want disk allocation on the target to happen in offset order.  we'll
 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
 * fine for our small page arrays and doesn't require allocation.  It's an
 * insertion sort that swaps elements that are strides apart, shrinking the
 * stride down until it's '1' and the array is sorted.
 */
1118 static void sort_brw_pages(struct brw_page *array, int num)
1119 {
1120         int stride, i, j;
1121         struct brw_page tmp;
1122
1123         if (num == 1)
1124                 return;
1125         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1126                 ;
1127
1128         do {
1129                 stride /= 3;
1130                 for (i = stride ; i < num ; i++) {
1131                         tmp = array[i];
1132                         j = i;
1133                         while (j >= stride && array[j - stride].off > tmp.off) {
1134                                 array[j] = array[j - stride];
1135                                 j -= stride;
1136                         }
1137                         array[j] = tmp;
1138                 }
1139         } while (stride > 1);
1140 }
1141
1142 static obd_count max_unfragmented_pages(struct brw_page *pg, obd_count pages)
1143 {
1144         int count = 1;
1145         int offset;
1146
1147         LASSERT (pages > 0);
1148         offset = pg->off & (CFS_PAGE_SIZE - 1);
1149
1150         for (;;) {
1151                 pages--;
1152                 if (pages == 0)         /* that's all */
1153                         return count;
1154
1155                 if (offset + pg->count < CFS_PAGE_SIZE)
1156                         return count;   /* doesn't end on page boundary */
1157
1158                 pg++;
1159                 offset = pg->off & (CFS_PAGE_SIZE - 1);
1160                 if (offset != 0)        /* doesn't start on page boundary */
1161                         return count;
1162
1163                 count++;
1164         }
1165 }
1166
1167 static int osc_brw(int cmd, struct obd_export *exp, struct obdo *oa,
1168                    struct lov_stripe_md *md, obd_count page_count,
1169                    struct brw_page *pga, struct obd_trans_info *oti)
1170 {
1171         struct obdo *saved_oa = NULL;
1172         int          rc;
1173         ENTRY;
1174
1175         if (cmd & OBD_BRW_CHECK) {
1176                 /* The caller just wants to know if there's a chance that this
1177                  * I/O can succeed */
1178                 struct obd_import *imp = class_exp2cliimp(exp);
1179
1180                 if (imp == NULL || imp->imp_invalid)
1181                         RETURN(-EIO);
1182                 RETURN(0);
1183         }
1184
1185         rc = 0;
1186
1187         while (page_count) {
1188                 obd_count pages_per_brw;
1189
1190                 if (page_count > PTLRPC_MAX_BRW_PAGES)
1191                         pages_per_brw = PTLRPC_MAX_BRW_PAGES;
1192                 else
1193                         pages_per_brw = page_count;
1194
1195                 sort_brw_pages(pga, pages_per_brw);
1196                 pages_per_brw = max_unfragmented_pages(pga, pages_per_brw);
1197
1198                 if (saved_oa != NULL) {
1199                         /* restore previously saved oa */
1200                         *oa = *saved_oa;
1201                 } else if (page_count > pages_per_brw) {
1202                         /* save a copy of oa (brw will clobber it) */
1203                         saved_oa = obdo_alloc();
1204                         if (saved_oa == NULL)
1205                                 RETURN(-ENOMEM);
1206                         *saved_oa = *oa;
1207                 }
1208
1209                 rc = osc_brw_internal(cmd, exp, oa, md, pages_per_brw, pga);
1210
1211                 if (rc != 0)
1212                         break;
1213
1214                 page_count -= pages_per_brw;
1215                 pga += pages_per_brw;
1216         }
1217
1218         if (saved_oa != NULL)
1219                 obdo_free(saved_oa);
1220
1221         RETURN(rc);
1222 }
1223
1224 static int osc_brw_async(int cmd, struct obd_export *exp, struct obdo *oa,
1225                          struct lov_stripe_md *md, obd_count page_count,
1226                          struct brw_page *pga, struct ptlrpc_request_set *set,
1227                          struct obd_trans_info *oti)
1228 {
1229         ENTRY;
1230
1231         if (cmd & OBD_BRW_CHECK) {
1232                 /* The caller just wants to know if there's a chance that this
1233                  * I/O can succeed */
1234                 struct obd_import *imp = class_exp2cliimp(exp);
1235
1236                 if (imp == NULL || imp->imp_invalid)
1237                         RETURN(-EIO);
1238                 RETURN(0);
1239         }
1240
1241         while (page_count) {
1242                 obd_count pages_per_brw;
1243                 int rc;
1244
1245                 if (page_count > PTLRPC_MAX_BRW_PAGES)
1246                         pages_per_brw = PTLRPC_MAX_BRW_PAGES;
1247                 else
1248                         pages_per_brw = page_count;
1249
1250                 sort_brw_pages(pga, pages_per_brw);
1251                 pages_per_brw = max_unfragmented_pages(pga, pages_per_brw);
1252
1253                 rc = async_internal(cmd, exp, oa, md, pages_per_brw, pga, set);
1254
1255                 if (rc != 0)
1256                         RETURN(rc);
1257
1258                 page_count -= pages_per_brw;
1259                 pga += pages_per_brw;
1260         }
1261         RETURN(0);
1262 }
1263
1264 static void osc_check_rpcs(struct client_obd *cli);
1265 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1266                            int sent);
1267
1268 /* This maintains the lists of pending pages to read/write for a given object
1269  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1270  * to quickly find objects that are ready to send an RPC. */
1271 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1272                          int cmd)
1273 {
1274         int optimal;
1275         ENTRY;
1276
1277         if (lop->lop_num_pending == 0)
1278                 RETURN(0);
1279
1280         /* if we have an invalid import we want to drain the queued pages
1281          * by forcing them through rpcs that immediately fail and complete
1282          * the pages.  recovery relies on this to empty the queued pages
1283          * before canceling the locks and evicting down the llite pages */
1284         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1285                 RETURN(1);
1286
1287         /* stream rpcs in queue order as long as as there is an urgent page
1288          * queued.  this is our cheap solution for good batching in the case
1289          * where writepage marks some random page in the middle of the file
1290          * as urgent because of, say, memory pressure */
1291         if (!list_empty(&lop->lop_urgent))
1292                 RETURN(1);
1293
1294         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1295         optimal = cli->cl_max_pages_per_rpc;
1296         if (cmd & OBD_BRW_WRITE) {
1297                 /* trigger a write rpc stream as long as there are dirtiers
1298                  * waiting for space.  as they're waiting, they're not going to
1299                  * create more pages to coallesce with what's waiting.. */
1300                 if (!list_empty(&cli->cl_cache_waiters))
1301                         RETURN(1);
1302
1303                 /* +16 to avoid triggering rpcs that would want to include pages
1304                  * that are being queued but which can't be made ready until
1305                  * the queuer finishes with the page. this is a wart for
1306                  * llite::commit_write() */
1307                 optimal += 16;
1308         }
1309         if (lop->lop_num_pending >= optimal)
1310                 RETURN(1);
1311
1312         RETURN(0);
1313 }
1314
/* Make the membership of 'item' on 'list' match 'should_be_on'. */
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        if (list_empty(item)) {
                if (should_be_on)
                        list_add_tail(item, list);
        } else {
                if (!should_be_on)
                        list_del_init(item);
        }
}
1323
1324 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1325  * can find pages to build into rpcs quickly */
1326 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1327 {
1328         on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1329                 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1330                 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1331
1332         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1333                 loi->loi_write_lop.lop_num_pending);
1334
1335         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1336                 loi->loi_read_lop.lop_num_pending);
1337 }
1338
1339 static void lop_update_pending(struct client_obd *cli,
1340                                struct loi_oap_pages *lop, int cmd, int delta)
1341 {
1342         lop->lop_num_pending += delta;
1343         if (cmd & OBD_BRW_WRITE)
1344                 cli->cl_pending_w_pages += delta;
1345         else
1346                 cli->cl_pending_r_pages += delta;
1347 }
1348
1349 /* this is called when a sync waiter receives an interruption.  Its job is to
1350  * get the caller woken as soon as possible.  If its page hasn't been put in an
1351  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
1352  * desiring interruption which will forcefully complete the rpc once the rpc
1353  * has timed out */
static void osc_occ_interrupted(struct oig_callback_context *occ)
{
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        struct lov_oinfo *loi;
        ENTRY;

        /* XXX member_of() */
        oap = list_entry(occ, struct osc_async_page, oap_occ);

        client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);

        oap->oap_interrupted = 1;

        /* ok, it's been put in an rpc. */
        if (oap->oap_request != NULL) {
                /* mark the rpc interrupted and wake ptlrpcd so it gets
                 * forcefully completed */
                ptlrpc_mark_interrupted(oap->oap_request);
                ptlrpcd_wake(oap->oap_request);
                GOTO(unlock, 0);
        }

        /* we don't get interruption callbacks until osc_trigger_group_io()
         * has been called and put the sync oaps in the pending/urgent lists.*/
        if (!list_empty(&oap->oap_pending_item)) {
                /* not yet in an rpc: dequeue immediately, fix up the pending
                 * accounting, and complete through the group so the sync
                 * waiter wakes right away */
                list_del_init(&oap->oap_pending_item);
                list_del_init(&oap->oap_urgent_item);

                loi = oap->oap_loi;
                lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
                        &loi->loi_write_lop : &loi->loi_read_lop;
                lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
                loi_list_maint(oap->oap_cli, oap->oap_loi);

                oig_complete_one(oap->oap_oig, &oap->oap_occ, 0);
                oap->oap_oig = NULL;
        }

unlock:
        client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
}
1394
/* this is trying to propagate async writeback errors back up to the
 * application.  As an async write fails we record the error code for later if
 * the app does an fsync.  As long as errors persist we force future rpcs to be
 * sync so that the app can get a sync error and break the cycle of queueing
 * pages for which writeback will fail. */
1400 static void osc_process_ar(struct osc_async_rc *ar, struct ptlrpc_request *req,
1401                            int rc)
1402 {
1403         if (rc) {
1404                 if (!ar->ar_rc)
1405                         ar->ar_rc = rc;
1406
1407                 ar->ar_force_sync = 1;
1408                 ar->ar_min_xid = ptlrpc_sample_next_xid();
1409                 return;
1410
1411         }
1412
1413         if (ar->ar_force_sync && req && (ptlrpc_req_xid(req) >= ar->ar_min_xid))
1414                 ar->ar_force_sync = 0;
1415 }
1416
1417 /* this must be called holding the loi list lock to give coverage to exit_cache,
1418  * async_flag maintenance, and oap_request */
static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
                              struct osc_async_page *oap, int sent, int rc)
{
        ENTRY;
        /* release any cache accounting held by this oap */
        osc_exit_cache(cli, oap, sent);
        oap->oap_async_flags = 0;
        oap->oap_interrupted = 0;

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                /* record write success/failure both client-wide and
                 * per-object so failing streams get forced sync */
                osc_process_ar(&cli->cl_ar, oap->oap_request, rc);
                osc_process_ar(&oap->oap_loi->loi_ar, oap->oap_request, rc);
        }

        if (oap->oap_request != NULL) {
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
        }

        if (rc == 0 && oa != NULL) {
                /* mirror the attributes the server returned into the
                 * cached lvb */
                if (oa->o_valid & OBD_MD_FLBLOCKS)
                        oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
                if (oa->o_valid & OBD_MD_FLMTIME)
                        oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
                if (oa->o_valid & OBD_MD_FLATIME)
                        oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
                if (oa->o_valid & OBD_MD_FLCTIME)
                        oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
        }

        if (oap->oap_oig) {
                /* group i/o completes through the oig, not the caller's
                 * completion callback */
                oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
                oap->oap_oig = NULL;
                EXIT;
                return;
        }

        oap->oap_caller_ops->ap_completion(oap->oap_caller_data, oap->oap_cmd,
                                           oa, rc);
        EXIT;
}
1459
/* Reply-interpret callback for async rpcs built by osc_build_req(): finish
 * the bulk, complete every oap that rode in the rpc, and free the request's
 * oa/pga.  Runs under (takes) the client's loi list lock. */
static int brw_interpret_oap(struct ptlrpc_request *request,
                             struct osc_brw_async_args *aa, int rc)
{
        struct osc_async_page *oap;
        struct client_obd *cli;
        struct list_head *pos, *n;
        ENTRY;

        /* finish the bulk first; rc becomes the completion code handed to
         * every oap below */
        rc = osc_brw_fini_request(request, aa->aa_oa, aa->aa_requested_nob,
                                  aa->aa_nio_count, aa->aa_page_count,
                                  aa->aa_pga, rc);

        CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);

        cli = aa->aa_cli;

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (request->rq_reqmsg->opc == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;

        /* the caller may re-use the oap after the completion call so
         * we need to clean it up a little */
        list_for_each_safe(pos, n, &aa->aa_oaps) {
                oap = list_entry(pos, struct osc_async_page, oap_rpc_item);

                //CDEBUG(D_INODE, "page %p index %lu oap %p\n",
                       //oap->oap_page, oap->oap_page->index, oap);

                list_del_init(&oap->oap_rpc_item);
                osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
        }

        /* with this rpc finished there may be room for more */
        osc_wake_cache_waiters(cli);
        osc_check_rpcs(cli);

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        obdo_free(aa->aa_oa);
        OBD_FREE(aa->aa_pga, aa->aa_page_count * sizeof(struct brw_page));

        RETURN(0);
}
1508
/* Build a brw request from the oaps on rpc_list.  Returns the prepared
 * request with its async args filled in, or an ERR_PTR on failure (in which
 * case any allocated oa/pga have already been freed). */
static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
                                            struct list_head *rpc_list,
                                            int page_count, int cmd)
{
        struct ptlrpc_request *req;
        struct brw_page *pga = NULL;
        int requested_nob, nio_count;
        struct osc_brw_async_args *aa;
        struct obdo *oa = NULL;
        struct obd_async_page_ops *ops = NULL;
        void *caller_data = NULL;
        struct list_head *pos;
        int i, rc;

        ENTRY;
        LASSERT(!list_empty(rpc_list));

        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                RETURN(ERR_PTR(-ENOMEM));

        oa = obdo_alloc();
        if (oa == NULL)
                GOTO(out, req = ERR_PTR(-ENOMEM));

        /* flatten the oaps on rpc_list into the pga array */
        i = 0;
        list_for_each(pos, rpc_list) {
                struct osc_async_page *oap;

                oap = list_entry(pos, struct osc_async_page, oap_rpc_item);
                if (ops == NULL) {
                        /* caller ops/data come from the first oap;
                         * presumably shared by all oaps in one rpc — the
                         * code never checks the others */
                        ops = oap->oap_caller_ops;
                        caller_data = oap->oap_caller_data;
                }
                pga[i].off = oap->oap_obj_off + oap->oap_page_off;
                pga[i].pg = oap->oap_page;
                pga[i].count = oap->oap_count;
                pga[i].flag = oap->oap_brw_flags;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i].pg, cfs_page_index(oap->oap_page), oap, pga[i].flag);
                i++;
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(ops != NULL);
        ops->ap_fill_obdo(caller_data, cmd, oa);

        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli->cl_import, oa, NULL, page_count,
                                  pga, &requested_nob, &nio_count, &req);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, req = ERR_PTR(rc));
        }

        /* stash everything the interpret callback will need */
        LASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = nio_count;
        aa->aa_page_count = page_count;
        aa->aa_pga = pga;
        aa->aa_cli = cli;

out:
        /* on error, we still own oa/pga and must free them; on success
         * ownership has moved to the request's async args */
        if (IS_ERR(req)) {
                if (oa)
                        obdo_free(oa);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
        }
        RETURN(req);
}
1582
/* Pull up to cl_max_pages_per_rpc contiguous ready pages off lop->lop_pending,
 * build one brw rpc from them and hand it to ptlrpcd.  Returns 1 if an rpc
 * was sent, 0 if no pages could be sent (make_ready back-off), or a negative
 * errno if building the rpc failed.
 *
 * the loi lock is held across this function but it's allowed to release
 * and reacquire it during its work */
static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
                            int cmd, struct loi_oap_pages *lop)
{
        struct ptlrpc_request *request;
        obd_count page_count = 0;
        struct list_head *tmp, *pos;
        struct osc_async_page *oap = NULL;
        struct osc_brw_async_args *aa;
        struct obd_async_page_ops *ops;
        CFS_LIST_HEAD(rpc_list);
        unsigned int ending_offset;
        unsigned  starting_offset = 0;
        ENTRY;

        /* first we find the pages we're allowed to work with */
        list_for_each_safe(pos, tmp, &lop->lop_pending) {
                oap = list_entry(pos, struct osc_async_page, oap_pending_item);
                ops = oap->oap_caller_ops;

                LASSERT(oap->oap_magic == OAP_MAGIC);

                /* in llite being 'ready' equates to the page being locked
                 * until completion unlocks it.  commit_write submits a page
                 * as not ready because its unlock will happen unconditionally
                 * as the call returns.  if we race with commit_write giving
                 * us that page we dont' want to create a hole in the page
                 * stream, so we stop and leave the rpc to be fired by
                 * another dirtier or kupdated interval (the not ready page
                 * will still be on the dirty list).  we could call in
                 * at the end of ll_file_write to process the queue again. */
                if (!(oap->oap_async_flags & ASYNC_READY)) {
                        int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
                        if (rc < 0)
                                CDEBUG(D_INODE, "oap %p page %p returned %d "
                                                "instead of ready\n", oap,
                                                oap->oap_page, rc);
                        switch (rc) {
                        case -EAGAIN:
                                /* llite is telling us that the page is still
                                 * in commit_write and that we should try
                                 * and put it in an rpc again later.  we
                                 * break out of the loop so we don't create
                                 * a hole in the sequence of pages in the rpc
                                 * stream.*/
                                pos = NULL;
                                break;
                        case -EINTR:
                                /* the io isn't needed.. tell the checks
                                 * below to complete the rpc with EINTR */
                                oap->oap_async_flags |= ASYNC_COUNT_STABLE;
                                oap->oap_count = -EINTR;
                                break;
                        case 0:
                                oap->oap_async_flags |= ASYNC_READY;
                                break;
                        default:
                                LASSERTF(0, "oap %p page %p returned %d "
                                            "from make_ready\n", oap,
                                            oap->oap_page, rc);
                                break;
                        }
                }
                /* pos was NULLed in the -EAGAIN case above: stop scanning */
                if (pos == NULL)
                        break;
                /*
                 * Page submitted for IO has to be locked. Either by
                 * ->ap_make_ready() or by higher layers.
                 *
                 * XXX nikita: this assertion should be adjusted when lustre
                 * starts using PG_writeback for pages being written out.
                 */
#if defined(__KERNEL__) && defined(__LINUX__)
                LASSERT(PageLocked(oap->oap_page));
#endif
                /* If there is a gap at the start of this page, it can't merge
                 * with any previous page, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (page_count != 0 && oap->oap_page_off != 0)
                        break;

                /* take the page out of our book-keeping */
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, cmd, -1);
                list_del_init(&oap->oap_urgent_item);

                /* remember where this rpc starts, for the lproc offset
                 * histogram below */
                if (page_count == 0)
                        starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
                                          (PTLRPC_MAX_BRW_SIZE - 1);

                /* ask the caller for the size of the io as the rpc leaves. */
                if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
                        oap->oap_count =
                                ops->ap_refresh_count(oap->oap_caller_data,cmd);
                /* count <= 0 (including the -EINTR set above) means the io
                 * is no longer needed: complete the page without sending */
                if (oap->oap_count <= 0) {
                        CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
                               oap->oap_count);
                        osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
                        continue;
                }

                /* now put the page back in our accounting */
                list_add_tail(&oap->oap_rpc_item, &rpc_list);
                if (++page_count >= cli->cl_max_pages_per_rpc)
                        break;

                /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
                 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
                 * have the same alignment as the initial writes that allocated
                 * extents on the server. */
                ending_offset = (oap->oap_obj_off + oap->oap_page_off +
                                 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
                if (ending_offset == 0)
                        break;

                /* If there is a gap at the end of this page, it can't merge
                 * with any subsequent pages, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
                        break;
        }

        osc_wake_cache_waiters(cli);

        if (page_count == 0)
                RETURN(0);

        loi_list_maint(cli, loi);

        /* drop the list lock across the (allocating) rpc build */
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        request = osc_build_req(cli, &rpc_list, page_count, cmd);
        if (IS_ERR(request)) {
                /* this should happen rarely and is pretty bad, it makes the
                 * pending list not follow the dirty order */
                client_obd_list_lock(&cli->cl_loi_list_lock);
                list_for_each_safe(pos, tmp, &rpc_list) {
                        oap = list_entry(pos, struct osc_async_page,
                                         oap_rpc_item);
                        list_del_init(&oap->oap_rpc_item);

                        /* queued sync pages can be torn down while the pages
                         * were between the pending list and the rpc */
                        if (oap->oap_interrupted) {
                                CDEBUG(D_INODE, "oap %p interrupted\n", oap);
                                osc_ap_completion(cli, NULL, oap, 0,
                                                  oap->oap_count);
                                continue;
                        }
                        osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(request));

                        /* put the page back in the loi/lop lists */
                        list_add_tail(&oap->oap_pending_item,
                                      &lop->lop_pending);
                        lop_update_pending(cli, lop, cmd, 1);
                        if (oap->oap_async_flags & ASYNC_URGENT)
                                list_add(&oap->oap_urgent_item,
                                         &lop->lop_urgent);
                }
                loi_list_maint(cli, loi);
                RETURN(PTR_ERR(request));
        }

        LASSERT(sizeof(*aa) <= sizeof(request->rq_async_args));
        aa = (struct osc_brw_async_args *)&request->rq_async_args;
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice(&rpc_list, &aa->aa_oaps);
        CFS_INIT_LIST_HEAD(&rpc_list);

        if (cmd == OBD_BRW_READ) {
                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
                                      starting_offset/CFS_PAGE_SIZE + 1);
        } else {
                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_write_rpc_hist,
                                 cli->cl_w_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
                                      starting_offset/CFS_PAGE_SIZE + 1);
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        if (cmd == OBD_BRW_READ)
                cli->cl_r_in_flight++;
        else
                cli->cl_w_in_flight++;

        /* queued sync pages can be torn down while the pages
         * were between the pending list and the rpc */
        list_for_each(pos, &aa->aa_oaps) {
                oap = list_entry(pos, struct osc_async_page, oap_rpc_item);
                if (oap->oap_interrupted) {
                        CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
                               oap, request);
                        ptlrpc_mark_interrupted(request);
                        break;
                }
        }

        CDEBUG(D_INODE, "req %p: %d pages, aa %p.  now %dr/%dw in flight\n",
                        request, page_count, aa, cli->cl_r_in_flight,
                        cli->cl_w_in_flight);

        /* oap still points at the last page examined by the loop above;
         * hang a reference to the rpc off it */
        oap->oap_request = ptlrpc_request_addref(request);
        request->rq_interpret_reply = brw_interpret_oap;
        ptlrpcd_add_req(request);
        RETURN(1);
}
1794
/* Dump an object's rpc-readiness state: whether it sits on the client's
 * ready list, plus pending counts and urgent-list status for both the
 * write and read directions. */
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !list_empty(&(LOI)->loi_cli_item),                        \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
               args)                                                     \

1804 /* This is called by osc_check_rpcs() to find which objects have pages that
1805  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
1806 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
1807 {
1808         ENTRY;
1809         /* first return all objects which we already know to have
1810          * pages ready to be stuffed into rpcs */
1811         if (!list_empty(&cli->cl_loi_ready_list))
1812                 RETURN(list_entry(cli->cl_loi_ready_list.next,
1813                                   struct lov_oinfo, loi_cli_item));
1814
1815         /* then if we have cache waiters, return all objects with queued
1816          * writes.  This is especially important when many small files
1817          * have filled up the cache and not been fired into rpcs because
1818          * they don't pass the nr_pending/object threshhold */
1819         if (!list_empty(&cli->cl_cache_waiters) &&
1820             !list_empty(&cli->cl_loi_write_list))
1821                 RETURN(list_entry(cli->cl_loi_write_list.next,
1822                                   struct lov_oinfo, loi_write_item));
1823
1824         /* then return all queued objects when we have an invalid import
1825          * so that they get flushed */
1826         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
1827                 if (!list_empty(&cli->cl_loi_write_list))
1828                         RETURN(list_entry(cli->cl_loi_write_list.next,
1829                                           struct lov_oinfo, loi_write_item));
1830                 if (!list_empty(&cli->cl_loi_read_list))
1831                         RETURN(list_entry(cli->cl_loi_read_list.next,
1832                                           struct lov_oinfo, loi_read_item));
1833         }
1834         RETURN(NULL);
1835 }
1836
1837 /* called with the loi list lock held */
1838 static void osc_check_rpcs(struct client_obd *cli)
1839 {
1840         struct lov_oinfo *loi;
1841         int rc = 0, race_counter = 0;
1842         ENTRY;
1843
1844         while ((loi = osc_next_loi(cli)) != NULL) {
1845                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
1846
1847                 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
1848                         break;
1849
1850                 /* attempt some read/write balancing by alternating between
1851                  * reads and writes in an object.  The makes_rpc checks here
1852                  * would be redundant if we were getting read/write work items
1853                  * instead of objects.  we don't want send_oap_rpc to drain a
1854                  * partial read pending queue when we're given this object to
1855                  * do io on writes while there are cache waiters */
1856                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
1857                         rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
1858                                               &loi->loi_write_lop);
1859                         if (rc < 0)
1860                                 break;
1861                         if (rc > 0)
1862                                 race_counter = 0;
1863                         else
1864                                 race_counter++;
1865                 }
1866                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
1867                         rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
1868                                               &loi->loi_read_lop);
1869                         if (rc < 0)
1870                                 break;
1871                         if (rc > 0)
1872                                 race_counter = 0;
1873                         else
1874                                 race_counter++;
1875                 }
1876
1877                 /* attempt some inter-object balancing by issueing rpcs
1878                  * for each object in turn */
1879                 if (!list_empty(&loi->loi_cli_item))
1880                         list_del_init(&loi->loi_cli_item);
1881                 if (!list_empty(&loi->loi_write_item))
1882                         list_del_init(&loi->loi_write_item);
1883                 if (!list_empty(&loi->loi_read_item))
1884                         list_del_init(&loi->loi_read_item);
1885
1886                 loi_list_maint(cli, loi);
1887
1888                 /* send_oap_rpc fails with 0 when make_ready tells it to
1889                  * back off.  llite's make_ready does this when it tries
1890                  * to lock a page queued for write that is already locked.
1891                  * we want to try sending rpcs from many objects, but we
1892                  * don't want to spin failing with 0.  */
1893                 if (race_counter == 10)
1894                         break;
1895         }
1896         EXIT;
1897 }
1898
1899 /* we're trying to queue a page in the osc so we're subject to the
1900  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
1901  * If the osc's queued pages are already at that limit, then we want to sleep
1902  * until there is space in the osc's queue for us.  We also may be waiting for
1903  * write credits from the OST if there are RPCs in flight that may return some
1904  * before we fall back to sync writes.
1905  *
1906  * We need this know our allocation was granted in the presence of signals */
1907 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
1908 {
1909         int rc;
1910         ENTRY;
1911         client_obd_list_lock(&cli->cl_loi_list_lock);
1912         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
1913         client_obd_list_unlock(&cli->cl_loi_list_lock);
1914         RETURN(rc);
1915 };
1916
/* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
 * grant or cache space.
 *
 * Returns 0 once the page has been charged against cl_dirty and the
 * available grant, -EDQUOT to tell the caller to fall back to sync io,
 * -EINTR if we slept for space and were woken without a grant, or the
 * waiter's ocw_rc otherwise. */
static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
                           struct osc_async_page *oap)
{
        struct osc_cache_waiter ocw;
        struct l_wait_info lwi = { 0 };

        ENTRY;
        CDEBUG(D_CACHE, "dirty: %ld dirty_max: %ld dropped: %lu grant: %lu\n",
               cli->cl_dirty, cli->cl_dirty_max, cli->cl_lost_grant,
               cli->cl_avail_grant);

        /* force the caller to try sync io.  this can jump the list
         * of queued writes and create a discontiguous rpc stream */
        if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
            loi->loi_ar.ar_force_sync)
                RETURN(-EDQUOT);

        /* Hopefully normal case - cache space and write credits available */
        if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
            cli->cl_avail_grant >= CFS_PAGE_SIZE) {
                /* account for ourselves */
                osc_consume_write_grant(cli, oap);
                RETURN(0);
        }

        /* Make sure that there are write rpcs in flight to wait for.  This
         * is a little silly as this object may not have any pending but
         * other objects sure might. */
        if (cli->cl_w_in_flight) {
                list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
                cfs_waitq_init(&ocw.ocw_waitq);
                ocw.ocw_oap = oap;
                ocw.ocw_rc = 0;

                /* kick rpcs so completions eventually free space, then
                 * drop the list lock before sleeping */
                loi_list_maint(cli, loi);
                osc_check_rpcs(cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                CDEBUG(D_CACHE, "sleeping for cache space\n");
                l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);

                client_obd_list_lock(&cli->cl_loi_list_lock);
                /* still linked on cl_cache_waiters means we were woken
                 * without being granted anything */
                if (!list_empty(&ocw.ocw_entry)) {
                        list_del(&ocw.ocw_entry);
                        RETURN(-EINTR);
                }
                RETURN(ocw.ocw_rc);
        }

        RETURN(-EDQUOT);
}
1970
/* the companion to enter_cache, called when an oap is no longer part of the
 * dirty accounting.. so writeback completes or truncate happens before writing
 * starts.  must be called with the loi lock held. */
static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
                           int sent)
{
        /* fall back to 4k if the server reported no block size */
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        /* only pages charged by osc_enter_cache()/osc_consume_write_grant()
         * carry OBD_BRW_FROM_GRANT and need un-accounting here */
        if (!(oap->oap_brw_flags & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        oap->oap_brw_flags &= ~OBD_BRW_FROM_GRANT;
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (!sent) {
                /* the page never hit the wire: the grant consumed for it is
                 * lost until the server replenishes it */
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && oap->oap_count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = (oap->oap_obj_off +oap->oap_page_off) & ~CFS_PAGE_MASK;
                /* round the write region out to OST block boundaries before
                 * charging the lost remainder of the page */
                int count = oap->oap_count + (offset & (blocksize - 1));
                int end = (offset + oap->oap_count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}
2009
2010 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2011                         struct lov_oinfo *loi, cfs_page_t *page,
2012                         obd_off offset, struct obd_async_page_ops *ops,
2013                         void *data, void **res)
2014 {
2015         struct osc_async_page *oap;
2016         ENTRY;
2017
2018         if (!page)
2019                 return size_round(sizeof(*oap));
2020
2021         oap = *res;
2022         oap->oap_magic = OAP_MAGIC;
2023         oap->oap_cli = &exp->exp_obd->u.cli;
2024         oap->oap_loi = loi;
2025
2026         oap->oap_caller_ops = ops;
2027         oap->oap_caller_data = data;
2028
2029         oap->oap_page = page;
2030         oap->oap_obj_off = offset;
2031
2032         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2033         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2034         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2035
2036         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2037
2038         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2039         RETURN(0);
2040 }
2041
2042 struct osc_async_page *oap_from_cookie(void *cookie)
2043 {
2044         struct osc_async_page *oap = cookie;
2045         if (oap->oap_magic != OAP_MAGIC)
2046                 return ERR_PTR(-EINVAL);
2047         return oap;
2048 };
2049
/* Queue one prepared async page for normal (non-group) io on its object's
 * pending (and possibly urgent) list and kick the rpc engine.  For writes,
 * cache space and grant are reserved first, which may sleep.  Returns 0 on
 * success, -EBUSY if the page is already queued somewhere, -EIO on an
 * invalid import, or a cache/quota error for writes. */
static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
                              struct lov_oinfo *loi, void *cookie,
                              int cmd, obd_off off, int count,
                              obd_flag brw_flags, enum async_flags async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* refuse a page that is already on any osc list */
        if (!list_empty(&oap->oap_pending_item) ||
            !list_empty(&oap->oap_urgent_item) ||
            !list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        /* check if the file's owner/group is over quota */
#ifdef HAVE_QUOTA_SUPPORT
        if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
                struct obd_async_page_ops *ops;
                struct obdo *oa;

                oa = obdo_alloc();
                if (oa == NULL)
                        RETURN(-ENOMEM);

                /* the caller fills in the obdo's uid/gid for the quota check */
                ops = oap->oap_caller_ops;
                ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
                if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
                    NO_QUOTA)
                        rc = -EDQUOT;

                obdo_free(oa);
                if (rc)
                        RETURN(rc);
        }
#endif

        if (loi == NULL)
                loi = &lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        oap->oap_cmd = cmd;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;
        oap->oap_async_flags = async_flags;

        if (cmd & OBD_BRW_WRITE) {
                /* may drop and retake cl_loi_list_lock while sleeping for
                 * grant/cache space */
                rc = osc_enter_cache(cli, loi, oap);
                if (rc) {
                        client_obd_list_unlock(&cli->cl_loi_list_lock);
                        RETURN(rc);
                }
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        if (oap->oap_async_flags & ASYNC_URGENT)
                list_add(&oap->oap_urgent_item, &lop->lop_urgent);
        list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
        lop_update_pending(cli, lop, cmd, 1);

        loi_list_maint(cli, loi);

        LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
                  cmd);

        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
2132
/* true iff `flag' is being newly set: absent from `was', present in `now'.
 * aka (~was & now & flag), but this is more clear :) */
#define SETTING(was, now, flag) (!(was & flag) && (now & flag))
2135
/* Turn on async flags (ASYNC_READY, ASYNC_URGENT) for an already-pending
 * page; flags are only ever set here, never cleared.  Returns -EINVAL if
 * the page is no longer on the pending list, -EIO on an invalid import. */
static int osc_set_async_flags(struct obd_export *exp,
                               struct lov_stripe_md *lsm,
                               struct lov_oinfo *loi, void *cookie,
                               obd_flag async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        /*
         * bug 7311: OST-side locking is only supported for liblustre for now
         * (and liblustre never calls obd_set_async_flags(). I hope.), generic
         * implementation has to handle case where OST-locked page was picked
         * up by, e.g., ->writepage().
         */
        LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
        LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
                                     * tread here. */

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        if (loi == NULL)
                loi = &lsm->lsm_oinfo[0];

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* page must still be on the pending list to be re-flagged */
        if (list_empty(&oap->oap_pending_item))
                GOTO(out, rc = -EINVAL);

        if ((oap->oap_async_flags & async_flags) == async_flags)
                GOTO(out, rc = 0);

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
                oap->oap_async_flags |= ASYNC_READY;

        /* NOTE(review): unlike ASYNC_READY above, ASYNC_URGENT is never ORed
         * into oap_async_flags here -- urgency appears to be tracked by
         * membership on lop_urgent instead; confirm that is intentional. */
        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
                if (list_empty(&oap->oap_rpc_item)) {
                        list_add(&oap->oap_urgent_item, &lop->lop_urgent);
                        loi_list_maint(cli, loi);
                }
        }

        LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
                        oap->oap_async_flags);
out:
        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
2198
2199 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2200                              struct lov_oinfo *loi,
2201                              struct obd_io_group *oig, void *cookie,
2202                              int cmd, obd_off off, int count,
2203                              obd_flag brw_flags,
2204                              obd_flag async_flags)
2205 {
2206         struct client_obd *cli = &exp->exp_obd->u.cli;
2207         struct osc_async_page *oap;
2208         struct loi_oap_pages *lop;
2209         ENTRY;
2210
2211         oap = oap_from_cookie(cookie);
2212         if (IS_ERR(oap))
2213                 RETURN(PTR_ERR(oap));
2214
2215         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2216                 RETURN(-EIO);
2217
2218         if (!list_empty(&oap->oap_pending_item) ||
2219             !list_empty(&oap->oap_urgent_item) ||
2220             !list_empty(&oap->oap_rpc_item))
2221                 RETURN(-EBUSY);
2222
2223         if (loi == NULL)
2224                 loi = &lsm->lsm_oinfo[0];
2225
2226         client_obd_list_lock(&cli->cl_loi_list_lock);
2227
2228         oap->oap_cmd = cmd;
2229         oap->oap_page_off = off;
2230         oap->oap_count = count;
2231         oap->oap_brw_flags = brw_flags;
2232         oap->oap_async_flags = async_flags;
2233
2234         if (cmd & OBD_BRW_WRITE)
2235                 lop = &loi->loi_write_lop;
2236         else
2237                 lop = &loi->loi_read_lop;
2238
2239         list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2240         if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2241                 oap->oap_oig = oig;
2242                 oig_add_one(oig, &oap->oap_occ);
2243         }
2244
2245         LOI_DEBUG(loi, "oap %p page %p on group pending\n", oap, oap->oap_page);
2246
2247         client_obd_list_unlock(&cli->cl_loi_list_lock);
2248
2249         RETURN(0);
2250 }
2251
2252 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2253                                  struct loi_oap_pages *lop, int cmd)
2254 {
2255         struct list_head *pos, *tmp;
2256         struct osc_async_page *oap;
2257
2258         list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2259                 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2260                 list_del(&oap->oap_pending_item);
2261                 list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2262                 if (oap->oap_async_flags & ASYNC_URGENT)
2263                         list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2264                 lop_update_pending(cli, lop, cmd, 1);
2265         }
2266         loi_list_maint(cli, loi);
2267 }
2268
/* Release all of an object's group-queued pages (both directions) onto the
 * regular pending lists and kick the rpc engine.  Always returns 0. */
static int osc_trigger_group_io(struct obd_export *exp,
                                struct lov_stripe_md *lsm,
                                struct lov_oinfo *loi,
                                struct obd_io_group *oig)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        ENTRY;

        /* default to the first stripe's oinfo when none was given */
        if (loi == NULL)
                loi = &lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
        osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);

        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
2290
/* Remove a queued async page from all osc book-keeping: pending and urgent
 * lists plus dirty/grant accounting.  Fails with -EBUSY if the page is
 * currently part of an rpc in flight. */
static int osc_teardown_async_page(struct obd_export *exp,
                                   struct lov_stripe_md *lsm,
                                   struct lov_oinfo *loi, void *cookie)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (loi == NULL)
                loi = &lsm->lsm_oinfo[0];

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* can't tear a page out from under an rpc in flight */
        if (!list_empty(&oap->oap_rpc_item))
                GOTO(out, rc = -EBUSY);

        /* give back the dirty/grant space this page consumed and let any
         * cache waiters retry */
        osc_exit_cache(cli, oap, 0);
        osc_wake_cache_waiters(cli);

        if (!list_empty(&oap->oap_urgent_item)) {
                list_del_init(&oap->oap_urgent_item);
                oap->oap_async_flags &= ~ASYNC_URGENT;
        }
        if (!list_empty(&oap->oap_pending_item)) {
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, oap->oap_cmd, -1);
        }
        loi_list_maint(cli, loi);

        LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
out:
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
2337
2338 /* Note: caller will lock/unlock, and set uptodate on the pages */
2339 #if defined(__KERNEL__) && (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
/* Synchronous SAN read: send OST_SAN_READ to obtain per-page block
 * mappings from the server, then read the data directly from the shared
 * SAN block device through the 2.4 buffer-head layer.
 *
 * Caller locks/unlocks the pages and sets them uptodate (see the note
 * above the surrounding #if).  Returns 0 on success, negative errno on
 * RPC or block-I/O failure. */
static int sanosc_brw_read(struct obd_export *exp, struct obdo *oa,
                           struct lov_stripe_md *lsm, obd_count page_count,
                           struct brw_page *pga)
{
        struct ptlrpc_request *request = NULL;
        struct ost_body *body;
        struct niobuf_remote *nioptr;
        struct obd_ioobj *iooptr;
        int rc, size[3] = {sizeof(*body)}, mapped = 0;
        struct obd_import *imp = class_exp2cliimp(exp);
        int swab;
        ENTRY;

        /* XXX does not handle 'new' brw protocol */

        size[1] = sizeof(struct obd_ioobj);
        size[2] = page_count * sizeof(*nioptr);

        request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                                  OST_SAN_READ, 3, size, NULL);
        if (!request)
                RETURN(-ENOMEM);

        /* FIXME bug 249 */
        /* See bug 7198 */
        if (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                request->rq_request_portal = OST_IO_PORTAL;

        body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof(*body));
        iooptr = lustre_msg_buf(request->rq_reqmsg, 1, sizeof(*iooptr));
        nioptr = lustre_msg_buf(request->rq_reqmsg, 2,
                                sizeof(*nioptr) * page_count);

        memcpy(&body->oa, oa, sizeof(body->oa));

        obdo_to_ioobj(oa, iooptr);
        iooptr->ioo_bufcnt = page_count;

        /* pack one remote niobuf per page; pages must arrive sorted by
         * file offset (checked by the second LASSERT) */
        for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
                LASSERT(PageLocked(pga[mapped].pg));
                LASSERT(mapped == 0 || pga[mapped].off > pga[mapped - 1].off);

                nioptr->offset = pga[mapped].off;
                nioptr->len    = pga[mapped].count;
                nioptr->flags  = pga[mapped].flag;
        }

        /* NOTE(review): size[1] is reused here for the reply layout —
         * the reply carries only the body and the niobuf array */
        size[1] = page_count * sizeof(*nioptr);
        request->rq_replen = lustre_msg_size(2, size);

        rc = ptlrpc_queue_wait(request);
        if (rc)
                GOTO(out_req, rc);

        body = lustre_swab_repbuf(request, 0, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR("Can't unpack body\n");
                GOTO(out_req, rc = -EPROTO);
        }

        memcpy(oa, &body->oa, sizeof(*oa));

        /* the niobuf array is swabbed element-by-element inside the loop
         * below, so only record whether swabbing is needed here */
        swab = lustre_msg_swabbed(request->rq_repmsg);
        LASSERT_REPSWAB(request, 1);
        nioptr = lustre_msg_buf(request->rq_repmsg, 1, size[1]);
        if (!nioptr) {
                /* nioptr missing or short */
                GOTO(out_req, rc = -EPROTO);
        }

        /* actual read */
        for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
                struct page *page = pga[mapped].pg;
                struct buffer_head *bh;
                kdev_t dev;

                if (swab)
                        lustre_swab_niobuf_remote (nioptr);

                /* got san device associated */
                LASSERT(exp->exp_obd != NULL);
                dev = exp->exp_obd->u.cli.cl_sandev;

                /* offset 0 from the server marks an unallocated block
                 * (a hole): return zeroes without touching the device */
                if (!nioptr->offset) {
                        CDEBUG(D_PAGE, "hole at ino %lu; index %ld\n",
                                        page->mapping->host->i_ino,
                                        page->index);
                        memset(page_address(page), 0, CFS_PAGE_SIZE);
                        continue;
                }

                if (!page->buffers) {
                        /* first access: create a buffer, map it to the
                         * block number the server returned, and read */
                        create_empty_buffers(page, dev, CFS_PAGE_SIZE);
                        bh = page->buffers;

                        clear_bit(BH_New, &bh->b_state);
                        set_bit(BH_Mapped, &bh->b_state);
                        bh->b_blocknr = (unsigned long)nioptr->offset;

                        clear_bit(BH_Uptodate, &bh->b_state);

                        ll_rw_block(READ, 1, &bh);
                } else {
                        bh = page->buffers;

                        /* if buffer already existed, it must be the
                         * one we mapped before, check it */
                        LASSERT(!test_bit(BH_New, &bh->b_state));
                        LASSERT(test_bit(BH_Mapped, &bh->b_state));
                        LASSERT(bh->b_blocknr == (unsigned long)nioptr->offset);

                        /* wait for its I/O completion */
                        if (test_bit(BH_Lock, &bh->b_state))
                                wait_on_buffer(bh);

                        if (!test_bit(BH_Uptodate, &bh->b_state))
                                ll_rw_block(READ, 1, &bh);
                }


                /* this path is synchronous: wait for the read and fail
                 * the whole call on the first I/O error */
                wait_on_buffer(bh);
                if (!buffer_uptodate(bh)) {
                        /* I/O error */
                        rc = -EIO;
                        goto out_req;
                }
        }

out_req:
        ptlrpc_req_finished(request);
        RETURN(rc);
}
2475
/* Synchronous SAN write: send OST_SAN_WRITE to obtain per-page block
 * mappings from the server, then write the page data directly to the
 * shared SAN block device through the 2.4 buffer-head layer.
 *
 * Caller locks/unlocks the pages (see the note above the surrounding
 * #if).  Returns 0 on success, negative errno on RPC or block-I/O
 * failure. */
static int sanosc_brw_write(struct obd_export *exp, struct obdo *oa,
                            struct lov_stripe_md *lsm, obd_count page_count,
                            struct brw_page *pga)
{
        struct ptlrpc_request *request = NULL;
        struct ost_body *body;
        struct niobuf_remote *nioptr;
        struct obd_ioobj *iooptr;
        struct obd_import *imp = class_exp2cliimp(exp);
        int rc, size[3] = {sizeof(*body)}, mapped = 0;
        int swab;
        ENTRY;

        size[1] = sizeof(struct obd_ioobj);
        size[2] = page_count * sizeof(*nioptr);

        /* writes draw from the import's preallocated request pool so
         * dirty-page writeback cannot deadlock on allocation */
        request = ptlrpc_prep_req_pool(class_exp2cliimp(exp),
                                       LUSTRE_OST_VERSION, OST_SAN_WRITE,
                                       3, size, NULL, imp->imp_rq_pool);
        if (!request)
                RETURN(-ENOMEM);

        /* FIXME bug 249 */
        /* See bug 7198 */
        if (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                request->rq_request_portal = OST_IO_PORTAL;

        body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
        iooptr = lustre_msg_buf(request->rq_reqmsg, 1, sizeof (*iooptr));
        nioptr = lustre_msg_buf(request->rq_reqmsg, 2,
                                sizeof (*nioptr) * page_count);

        memcpy(&body->oa, oa, sizeof(body->oa));

        obdo_to_ioobj(oa, iooptr);
        iooptr->ioo_bufcnt = page_count;

        /* pack request */
        for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
                LASSERT(PageLocked(pga[mapped].pg));
                LASSERT(mapped == 0 || pga[mapped].off > pga[mapped - 1].off);

                nioptr->offset = pga[mapped].off;
                nioptr->len    = pga[mapped].count;
                nioptr->flags  = pga[mapped].flag;
        }

        /* NOTE(review): size[1] is reused here for the reply layout —
         * the reply carries only the body and the niobuf array */
        size[1] = page_count * sizeof(*nioptr);
        request->rq_replen = lustre_msg_size(2, size);

        rc = ptlrpc_queue_wait(request);
        if (rc)
                GOTO(out_req, rc);

        swab = lustre_msg_swabbed (request->rq_repmsg);
        LASSERT_REPSWAB (request, 1);
        nioptr = lustre_msg_buf(request->rq_repmsg, 1, size[1]);
        if (!nioptr) {
                CERROR("absent/short niobuf array\n");
                GOTO(out_req, rc = -EPROTO);
        }

        /* actual write */
        for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
                struct page *page = pga[mapped].pg;
                struct buffer_head *bh;
                kdev_t dev;

                if (swab)
                        lustre_swab_niobuf_remote (nioptr);

                /* got san device associated */
                LASSERT(exp->exp_obd != NULL);
                dev = exp->exp_obd->u.cli.cl_sandev;

                if (!page->buffers) {
                        create_empty_buffers(page, dev, CFS_PAGE_SIZE);
                } else {
                        /* existing buffer must match the mapping the
                         * server returned for this page */
                        LASSERT(!test_bit(BH_New, &page->buffers->b_state));
                        LASSERT(test_bit(BH_Mapped, &page->buffers->b_state));
                        LASSERT(page->buffers->b_blocknr ==
                                (unsigned long)nioptr->offset);
                }
                bh = page->buffers;

                LASSERT(bh);

                /* if buffer locked, wait for its I/O completion */
                if (test_bit(BH_Lock, &bh->b_state))
                        wait_on_buffer(bh);

                clear_bit(BH_New, &bh->b_state);
                set_bit(BH_Mapped, &bh->b_state);

                /* override the block nr */
                bh->b_blocknr = (unsigned long)nioptr->offset;

                /* we are about to write it, so set it
                 * uptodate/dirty
                 * page lock should guarantee no race condition here */
                set_bit(BH_Uptodate, &bh->b_state);
                set_bit(BH_Dirty, &bh->b_state);

                ll_rw_block(WRITE, 1, &bh);

                /* this path is synchronous: wait for the write; a buffer
                 * left dirty or not uptodate after completion means the
                 * write failed */
                wait_on_buffer(bh);
                if (!buffer_uptodate(bh) || test_bit(BH_Dirty, &bh->b_state)) {
                        /* I/O error */
                        rc = -EIO;
                        goto out_req;
                }
        }

out_req:
        ptlrpc_req_finished(request);
        RETURN(rc);
}
2595
2596 static int sanosc_brw(int cmd, struct obd_export *exp, struct obdo *oa,
2597                       struct lov_stripe_md *lsm, obd_count page_count,
2598                       struct brw_page *pga, struct obd_trans_info *oti)
2599 {
2600         ENTRY;
2601
2602         while (page_count) {
2603                 obd_count pages_per_brw;
2604                 int rc;
2605
2606                 if (page_count > PTLRPC_MAX_BRW_PAGES)
2607                         pages_per_brw = PTLRPC_MAX_BRW_PAGES;
2608                 else
2609                         pages_per_brw = page_count;
2610
2611                 if (cmd & OBD_BRW_WRITE)
2612                         rc = sanosc_brw_write(exp, oa, lsm, pages_per_brw,pga);
2613                 else
2614                         rc = sanosc_brw_read(exp, oa, lsm, pages_per_brw, pga);
2615
2616                 if (rc != 0)
2617                         RETURN(rc);
2618
2619                 page_count -= pages_per_brw;
2620                 pga += pages_per_brw;
2621         }
2622         RETURN(0);
2623 }
2624 #endif
2625
/* Attach @data (on Linux, an inode pointer) as the AST data of the lock
 * behind @lockh, first checking that we are not silently replacing a
 * different, still-live inode.  LDLM_FL_NO_LRU in @flags is copied onto
 * the lock so callers can pin it out of the LRU. */
static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
                                    int flags)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);

        if (lock == NULL) {
                /* handle no longer resolves — the lock was likely
                 * cancelled, e.g. on client eviction */
                CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
                return;
        }
        l_lock(&lock->l_resource->lr_namespace->ns_lock);
#ifdef __KERNEL__
#ifdef __LINUX__
        /* Liang XXX: Darwin and Winnt checking should be added */
        /* replacing l_ast_data with a different inode is only legal when
         * the old inode is being freed; anything else is a bug */
        if (lock->l_ast_data && lock->l_ast_data != data) {
                struct inode *new_inode = data;
                struct inode *old_inode = lock->l_ast_data;
                if (!(old_inode->i_state & I_FREEING))
                        LDLM_ERROR(lock, "inconsistent l_ast_data found");
                LASSERTF(old_inode->i_state & I_FREEING,
                         "Found existing inode %p/%lu/%u state %lu in lock: "
                         "setting data to %p/%lu/%u\n", old_inode,
                         old_inode->i_ino, old_inode->i_generation,
                         old_inode->i_state,
                         new_inode, new_inode->i_ino, new_inode->i_generation);
        }
#endif
#endif
        lock->l_ast_data = data;
        lock->l_flags |= (flags & LDLM_FL_NO_LRU);
        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
        /* drop the reference taken by ldlm_handle2lock() */
        LDLM_LOCK_PUT(lock);
}
2658
2659 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2660                              ldlm_iterator_t replace, void *data)
2661 {
2662         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2663         struct obd_device *obd = class_exp2obd(exp);
2664
2665         ldlm_change_cbdata(obd->obd_namespace, &res_id, replace, data);
2666         return 0;
2667 }
2668
/* Acquire a DLM extent lock covering @policy on the object in @lsm.
 *
 * First tries to match an already-granted local lock (including a PW
 * lock when only PR was requested: the page cache protects local users,
 * so one PW lock can serve many readers).  Only if no match is found is
 * an enqueue RPC sent; with LDLM_FL_HAS_INTENT the request carries a
 * reply buffer for the server's ost_lvb (size/blocks/mtime).
 *
 * On success (or an aborted intent enqueue) the returned lvb is copied
 * into lsm->lsm_oinfo->loi_lvb.  Returns ELDLM_OK / 0, a lock-policy
 * result, or a negative errno. */
static int osc_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm,
                       __u32 type, ldlm_policy_data_t *policy, __u32 mode,
                       int *flags, void *bl_cb, void *cp_cb, void *gl_cb,
                       void *data, __u32 lvb_len, void *lvb_swabber,
                       struct lustre_handle *lockh)
{
        struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
        struct obd_device *obd = exp->exp_obd;
        struct ost_lvb lvb;
        struct ldlm_reply *rep;
        struct ptlrpc_request *req = NULL;
        int rc;
        ENTRY;

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother.  */
        policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
        policy->l_extent.end |= ~CFS_PAGE_MASK;

        /* without a valid known-minimum-size there is nothing local to
         * match against; go straight to the server */
        if (lsm->lsm_oinfo->loi_kms_valid == 0)
                goto no_match;

        /* Next, search for already existing extent locks that will cover us */
        rc = ldlm_lock_match(obd->obd_namespace, *flags, &res_id, type, policy,
                             mode, lockh);
        if (rc == 1) {
                osc_set_data_with_check(lockh, data, *flags);
                if (*flags & LDLM_FL_HAS_INTENT) {
                        /* I would like to be able to ASSERT here that rss <=
                         * kms, but I can't, for reasons which are explained in
                         * lov_enqueue() */
                }
                /* We already have a lock, and it's referenced */
                RETURN(ELDLM_OK);
        }

        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */

        if (mode == LCK_PR) {
                rc = ldlm_lock_match(obd->obd_namespace, *flags, &res_id, type,
                                     policy, LCK_PW, lockh);
                if (rc == 1) {
                        /* FIXME: This is not incredibly elegant, but it might
                         * be more elegant than adding another parameter to
                         * lock_match.  I want a second opinion. */
                        /* take a PR reference on the PW lock and drop the PW
                         * reference lock_match added, so refcounts balance */
                        ldlm_lock_addref(lockh, LCK_PR);
                        ldlm_lock_decref(lockh, LCK_PW);
                        osc_set_data_with_check(lockh, data, *flags);
                        RETURN(ELDLM_OK);
                }
        }

 no_match:
        if (*flags & LDLM_FL_HAS_INTENT) {
                /* request carries one buffer (the ldlm_request); the
                 * reply is sized for a ldlm_reply plus the ost_lvb */
                int size[2] = {sizeof(struct ldlm_request), sizeof(lvb)};

                req = ptlrpc_prep_req(class_exp2cliimp(exp),
                                      LUSTRE_DLM_VERSION, LDLM_ENQUEUE, 1,
                                      size, NULL);
                if (req == NULL)
                        RETURN(-ENOMEM);

                size[0] = sizeof(*rep);
                req->rq_replen = lustre_msg_size(2, size);
        }

        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
        *flags &= ~LDLM_FL_BLOCK_GRANTED;

        rc = ldlm_cli_enqueue(exp, req, obd->obd_namespace, res_id, type,
                              policy, mode, flags, bl_cb, cp_cb, gl_cb, data,
                              &lvb, sizeof(lvb), lustre_swab_ost_lvb, lockh);

        if (req != NULL) {
                if (rc == ELDLM_LOCK_ABORTED) {
                        /* swabbed by ldlm_cli_enqueue() */
                        LASSERT_REPSWABBED(req, 0);
                        rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*rep));
                        LASSERT(rep != NULL);
                        if (rep->lock_policy_res1)
                                rc = rep->lock_policy_res1;
                }
                ptlrpc_req_finished(req);
        }

        /* store the server's object attributes even when the intent lock
         * itself was aborted (the lvb is still authoritative) */
        if ((*flags & LDLM_FL_HAS_INTENT && rc == ELDLM_LOCK_ABORTED) || !rc) {
                CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
                       lvb.lvb_size, lvb.lvb_blocks, lvb.lvb_mtime);
                lsm->lsm_oinfo->loi_lvb = lvb;
        }

        RETURN(rc);
}
2772
/* Test whether an already-granted local DLM lock covers @policy on the
 * object in @lsm, without ever contacting the server.
 *
 * Like osc_enqueue(), a PR request is also satisfied by an existing PW
 * lock (with the reference moved from PW to PR unless this is only a
 * LDLM_FL_TEST_LOCK probe).  Returns the ldlm_lock_match() result:
 * non-zero if a matching lock was found, 0 otherwise. */
static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
                     __u32 type, ldlm_policy_data_t *policy, __u32 mode,
                     int *flags, void *data, struct lustre_handle *lockh)
{
        struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
        struct obd_device *obd = exp->exp_obd;
        int rc;
        ENTRY;

        OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother */
        policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
        policy->l_extent.end |= ~CFS_PAGE_MASK;

        /* Next, search for already existing extent locks that will cover us */
        rc = ldlm_lock_match(obd->obd_namespace, *flags, &res_id, type,
                             policy, mode, lockh);
        if (rc) {
                //if (!(*flags & LDLM_FL_TEST_LOCK))
                        osc_set_data_with_check(lockh, data, *flags);
                RETURN(rc);
        }
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock. */
        if (mode == LCK_PR) {
                rc = ldlm_lock_match(obd->obd_namespace, *flags, &res_id, type,
                                     policy, LCK_PW, lockh);
                if (rc == 1 && !(*flags & LDLM_FL_TEST_LOCK)) {
                        /* FIXME: This is not incredibly elegant, but it might
                         * be more elegant than adding another parameter to
                         * lock_match.  I want a second opinion. */
                        osc_set_data_with_check(lockh, data, *flags);
                        /* move the reference lock_match took from PW to PR */
                        ldlm_lock_addref(lockh, LCK_PR);
                        ldlm_lock_decref(lockh, LCK_PW);
                }
        }
        RETURN(rc);
}
2814
2815 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2816                       __u32 mode, struct lustre_handle *lockh)
2817 {
2818         ENTRY;
2819
2820         if (unlikely(mode == LCK_GROUP))
2821                 ldlm_lock_decref_and_cancel(lockh, mode);
2822         else
2823                 ldlm_lock_decref(lockh, mode);
2824
2825         RETURN(0);
2826 }
2827
2828 static int osc_cancel_unused(struct obd_export *exp,
2829                              struct lov_stripe_md *lsm,
2830                              int flags, void *opaque)
2831 {
2832         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2833         struct obd_device *obd = class_exp2obd(exp);
2834
2835         return ldlm_cli_cancel_unused(obd->obd_namespace, &res_id,
2836                                       flags, opaque);
2837 }
2838
2839 static int osc_join_lru(struct obd_export *exp,
2840                         struct lov_stripe_md *lsm, int join)
2841 {
2842         struct obd_device *obd = class_exp2obd(exp);
2843         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2844
2845         return ldlm_cli_join_lru(obd->obd_namespace, &res_id, join);
2846 }
2847
/* Fetch filesystem statistics from the OST with a synchronous
 * OST_STATFS RPC and copy them into @osfs.
 *
 * NOTE(review): @max_age is currently unused — every call does a fresh
 * RPC; the comment below discusses how it might be used server-side.
 * Returns 0 on success or a negative errno. */
static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
                      cfs_time_t max_age)
{
        struct obd_statfs *msfs;
        struct ptlrpc_request *request;
        int rc, size = sizeof(*osfs);
        ENTRY;

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        request = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
                                  OST_STATFS,0,NULL,NULL);
        if (!request)
                RETURN(-ENOMEM);

        request->rq_replen = lustre_msg_size(1, &size);
        request->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249

        rc = ptlrpc_queue_wait(request);
        if (rc)
                GOTO(out, rc);

        msfs = lustre_swab_repbuf(request, 0, sizeof(*msfs),
                                  lustre_swab_obd_statfs);
        if (msfs == NULL) {
                CERROR("Can't unpack obd_statfs\n");
                GOTO(out, rc = -EPROTO);
        }

        memcpy(osfs, msfs, sizeof(*osfs));

        EXIT;
 out:
        ptlrpc_req_finished(request);
        return rc;
}
2888
2889 /* Retrieve object striping information.
2890  *
2891  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
2892  * the maximum number of OST indices which will fit in the user buffer.
2893  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
2894  */
/* Copy this object's (single-stripe) layout out to the user buffer
 * @lump.  See the block comment above for the expected lmm contents.
 *
 * Returns 0 on success, -ENODATA without a stripe md, -EFAULT on a
 * failed user copy, -EINVAL on a bad magic, -ENOMEM on allocation
 * failure. */
static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
{
        struct lov_user_md lum, *lumk;
        int rc = 0, lum_size;
        ENTRY;

        if (!lsm)
                RETURN(-ENODATA);

        if (copy_from_user(&lum, lump, sizeof(lum)))
                RETURN(-EFAULT);

        if (lum.lmm_magic != LOV_USER_MAGIC)
                RETURN(-EINVAL);

        if (lum.lmm_stripe_count > 0) {
                /* caller has room for at least one object entry:
                 * allocate header + one lov_user_ost_data and fill in the
                 * object id of our single stripe */
                lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
                OBD_ALLOC(lumk, lum_size);
                if (!lumk)
                        RETURN(-ENOMEM);

                lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
        } else {
                /* caller asked for header only; reuse the stack copy */
                lum_size = sizeof(lum);
                lumk = &lum;
        }

        /* an OSC always presents exactly one stripe */
        lumk->lmm_object_id = lsm->lsm_object_id;
        lumk->lmm_stripe_count = 1;

        if (copy_to_user(lump, lumk, lum_size))
                rc = -EFAULT;

        if (lumk != &lum)
                OBD_FREE(lumk, lum_size);

        RETURN(rc);
}
2933
2934
/* ioctl dispatcher for an OSC device.
 *
 * Takes a module reference for the duration of the call so the module
 * cannot be unloaded mid-ioctl.  @karg is the kernel-side
 * obd_ioctl_data for most commands; @uarg is the raw user pointer.
 * Returns 0 or a negative errno (-ENOTTY for unknown commands). */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void *uarg)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_ioctl_data *data = karg;
        int err = 0;
        ENTRY;

#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
        MOD_INC_USE_COUNT;
#else
        if (!try_module_get(THIS_MODULE)) {
                CERROR("Can't get module. Is it alive?");
                return -EINVAL;
        }
#endif
        switch (cmd) {
        case OBD_IOC_LOV_GET_CONFIG: {
                /* present this lone OSC as a one-target LOV so generic
                 * LOV userspace tools work against it */
                char *buf;
                struct lov_desc *desc;
                struct obd_uuid uuid;

                buf = NULL;
                len = 0;
                if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
                        GOTO(out, err = -EINVAL);

                data = (struct obd_ioctl_data *)buf;

                /* user buffers must hold a lov_desc and a uuid */
                if (sizeof(*desc) > data->ioc_inllen1) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                if (data->ioc_inllen2 < sizeof(uuid)) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                desc = (struct lov_desc *)data->ioc_inlbuf1;
                desc->ld_tgt_count = 1;
                desc->ld_active_tgt_count = 1;
                desc->ld_default_stripe_count = 1;
                desc->ld_default_stripe_size = 0;
                desc->ld_default_stripe_offset = 0;
                desc->ld_pattern = 0;
                memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));

                memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));

                err = copy_to_user((void *)uarg, buf, len);
                if (err)
                        err = -EFAULT;
                obd_ioctl_freedata(buf, len);
                GOTO(out, err);
        }
        case LL_IOC_LOV_SETSTRIPE:
                /* obd_alloc_memmd returns the md size on success */
                err = obd_alloc_memmd(exp, karg);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case LL_IOC_LOV_GETSTRIPE:
                err = osc_getstripe(karg, uarg);
                GOTO(out, err);
        case OBD_IOC_CLIENT_RECOVER:
                err = ptlrpc_recover_import(obd->u.cli.cl_import,
                                            data->ioc_inlbuf1);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case IOC_OSC_SET_ACTIVE:
                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
                                               data->ioc_offset);
                GOTO(out, err);
        case OBD_IOC_POLL_QUOTACHECK:
                err = lquota_poll_check(quota_interface, exp,
                                        (struct if_quotacheck *)karg);
                GOTO(out, err);
        default:
                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
                       cmd, cfs_curproc_comm());
                GOTO(out, err = -ENOTTY);
        }
out:
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
        MOD_DEC_USE_COUNT;
#else
        module_put(THIS_MODULE);
#endif
        return err;
}
3026
/* obd_get_info handler for the OSC.
 *
 * "lock_to_stripe": an OSC manages a single object, so the answer is
 * always stripe 0 — no RPC needed.
 * "last_id": fetch the OST's last allocated object id via a synchronous
 * OST_GET_INFO RPC.
 * Any other key returns -EINVAL; NULL @vallen/@val returns -EFAULT.
 *
 * NOTE(review): the keylen comparisons are asymmetric ('>' for
 * lock_to_stripe, '>=' for last_id) — presumably keylen includes the
 * trailing NUL; confirm against callers before changing. */
static int osc_get_info(struct obd_export *exp, obd_count keylen,
                        void *key, __u32 *vallen, void *val)
{
        ENTRY;
        if (!vallen || !val)
                RETURN(-EFAULT);

        if (keylen > strlen("lock_to_stripe") &&
            strcmp(key, "lock_to_stripe") == 0) {
                __u32 *stripe = val;
                *vallen = sizeof(*stripe);
                *stripe = 0;
                RETURN(0);
        } else if (keylen >= strlen("last_id") && strcmp(key, "last_id") == 0) {
                struct ptlrpc_request *req;
                obd_id *reply;
                char *bufs[1] = {key};
                int rc;
                req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                                      OST_GET_INFO, 1, &keylen, bufs);
                if (req == NULL)
                        RETURN(-ENOMEM);

                req->rq_replen = lustre_msg_size(1, vallen);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out, rc);

                reply = lustre_swab_repbuf(req, 0, sizeof(*reply),
                                           lustre_swab_ost_last_id);
                if (reply == NULL) {
                        CERROR("Can't unpack OST last ID\n");
                        GOTO(out, rc = -EPROTO);
                }
                *((obd_id *)val) = *reply;
        out:
                ptlrpc_req_finished(req);
                RETURN(rc);
        }
        RETURN(-EINVAL);
}
3068
/* Async-RPC interpret callback run after the MDS-connection set_info
 * request completes: connect the llog initiator and mark the import as
 * a pingable server connection.
 *
 * Returns @rc from the RPC (non-zero aborts early) or the result of
 * llog_initiator_connect(). */
static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
                                          void *aa, int rc)
{
        struct llog_ctxt *ctxt;
        struct obd_import *imp = req->rq_import;
        ENTRY;

        if (rc != 0)
                RETURN(rc);

        ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
        if (ctxt) {
                /* NOTE(review): rc is guaranteed 0 here (checked above),
                 * so the else/CERROR branch below is dead code */
                if (rc == 0)
                        rc = llog_initiator_connect(ctxt);
                else
                        CERROR("cannot establish connection for "
                               "ctxt %p: %d\n", ctxt, rc);
        }

        /* treat this import like a server connection from now on:
         * honor server timeouts and keep it alive with pings */
        imp->imp_server_timeout = 1;
        CDEBUG(D_HA, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
        imp->imp_pingable = 1;

        RETURN(rc);
}
3094
/*
 * Handle a set_info request, either locally (for keys the OSC itself
 * understands) or by forwarding it to the OST as an OST_SET_INFO RPC
 * added to @set.
 *
 * Locally-handled keys: KEY_NEXT_ID, "unlinked", KEY_INIT_RECOV,
 * "checksum".  Anything else is passed through to the OST (see the
 * comment below); for "mds_conn" an interpret callback is attached.
 *
 * \param exp     export to the target OST
 * \param keylen  length of @key
 * \param key     key name
 * \param vallen  length of @val
 * \param val     value buffer (type depends on key)
 * \param set     request set the forwarded RPC is queued on; required
 *                for keys that are not handled locally
 * \retval 0 on success, -EINVAL on bad size/missing set, -ENOMEM
 */
static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                              void *key, obd_count vallen, void *val,
                              struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device  *obd = exp->exp_obd;
        struct obd_import *imp = class_exp2cliimp(exp);
        int size[2] = {keylen, vallen};
        char *bufs[2] = {key, val};
        ENTRY;

        /* fault-injection point used by shutdown tests */
        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        /* MDS tells us the last used object id; start creating past it. */
        if (KEY_IS(KEY_NEXT_ID)) {
                if (vallen != sizeof(obd_id))
                        RETURN(-EINVAL);
                obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
                CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
                       exp->exp_obd->obd_name,
                       obd->u.cli.cl_oscc.oscc_next_id);

                RETURN(0);
        }

        /* objects were unlinked: space may be available again, clear the
         * creator's no-space flag */
        if (KEY_IS("unlinked")) {
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;
                spin_lock(&oscc->oscc_lock);
                oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                spin_unlock(&oscc->oscc_lock);
                RETURN(0);
        }

        /* toggle initial-recovery behaviour on the import */
        if (KEY_IS(KEY_INIT_RECOV)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                imp->imp_initial_recov = *(int *)val;
                CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
                       exp->exp_obd->obd_name,
                       imp->imp_initial_recov);
                RETURN(0);
        }

        /* enable/disable bulk data checksumming */
        if (KEY_IS("checksum")) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        /* remaining keys require an RPC, which needs a set to run on */
        if (!set)
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */

        req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO,
                              2, size, bufs);
        if (req == NULL)
                RETURN(-ENOMEM);

        /* no reply body expected beyond the status */
        req->rq_replen = lustre_msg_size(0, NULL);

        /* finish llog initiator setup once the OST has acked mds_conn */
        if (KEY_IS("mds_conn"))
                req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
        ptlrpc_set_add_req(set, req);
        ptlrpc_check_set(set);

        RETURN(0);
}
3168
3169
3170 static struct llog_operations osc_size_repl_logops = {
3171         lop_cancel: llog_obd_repl_cancel
3172 };
3173
3174 static struct llog_operations osc_mds_ost_orig_logops;
3175 static int osc_llog_init(struct obd_device *obd, struct obd_device *tgt,
3176                         int count, struct llog_catid *catid)
3177 {
3178         int rc;
3179         ENTRY;
3180
3181         osc_mds_ost_orig_logops = llog_lvfs_ops;
3182         osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3183         osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3184         osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3185         osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3186
3187         rc = llog_setup(obd, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3188                         &catid->lci_logid, &osc_mds_ost_orig_logops);
3189         if (rc)
3190                 RETURN(rc);
3191
3192         rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
3193                         &osc_size_repl_logops);
3194         RETURN(rc);
3195 }
3196
3197 static int osc_llog_finish(struct obd_device *obd, int count)
3198 {
3199         struct llog_ctxt *ctxt;
3200         int rc = 0, rc2 = 0;
3201         ENTRY;
3202
3203         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3204         if (ctxt)
3205                 rc = llog_cleanup(ctxt);
3206
3207         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3208         if (ctxt)
3209                 rc2 = llog_cleanup(ctxt);
3210         if (!rc)
3211                 rc = rc2;
3212
3213         RETURN(rc);
3214 }
3215
3216 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
3217                          struct obd_uuid *cluuid,
3218                          struct obd_connect_data *data)
3219 {
3220         struct client_obd *cli = &obd->u.cli;
3221
3222         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3223                 long lost_grant;
3224
3225                 client_obd_list_lock(&cli->cl_loi_list_lock);
3226                 data->ocd_grant = cli->cl_avail_grant ?:
3227                                 2 * cli->cl_max_pages_per_rpc << PAGE_SHIFT;
3228                 lost_grant = cli->cl_lost_grant;
3229                 cli->cl_lost_grant = 0;
3230                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3231
3232                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3233                        "cl_lost_grant: %ld\n", data->ocd_grant,
3234                        cli->cl_avail_grant, lost_grant);
3235                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3236                        " ocd_grant: %d\n", data->ocd_connect_flags,
3237                        data->ocd_version, data->ocd_grant);
3238         }
3239
3240         RETURN(0);
3241 }
3242
3243 static int osc_disconnect(struct obd_export *exp)
3244 {
3245         struct obd_device *obd = class_exp2obd(exp);
3246         struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3247         int rc;
3248
3249         if (obd->u.cli.cl_conn_count == 1)
3250                 /* flush any remaining cancel messages out to the target */
3251                 llog_sync(ctxt, exp);
3252
3253         rc = client_disconnect_export(exp);
3254         return rc;
3255 }
3256
/*
 * React to import state-change events delivered by the ptlrpc layer.
 *
 * DISCON/ACTIVE adjust the object-creator recovery flags (MDS-side OSCs
 * only, identified by imp_server_timeout), INVALIDATE drops all grants
 * and local locks, INACTIVE/ACTIVE/OCD are propagated to the observer
 * (typically the LOV above us).
 *
 * \param obd    this OSC device (must own @imp)
 * \param imp    the import the event occurred on
 * \param event  which state change happened
 * \retval 0 or the observer-notification error
 */
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        /* suspend precreation until reconnected */
                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
                        spin_unlock(&oscc->oscc_lock);
                }

                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;

                /* Reset grants */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                /* all pages go to failing rpcs due to the invalid import */
                osc_check_rpcs(cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                /* discard cached locks; the server's state is gone */
                ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);

                break;
        }
        case IMP_EVENT_ACTIVE: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        /* reconnected: precreation may proceed again */
                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                        spin_unlock(&oscc->oscc_lock);
                }
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                /* renegotiated connect data: re-establish grant accounting */
                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;

                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}
3331
3332 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3333 {
3334         int rc;
3335         ENTRY;
3336
3337         ENTRY;
3338         rc = ptlrpcd_addref();
3339         if (rc)
3340                 RETURN(rc);
3341
3342         rc = client_obd_setup(obd, lcfg);
3343         if (rc) {
3344                 ptlrpcd_decref();
3345         } else {
3346                 struct lprocfs_static_vars lvars;
3347                 struct client_obd *cli = &obd->u.cli;
3348
3349                 lprocfs_init_vars(osc, &lvars);
3350                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3351                         lproc_osc_attach_seqstat(obd);
3352                         ptlrpc_lprocfs_register_obd(obd);
3353                 }
3354
3355                 oscc_init(obd);
3356                 /* We need to allocate a few requests more, because
3357                    brw_interpret_oap tries to create new requests before freeing
3358                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
3359                    reserved, but I afraid that might be too much wasted RAM
3360                    in fact, so 2 is just my guess and still should work. */
3361                 cli->cl_import->imp_rq_pool =
3362                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3363                                             OST_MAXREQSIZE,
3364                                             ptlrpc_add_rqs_to_pool);
3365         }
3366
3367         RETURN(rc);
3368 }
3369
3370 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3371 {
3372         int rc = 0;
3373         ENTRY;
3374
3375         switch (stage) {
3376         case OBD_CLEANUP_EARLY: {
3377                 struct obd_import *imp;
3378                 imp = obd->u.cli.cl_import;
3379                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3380                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3381                 ptlrpc_deactivate_import(imp);
3382                 break;
3383         }
3384         case OBD_CLEANUP_EXPORTS:
3385                 break;
3386         case OBD_CLEANUP_SELF_EXP:
3387                 rc = obd_llog_finish(obd, 0);
3388                 if (rc != 0)
3389                         CERROR("failed to cleanup llogging subsystems\n");
3390                 break;
3391         case OBD_CLEANUP_OBD:
3392                 break;
3393         }
3394         RETURN(rc);
3395 }
3396
/*
 * Final cleanup of an OSC obd device: remove the /proc entries, flag
 * the object creator as exiting, release the quota cache, tear down
 * the generic client state and drop the ptlrpcd reference taken in
 * osc_setup().  The ordering of these steps is deliberate.
 *
 * \param obd  the device being cleaned up
 * \retval result of client_obd_cleanup()
 */
int osc_cleanup(struct obd_device *obd)
{
        struct osc_creator *oscc = &obd->u.cli.cl_oscc;
        int rc;

        ENTRY;
        ptlrpc_lprocfs_unregister_obd(obd);
        lprocfs_obd_cleanup(obd);

        /* tell the creator thread we are going away; it must not be
         * left thinking recovery is still pending */
        spin_lock(&oscc->oscc_lock);
        oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
        oscc->oscc_flags |= OSCC_FLAG_EXITING;
        spin_unlock(&oscc->oscc_lock);

        /* free memory of osc quota cache */
        lquota_cleanup(quota_interface, obd);

        rc = client_obd_cleanup(obd);

        ptlrpcd_decref();
        RETURN(rc);
}
3419
3420
/* Method table for the OSC obd device type.  Connection management is
 * delegated to the generic client_* helpers; everything else is
 * implemented in this file. */
struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        /* device lifecycle */
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        /* connection management (generic client code) */
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        /* striping metadata pack/unpack */
        .o_packmd               = osc_packmd,
        .o_unpackmd             = osc_unpackmd,
        /* object operations */
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        /* bulk I/O and the async page machinery */
        .o_brw                  = osc_brw,
        .o_brw_async            = osc_brw_async,
        .o_prep_async_page      = osc_prep_async_page,
        .o_queue_async_io       = osc_queue_async_io,
        .o_set_async_flags      = osc_set_async_flags,
        .o_queue_group_io       = osc_queue_group_io,
        .o_trigger_group_io     = osc_trigger_group_io,
        .o_teardown_async_page  = osc_teardown_async_page,
        .o_punch                = osc_punch,
        .o_sync                 = osc_sync,
        /* DLM lock handling */
        .o_enqueue              = osc_enqueue,
        .o_match                = osc_match,
        .o_change_cbdata        = osc_change_cbdata,
        .o_cancel               = osc_cancel,
        .o_cancel_unused        = osc_cancel_unused,
        .o_join_lru             = osc_join_lru,
        /* control and misc */
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_llog_init            = osc_llog_init,
        .o_llog_finish          = osc_llog_finish,
};
3463
#if defined(__KERNEL__) && (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
/* Method table for the SAN OSC variant (2.4 kernels only).  Differs
 * from osc_obd_ops in setup/disconnect/create/brw and omits the async
 * page machinery. */
struct obd_ops sanosc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_setup                = client_sanobd_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = client_disconnect_export,
        .o_statfs               = osc_statfs,
        .o_packmd               = osc_packmd,
        .o_unpackmd             = osc_unpackmd,
        .o_create               = osc_real_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_brw                  = sanosc_brw,
        .o_punch                = osc_punch,
        .o_sync                 = osc_sync,
        .o_enqueue              = osc_enqueue,
        .o_match                = osc_match,
        .o_change_cbdata        = osc_change_cbdata,
        .o_cancel               = osc_cancel,
        .o_cancel_unused        = osc_cancel_unused,
        .o_join_lru             = osc_join_lru,
        .o_iocontrol            = osc_iocontrol,
        .o_import_event         = osc_import_event,
        .o_llog_init            = osc_llog_init,
        .o_llog_finish          = osc_llog_finish,
};
#endif
3498
3499 extern quota_interface_t osc_quota_interface;
3500
/*
 * Module init: hook up the quota interface, then register the OSC
 * (and, on 2.4 kernels, the SANOSC) obd device types with the class
 * layer.  On registration failure the quota symbol reference is
 * dropped and any earlier type registration is undone.
 *
 * \retval 0 on success, negative error from class_register_type()
 */
int __init osc_init(void)
{
        struct lprocfs_static_vars lvars;
#if defined(__KERNEL__) && (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
        struct lprocfs_static_vars sanlvars;
#endif
        int rc;
        ENTRY;

        lprocfs_init_vars(osc, &lvars);
#if defined(__KERNEL__) && (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
        lprocfs_init_vars(osc, &sanlvars);
#endif

        /* take a reference on the quota module's interface and patch its
         * ops into our obd_ops table
         * NOTE(review): lquota_init() is called even if the symbol lookup
         * returned NULL — presumably it tolerates that; verify.  Also the
         * error paths below drop the symbol reference without a matching
         * lquota_exit() — confirm whether that teardown is required. */
        quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
        lquota_init(quota_interface);
        init_obd_quota_ops(quota_interface, &osc_obd_ops);

        rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
                                 LUSTRE_OSC_NAME, NULL);
        if (rc) {
                if (quota_interface)
                        PORTAL_SYMBOL_PUT(osc_quota_interface);
                RETURN(rc);
        }

#if defined(__KERNEL__) && (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
        rc = class_register_type(&sanosc_obd_ops, NULL, sanlvars.module_vars,
                                 LUSTRE_SANOSC_NAME, NULL);
        if (rc) {
                /* keep registration all-or-nothing: undo the OSC type too */
                class_unregister_type(LUSTRE_OSC_NAME);
                if (quota_interface)
                        PORTAL_SYMBOL_PUT(osc_quota_interface);
                RETURN(rc);
        }
#endif

        RETURN(rc);
}
3540
#ifdef __KERNEL__
/* Module unload: shut down the quota interface, drop its symbol
 * reference and unregister the device types registered in osc_init(). */
static void /*__exit*/ osc_exit(void)
{
        lquota_exit(quota_interface);
        if (quota_interface)
                PORTAL_SYMBOL_PUT(osc_quota_interface);

#if defined(__KERNEL__) && (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
        class_unregister_type(LUSTRE_SANOSC_NAME);
#endif
        class_unregister_type(LUSTRE_OSC_NAME);
}

MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

cfs_module(osc, "1.0.0", osc_init, osc_exit);
#endif