Whamcloud - gitweb
c0ae4086b909224f5bab7ddd7d6a2a8aa4161864
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author Peter Braam <braam@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  *
 *  For testing and management it is treated as an obd_device,
 *  although it does not export a full OBD method table (the
 *  requests are coming in over the wire, so object target modules
 *  do not have a full method table.)
29  *
30  */
31
32 #ifndef EXPORT_SYMTAB
33 # define EXPORT_SYMTAB
34 #endif
35 #define DEBUG_SUBSYSTEM S_OSC
36
37 #ifdef __KERNEL__
38 # include <linux/version.h>
39 # include <linux/module.h>
40 # include <linux/mm.h>
41 # include <linux/highmem.h>
42 # include <linux/ctype.h>
43 # include <linux/init.h>
44 # if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
45 #  include <linux/workqueue.h>
46 #  include <linux/smp_lock.h>
47 # else
48 #  include <linux/locks.h>
49 # endif
50 #else /* __KERNEL__ */
51 # include <liblustre.h>
52 #endif
53
54 # include <linux/lustre_dlm.h>
55 #include <libcfs/kp30.h>
56 #include <linux/lustre_net.h>
57 #include <lustre/lustre_user.h>
58 #include <linux/obd_ost.h>
59 #include <linux/obd_lov.h>
60
61 #ifdef  __CYGWIN__
62 # include <ctype.h>
63 #endif
64
65 #include <linux/lustre_ha.h>
66 #include <linux/lprocfs_status.h>
67 #include <linux/lustre_log.h>
68 #include <linux/lustre_debug.h>
69 #include "osc_internal.h"
70
71 /* Pack OSC object metadata for disk storage (LE byte order). */
72 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
73                       struct lov_stripe_md *lsm)
74 {
75         int lmm_size;
76         ENTRY;
77
78         lmm_size = sizeof(**lmmp);
79         if (!lmmp)
80                 RETURN(lmm_size);
81
82         if (*lmmp && !lsm) {
83                 OBD_FREE(*lmmp, lmm_size);
84                 *lmmp = NULL;
85                 RETURN(0);
86         }
87
88         if (!*lmmp) {
89                 OBD_ALLOC(*lmmp, lmm_size);
90                 if (!*lmmp)
91                         RETURN(-ENOMEM);
92         }
93
94         if (lsm) {
95                 LASSERT(lsm->lsm_object_id);
96                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
97         }
98
99         RETURN(lmm_size);
100 }
101
102 /* Unpack OSC object metadata from disk storage (LE byte order). */
103 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
104                         struct lov_mds_md *lmm, int lmm_bytes)
105 {
106         int lsm_size;
107         ENTRY;
108
109         if (lmm != NULL) {
110                 if (lmm_bytes < sizeof (*lmm)) {
111                         CERROR("lov_mds_md too small: %d, need %d\n",
112                                lmm_bytes, (int)sizeof(*lmm));
113                         RETURN(-EINVAL);
114                 }
115                 /* XXX LOV_MAGIC etc check? */
116
117                 if (lmm->lmm_object_id == 0) {
118                         CERROR("lov_mds_md: zero lmm_object_id\n");
119                         RETURN(-EINVAL);
120                 }
121         }
122
123         lsm_size = lov_stripe_md_size(1);
124         if (lsmp == NULL)
125                 RETURN(lsm_size);
126
127         if (*lsmp != NULL && lmm == NULL) {
128                 OBD_FREE(*lsmp, lsm_size);
129                 *lsmp = NULL;
130                 RETURN(0);
131         }
132
133         if (*lsmp == NULL) {
134                 OBD_ALLOC(*lsmp, lsm_size);
135                 if (*lsmp == NULL)
136                         RETURN(-ENOMEM);
137                 loi_init((*lsmp)->lsm_oinfo);
138         }
139
140         if (lmm != NULL) {
141                 /* XXX zero *lsmp? */
142                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
143                 LASSERT((*lsmp)->lsm_object_id);
144         }
145
146         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
147
148         RETURN(lsm_size);
149 }
150
151 static int osc_getattr_interpret(struct ptlrpc_request *req,
152                                  struct osc_getattr_async_args *aa, int rc)
153 {
154         struct ost_body *body;
155         ENTRY;
156
157         if (rc != 0)
158                 RETURN(rc);
159
160         body = lustre_swab_repbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
161         if (body) {
162                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
163                 memcpy(aa->aa_oa, &body->oa, sizeof(*aa->aa_oa));
164
165                 /* This should really be sent by the OST */
166                 aa->aa_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
167                 aa->aa_oa->o_valid |= OBD_MD_FLBLKSZ;
168         } else {
169                 CERROR("can't unpack ost_body\n");
170                 rc = -EPROTO;
171                 aa->aa_oa->o_valid = 0;
172         }
173
174         RETURN(rc);
175 }
176
177 static int osc_getattr_async(struct obd_export *exp, struct obdo *oa,
178                              struct lov_stripe_md *md,
179                              struct ptlrpc_request_set *set)
180 {
181         struct ptlrpc_request *request;
182         struct ost_body *body;
183         int size = sizeof(*body);
184         struct osc_getattr_async_args *aa;
185         ENTRY;
186
187         request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
188                                   OST_GETATTR, 1, &size, NULL);
189         if (!request)
190                 RETURN(-ENOMEM);
191
192         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
193         memcpy(&body->oa, oa, sizeof(*oa));
194
195         request->rq_replen = lustre_msg_size(1, &size);
196         request->rq_interpret_reply = osc_getattr_interpret;
197
198         LASSERT (sizeof (*aa) <= sizeof (request->rq_async_args));
199         aa = (struct osc_getattr_async_args *)&request->rq_async_args;
200         aa->aa_oa = oa;
201
202         ptlrpc_set_add_req (set, request);
203         RETURN (0);
204 }
205
206 static int osc_getattr(struct obd_export *exp, struct obdo *oa,
207                        struct lov_stripe_md *md)
208 {
209         struct ptlrpc_request *request;
210         struct ost_body *body;
211         int rc, size = sizeof(*body);
212         ENTRY;
213
214         request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
215                                   OST_GETATTR, 1, &size, NULL);
216         if (!request)
217                 RETURN(-ENOMEM);
218
219         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
220         memcpy(&body->oa, oa, sizeof(*oa));
221
222         request->rq_replen = lustre_msg_size(1, &size);
223
224         rc = ptlrpc_queue_wait(request);
225         if (rc) {
226                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
227                 GOTO(out, rc);
228         }
229
230         body = lustre_swab_repbuf(request, 0, sizeof (*body),
231                                   lustre_swab_ost_body);
232         if (body == NULL) {
233                 CERROR ("can't unpack ost_body\n");
234                 GOTO (out, rc = -EPROTO);
235         }
236
237         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
238         memcpy(oa, &body->oa, sizeof(*oa));
239
240         /* This should really be sent by the OST */
241         oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
242         oa->o_valid |= OBD_MD_FLBLKSZ;
243
244         EXIT;
245  out:
246         ptlrpc_req_finished(request);
247         return rc;
248 }
249
250 static int osc_setattr(struct obd_export *exp, struct obdo *oa,
251                        struct lov_stripe_md *md, struct obd_trans_info *oti)
252 {
253         struct ptlrpc_request *request;
254         struct ost_body *body;
255         int rc, size = sizeof(*body);
256         ENTRY;
257
258         request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
259                                   OST_SETATTR, 1, &size, NULL);
260         if (!request)
261                 RETURN(-ENOMEM);
262
263         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof(*body));
264         memcpy(&body->oa, oa, sizeof(*oa));
265
266         request->rq_replen = lustre_msg_size(1, &size);
267
268         rc = ptlrpc_queue_wait(request);
269         if (rc)
270                 GOTO(out, rc);
271
272         body = lustre_swab_repbuf(request, 0, sizeof(*body),
273                                   lustre_swab_ost_body);
274         if (body == NULL)
275                 GOTO(out, rc = -EPROTO);
276
277         memcpy(oa, &body->oa, sizeof(*oa));
278
279         EXIT;
280 out:
281         ptlrpc_req_finished(request);
282         RETURN(0);
283 }
284
285 static int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
286                              struct lov_stripe_md *md,
287                              struct obd_trans_info *oti)
288 {
289         struct ptlrpc_request *request;
290         struct ost_body *body;
291         int rc = 0, size = sizeof(*body);
292         ENTRY;
293
294         LASSERT(oti);
295
296         request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
297                                   OST_SETATTR, 1, &size, NULL);
298         if (!request)
299                 RETURN(-ENOMEM);
300
301         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof(*body));
302
303         if (oa->o_valid & OBD_MD_FLCOOKIE)
304                 memcpy(obdo_logcookie(oa), oti->oti_logcookies,
305                        sizeof(*oti->oti_logcookies));
306
307         memcpy(&body->oa, oa, sizeof(*oa));
308         request->rq_replen = lustre_msg_size(1, &size);
309         /* do mds to ost setattr asynchronouly */
310         ptlrpcd_add_req(request);
311
312         RETURN(rc);
313 }
314
/* Create a new object on the OST.  If *ea is NULL a temporary single-stripe
 * md is allocated here (and freed again on failure); on success the object
 * id returned by the OST is stored into the lsm and *ea.  When oti is
 * supplied, the reply transno and any unlink llog cookie are saved into it
 * for recovery/cancellation. */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *request;
        struct ost_body *body;
        struct lov_stripe_md *lsm;
        int rc, size = sizeof(*body);
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                /* caller supplied no striping md; allocate one locally */
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                                  OST_CREATE, 1, &size, NULL);
        if (!request)
                GOTO(out, rc = -ENOMEM);

        body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
        memcpy(&body->oa, oa, sizeof(body->oa));

        request->rq_replen = lustre_msg_size(1, &size);
        /* OBD_MD_FLINLINE marks an orphan-deletion create from recovery */
        if (oa->o_valid & OBD_MD_FLINLINE) {
                LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
                        oa->o_flags == OBD_FL_DELORPHAN);
                DEBUG_REQ(D_HA, request,
                          "delorphan from OST integration");
                /* Don't resend the delorphan request */
                request->rq_no_resend = request->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(request);
        if (rc)
                GOTO(out_req, rc);

        body = lustre_swab_repbuf(request, 0, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO (out_req, rc = -EPROTO);
        }

        memcpy(oa, &body->oa, sizeof(*oa));

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        *ea = lsm;

        if (oti != NULL) {
                /* remember the transno so the create can be replayed */
                oti->oti_transno = request->rq_repmsg->transno;

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        /* save the unlink llog cookie so it can be
                         * cancelled once the create commits */
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        memcpy(oti->oti_logcookies, obdo_logcookie(oa),
                               sizeof(oti->oti_onecookie));
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n", request->rq_repmsg->transno);
        EXIT;
out_req:
        ptlrpc_req_finished(request);
out:
        /* only free the lsm if we allocated it here (caller's *ea unset) */
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        return rc;
}
396
397 static int osc_punch(struct obd_export *exp, struct obdo *oa,
398                      struct lov_stripe_md *md, obd_size start,
399                      obd_size end, struct obd_trans_info *oti)
400 {
401         struct ptlrpc_request *request;
402         struct ost_body *body;
403         int rc, size = sizeof(*body);
404         ENTRY;
405
406         if (!oa) {
407                 CERROR("oa NULL\n");
408                 RETURN(-EINVAL);
409         }
410
411         request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
412                                   OST_PUNCH, 1, &size, NULL);
413         if (!request)
414                 RETURN(-ENOMEM);
415
416         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
417         memcpy(&body->oa, oa, sizeof(*oa));
418
419         /* overload the size and blocks fields in the oa with start/end */
420         body->oa.o_size = start;
421         body->oa.o_blocks = end;
422         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
423
424         request->rq_replen = lustre_msg_size(1, &size);
425
426         rc = ptlrpc_queue_wait(request);
427         if (rc)
428                 GOTO(out, rc);
429
430         body = lustre_swab_repbuf (request, 0, sizeof (*body),
431                                    lustre_swab_ost_body);
432         if (body == NULL) {
433                 CERROR ("can't unpack ost_body\n");
434                 GOTO (out, rc = -EPROTO);
435         }
436
437         memcpy(oa, &body->oa, sizeof(*oa));
438
439         EXIT;
440  out:
441         ptlrpc_req_finished(request);
442         return rc;
443 }
444
445 static int osc_sync(struct obd_export *exp, struct obdo *oa,
446                     struct lov_stripe_md *md, obd_size start, obd_size end)
447 {
448         struct ptlrpc_request *request;
449         struct ost_body *body;
450         int rc, size = sizeof(*body);
451         ENTRY;
452
453         if (!oa) {
454                 CERROR("oa NULL\n");
455                 RETURN(-EINVAL);
456         }
457
458         request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
459                                   OST_SYNC, 1, &size, NULL);
460         if (!request)
461                 RETURN(-ENOMEM);
462
463         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
464         memcpy(&body->oa, oa, sizeof(*oa));
465
466         /* overload the size and blocks fields in the oa with start/end */
467         body->oa.o_size = start;
468         body->oa.o_blocks = end;
469         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
470
471         request->rq_replen = lustre_msg_size(1, &size);
472
473         rc = ptlrpc_queue_wait(request);
474         if (rc)
475                 GOTO(out, rc);
476
477         body = lustre_swab_repbuf(request, 0, sizeof(*body),
478                                   lustre_swab_ost_body);
479         if (body == NULL) {
480                 CERROR ("can't unpack ost_body\n");
481                 GOTO (out, rc = -EPROTO);
482         }
483
484         memcpy(oa, &body->oa, sizeof(*oa));
485
486         EXIT;
487  out:
488         ptlrpc_req_finished(request);
489         return rc;
490 }
491
492 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
493                        struct lov_stripe_md *ea, struct obd_trans_info *oti,
494                        struct obd_export *md_export)
495 {
496         struct ptlrpc_request *request;
497         struct ost_body *body;
498         int rc, size = sizeof(*body);
499         ENTRY;
500
501         if (!oa) {
502                 CERROR("oa NULL\n");
503                 RETURN(-EINVAL);
504         }
505
506         request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
507                                   OST_DESTROY, 1, &size, NULL);
508         if (!request)
509                 RETURN(-ENOMEM);
510
511         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
512
513         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) {
514                 memcpy(obdo_logcookie(oa), oti->oti_logcookies,
515                        sizeof(*oti->oti_logcookies));
516                 oti->oti_logcookies++;
517         }
518
519         memcpy(&body->oa, oa, sizeof(*oa));
520         request->rq_replen = lustre_msg_size(1, &size);
521
522         rc = ptlrpc_queue_wait(request);
523         if (rc == -ENOENT)
524                 rc = 0;
525         if (rc)
526                 GOTO(out, rc);
527
528         body = lustre_swab_repbuf(request, 0, sizeof(*body),
529                                   lustre_swab_ost_body);
530         if (body == NULL) {
531                 CERROR ("Can't unpack body\n");
532                 GOTO (out, rc = -EPROTO);
533         }
534
535         memcpy(oa, &body->oa, sizeof(*oa));
536
537         EXIT;
538  out:
539         ptlrpc_req_finished(request);
540         return rc;
541 }
542
543 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
544                                 long writing_bytes)
545 {
546         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
547
548         LASSERT(!(oa->o_valid & bits));
549
550         oa->o_valid |= bits;
551         spin_lock(&cli->cl_loi_list_lock);
552         oa->o_dirty = cli->cl_dirty;
553         if (cli->cl_dirty > cli->cl_dirty_max) {
554                 CERROR("dirty %lu > dirty_max %lu\n",
555                        cli->cl_dirty, cli->cl_dirty_max);
556                 oa->o_undirty = 0;
557         } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
558                 CERROR("dirty %lu - dirty_max %lu too big???\n",
559                        cli->cl_dirty, cli->cl_dirty_max);
560                 oa->o_undirty = 0;
561         } else {
562                 long max_in_flight = (cli->cl_max_pages_per_rpc << PAGE_SHIFT)*
563                                 (cli->cl_max_rpcs_in_flight + 1);
564                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
565         }
566         oa->o_grant = cli->cl_avail_grant;
567         oa->o_dropped = cli->cl_lost_grant;
568         cli->cl_lost_grant = 0;
569         spin_unlock(&cli->cl_loi_list_lock);
570         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
571                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
572 }
573
574 /* caller must hold loi_list_lock */
575 static void osc_consume_write_grant(struct client_obd *cli,
576                                     struct osc_async_page *oap)
577 {
578         cli->cl_dirty += PAGE_SIZE;
579         cli->cl_avail_grant -= PAGE_SIZE;
580         oap->oap_brw_flags |= OBD_BRW_FROM_GRANT;
581         CDEBUG(D_CACHE, "using %lu grant credits for oap %p\n", PAGE_SIZE, oap);
582         LASSERT(cli->cl_avail_grant >= 0);
583 }
584
585 static unsigned long rpcs_in_flight(struct client_obd *cli)
586 {
587         return cli->cl_r_in_flight + cli->cl_w_in_flight;
588 }
589
590 /* caller must hold loi_list_lock */
591 void osc_wake_cache_waiters(struct client_obd *cli)
592 {
593         struct list_head *l, *tmp;
594         struct osc_cache_waiter *ocw;
595
596         list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
597                 /* if we can't dirty more, we must wait until some is written */
598                 if (cli->cl_dirty + PAGE_SIZE > cli->cl_dirty_max) {
599                         CDEBUG(D_CACHE, "no dirty room: dirty: %ld max %ld\n",
600                                cli->cl_dirty, cli->cl_dirty_max);
601                         return;
602                 }
603
604                 /* if still dirty cache but no grant wait for pending RPCs that
605                  * may yet return us some grant before doing sync writes */
606                 if (cli->cl_w_in_flight && cli->cl_avail_grant < PAGE_SIZE) {
607                         CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
608                                cli->cl_w_in_flight);
609                         return;
610                 }
611
612                 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
613                 list_del_init(&ocw->ocw_entry);
614                 if (cli->cl_avail_grant < PAGE_SIZE) {
615                         /* no more RPCs in flight to return grant, do sync IO */
616                         ocw->ocw_rc = -EDQUOT;
617                         CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
618                 } else {
619                         osc_consume_write_grant(cli, ocw->ocw_oap);
620                 }
621
622                 wake_up(&ocw->ocw_waitq);
623         }
624
625         EXIT;
626 }
627
628 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
629 {
630         spin_lock(&cli->cl_loi_list_lock);
631         cli->cl_avail_grant = ocd->ocd_grant;
632         spin_unlock(&cli->cl_loi_list_lock);
633
634         CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
635                cli->cl_avail_grant, cli->cl_lost_grant);
636         LASSERT(cli->cl_avail_grant >= 0);
637 }
638
639 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
640 {
641         spin_lock(&cli->cl_loi_list_lock);
642         CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
643         cli->cl_avail_grant += body->oa.o_grant;
644         /* waiters are woken in brw_interpret_oap */
645         spin_unlock(&cli->cl_loi_list_lock);
646 }
647
648 /* We assume that the reason this OSC got a short read is because it read
649  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
650  * via the LOV, and it _knows_ it's reading inside the file, it's just that
651  * this stripe never got written at or beyond this stripe offset yet. */
652 static void handle_short_read(int nob_read, obd_count page_count,
653                               struct brw_page *pga)
654 {
655         char *ptr;
656
657         /* skip bytes read OK */
658         while (nob_read > 0) {
659                 LASSERT (page_count > 0);
660
661                 if (pga->count > nob_read) {
662                         /* EOF inside this page */
663                         ptr = kmap(pga->pg) + (pga->off & ~PAGE_MASK);
664                         memset(ptr + nob_read, 0, pga->count - nob_read);
665                         kunmap(pga->pg);
666                         page_count--;
667                         pga++;
668                         break;
669                 }
670
671                 nob_read -= pga->count;
672                 page_count--;
673                 pga++;
674         }
675
676         /* zero remaining pages */
677         while (page_count-- > 0) {
678                 ptr = kmap(pga->pg) + (pga->off & ~PAGE_MASK);
679                 memset(ptr, 0, pga->count);
680                 kunmap(pga->pg);
681                 pga++;
682         }
683 }
684
685 static int check_write_rcs(struct ptlrpc_request *request,
686                            int requested_nob, int niocount,
687                            obd_count page_count, struct brw_page *pga)
688 {
689         int    *remote_rcs, i;
690
691         /* return error if any niobuf was in error */
692         remote_rcs = lustre_swab_repbuf(request, 1,
693                                         sizeof(*remote_rcs) * niocount, NULL);
694         if (remote_rcs == NULL) {
695                 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
696                 return(-EPROTO);
697         }
698         if (lustre_msg_swabbed(request->rq_repmsg))
699                 for (i = 0; i < niocount; i++)
700                         __swab32s(&remote_rcs[i]);
701
702         for (i = 0; i < niocount; i++) {
703                 if (remote_rcs[i] < 0)
704                         return(remote_rcs[i]);
705
706                 if (remote_rcs[i] != 0) {
707                         CERROR("rc[%d] invalid (%d) req %p\n",
708                                 i, remote_rcs[i], request);
709                         return(-EPROTO);
710                 }
711         }
712
713         if (request->rq_bulk->bd_nob_transferred != requested_nob) {
714                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
715                        requested_nob, request->rq_bulk->bd_nob_transferred);
716                 return(-EPROTO);
717         }
718
719         return (0);
720 }
721
722 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
723 {
724         if (p1->flag != p2->flag) {
725                 unsigned mask = ~OBD_BRW_FROM_GRANT;
726
727                 /* warn if we try to combine flags that we don't know to be
728                  * safe to combine */
729                 if ((p1->flag & mask) != (p2->flag & mask))
730                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
731                                "same brw?\n", p1->flag, p2->flag);
732                 return 0;
733         }
734
735         return (p1->off + p1->count == p2->off);
736 }
737
738 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
739                                    struct brw_page *pga)
740 {
741         __u32 cksum = ~0;
742
743         LASSERT (pg_count > 0);
744         while (nob > 0 && pg_count > 0) {
745                 char *ptr = kmap(pga->pg);
746                 int off = pga->off & ~PAGE_MASK;
747                 int count = pga->count > nob ? nob : pga->count;
748
749                 cksum = crc32_le(cksum, ptr + off, count);
750                 kunmap(pga->pg);
751                 LL_CDEBUG_PAGE(D_PAGE, pga->pg, "off %d checksum %x\n",
752                                off, cksum);
753
754                 nob -= pga->count;
755                 pg_count--;
756                 pga++;
757         }
758
759         return cksum;
760 }
761
/* Build (but do not send) a BRW read or write request for @page_count
 * pages: allocates the ptlrpc request (from the write pool for writes),
 * attaches a bulk descriptor, merges contiguous same-flag pages into
 * niobufs, and optionally checksums write data.  On success returns 0 and
 * hands back the request, the merged niobuf count, and the total byte
 * count via the out-parameters; the caller owns the request. */
static int osc_brw_prep_request(int cmd, struct obd_import *imp,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page *pga, int *requested_nobp,
                                int *niocountp, struct ptlrpc_request **reqp)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct client_obd       *cli = &imp->imp_obd->u.cli;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int                      niocount;
        int                      size[3];
        int                      i;
        int                      requested_nob;
        int                      opc;
        int                      rc;
        struct ptlrpc_request_pool *pool;

        opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
        /* writes draw from the preallocated pool so dirty flushing can
         * always make progress even under memory pressure */
        pool = ((cmd & OBD_BRW_WRITE) != 0) ? cli->cl_rq_pool : NULL;

        /* count niobufs: adjacent mergeable pages share one niobuf */
        for (niocount = i = 1; i < page_count; i++)
                if (!can_merge_pages(&pga[i - 1], &pga[i]))
                        niocount++;

        size[0] = sizeof(*body);
        size[1] = sizeof(*ioobj);
        size[2] = niocount * sizeof(*niobuf);

        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM);
        req = ptlrpc_prep_req_pool(imp, LUSTRE_OST_VERSION, opc, 3,
                                   size, NULL, pool);
        if (req == NULL)
                return (-ENOMEM);

        /* FIXME bug 249. Also see bug 7198 */
        if (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                req->rq_request_portal = OST_IO_PORTAL;

        /* bulk direction depends on who sources the data */
        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp (req, page_count,
                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp (req, page_count,
                                             BULK_PUT_SINK, OST_BULK_PORTAL);
        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
        ioobj = lustre_msg_buf(req->rq_reqmsg, 1, sizeof(*ioobj));
        niobuf = lustre_msg_buf(req->rq_reqmsg, 2, niocount * sizeof(*niobuf));

        memcpy(&body->oa, oa, sizeof(*oa));

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;

        /* walk the pages: queue each for bulk and either extend the
         * previous niobuf (mergeable) or start a new one */
        LASSERT (page_count > 0);
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = &pga[i];
                struct brw_page *pg_prev = pg - 1;

                LASSERT(pg->count > 0);
                LASSERTF((pg->off & ~PAGE_MASK) + pg->count <= PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
                         pg->off, pg->count);
                /* pages must arrive in strictly ascending file order */
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, pg->pg->private, pg->pg->index, pg->off,
                         pg_prev->pg, pg_prev->pg->private, pg_prev->pg->index,
                                 pg_prev->off);
                /* SRVLOCK must be uniform across the whole RPC */
                LASSERT((pga[0].flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~PAGE_MASK,
                                      pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
        }

        /* sanity: we must have filled exactly niocount niobufs */
        LASSERT((void *)(niobuf - niocount) ==
                lustre_msg_buf(req->rq_reqmsg, 2, niocount * sizeof(*niobuf)));
        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);

        /* size[0] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (unlikely(cli->cl_checksum)) {
                        body->oa.o_valid |= OBD_MD_FLCKSUM;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM;
                        oa->o_cksum = body->oa.o_cksum;
                }
                /* 1 RC per niobuf */
                size[1] = sizeof(__u32) * niocount;
                req->rq_replen = lustre_msg_size(2, size);
        } else {
                if (unlikely(cli->cl_checksum))
                        body->oa.o_valid |= OBD_MD_FLCKSUM;
                /* 1 RC for the whole I/O */
                req->rq_replen = lustre_msg_size(1, size);
        }

        *niocountp = niocount;
        *requested_nobp = requested_nob;
        *reqp = req;
        return (0);

 out:
        ptlrpc_req_finished (req);
        return (rc);
}
889
890 static void check_write_csum(__u32 cli, __u32 srv, int requested_nob,
891                              obd_count page_count, struct brw_page *pga)
892 {
893         __u32 new_csum;
894
895         if (srv == cli) {
896                 CDEBUG(D_PAGE, "checksum %x confirmed\n", cli);
897                 return;
898         }
899
900         new_csum = osc_checksum_bulk(requested_nob, page_count, pga);
901
902         if (new_csum == srv) {
903                 CERROR("BAD CHECKSUM (WRITE): pages were mutated on the client"
904                        "after we checksummed them (original client csum:"
905                        " %x; server csum: %x; client csum now: %x)\n",
906                        cli, srv, new_csum);
907                 return;
908         }
909
910         if (new_csum == cli) {
911                 CERROR("BAD CHECKSUM (WRITE): pages were mutated in transit "
912                        "(original client csum: %x; server csum: %x; client "
913                        "csum now: %x)\n", cli, srv, new_csum);
914                 return;
915         }
916
917         CERROR("BAD CHECKSUM (WRITE): pages were mutated in transit, and the "
918                "current page contents don't match the originals OR what the "
919                "server received (original client csum: %x; server csum: %x; "
920                "client csum now: %x)\n", cli, srv, new_csum);
921 }
922
/* Common completion path for both sync and async bulk I/O.
 *
 * Unpacks and sanity-checks the reply, updates quota and grant state,
 * copies the returned attributes back into 'oa', and verifies bulk
 * checksums in both directions.  'rc' is the rpc/bulk status supplied
 * by the caller; returns 0 or a negative errno. */
static int osc_brw_fini_request(struct ptlrpc_request *req, struct obdo *oa,
                                int requested_nob, int niocount,
                                obd_count page_count, struct brw_page *pga,
                                int rc)
{
        const lnet_process_id_t *peer =
                        &req->rq_import->imp_connection->c_peer;
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct ost_body *body;
        __u32 client_cksum = 0;
        ENTRY;

        /* -EDQUOT replies still carry a body we must process below */
        if (rc < 0 && rc != -EDQUOT)
                RETURN(rc);

        LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
        body = lustre_swab_repbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("Can't unpack body\n");
                RETURN(-EPROTO);
        }

        /* set/clear over quota flag for a uid/gid */
        if (req->rq_reqmsg->opc == OST_WRITE &&
            body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
                lquota_setdq(quota_interface, cli, body->oa.o_uid,
                             body->oa.o_gid, body->oa.o_valid,
                             body->oa.o_flags);

        /* quota state updated; now honour the original failure */
        if (rc < 0)
                RETURN(rc);

        /* the memcpy from body->oa below overwrites *oa, so save the
         * checksum computed at write time for the comparison afterwards */
        if (unlikely(oa->o_valid & OBD_MD_FLCKSUM))
                client_cksum = oa->o_cksum; /* save for later */

        osc_update_grant(cli, body);
        memcpy(oa, &body->oa, sizeof(*oa));

        if (req->rq_reqmsg->opc == OST_WRITE) {
                /* writes return per-niobuf rcs, not a byte count */
                if (rc > 0) {
                        CERROR ("Unexpected +ve rc %d\n", rc);
                        RETURN(-EPROTO);
                }
                LASSERT (req->rq_bulk->bd_nob == requested_nob);

                /* compare our pre-send checksum with the server's */
                if (unlikely((oa->o_valid & OBD_MD_FLCKSUM) &&
                             client_cksum)) {
                        check_write_csum(client_cksum, oa->o_cksum,
                                         requested_nob, page_count, pga);
                }

                RETURN(check_write_rcs(req, requested_nob, niocount,
                                       page_count, pga));
        }

        /* The rest of this function executes only for OST_READs */
        if (rc > requested_nob) {
                CERROR("Unexpected rc %d (%d requested)\n", rc, requested_nob);
                RETURN(-EPROTO);
        }

        if (rc != req->rq_bulk->bd_nob_transferred) {
                CERROR ("Unexpected rc %d (%d transferred)\n",
                        rc, req->rq_bulk->bd_nob_transferred);
                return (-EPROTO);
        }

        /* a short read is legal; let handle_short_read() adjust the pages */
        if (rc < requested_nob)
                handle_short_read(rc, page_count, pga);

        if (unlikely(oa->o_valid & OBD_MD_FLCKSUM)) {
                static int cksum_counter;
                __u32 cksum = osc_checksum_bulk(rc, page_count, pga);
                __u32 server_cksum = oa->o_cksum;

                if (server_cksum == ~0 && rc > 0) {
                        CERROR("Protocol error: server %s set the 'checksum' "
                               "bit, but didn't send a checksum.  Not fatal, "
                               "but please tell CFS.\n",
                               libcfs_nid2str(peer->nid));
                        RETURN(0);
                }

                cksum_counter++;

                if (server_cksum != cksum) {
                        CERROR("Bad checksum from %s: server %x != client %x\n",
                               libcfs_nid2str(peer->nid), server_cksum, cksum);
                        cksum_counter = 0;
                        /* expose the locally computed value to the caller */
                        oa->o_cksum = cksum;
                } else if ((cksum_counter & (-cksum_counter)) == cksum_counter){
                        /* log success only at power-of-two intervals */
                        CWARN("Checksum %u from %s OK: %x\n",
                              cksum_counter, libcfs_nid2str(peer->nid), cksum);
                }
                CDEBUG(D_PAGE, "checksum %x confirmed\n", cksum);
        } else if (unlikely(client_cksum)) {
                static int cksum_missed;

                cksum_missed++;
                /* complain only at power-of-two intervals */
                if ((cksum_missed & (-cksum_missed)) == cksum_missed)
                        CERROR("Checksum %u requested from %s but not sent\n",
                               cksum_missed, libcfs_nid2str(peer->nid));
        }

        RETURN(0);
}
1029
1030 static int osc_brw_internal(int cmd, struct obd_export *exp,struct obdo *oa,
1031                             struct lov_stripe_md *lsm,
1032                             obd_count page_count, struct brw_page *pga)
1033 {
1034         int                    requested_nob;
1035         int                    niocount;
1036         struct ptlrpc_request *request;
1037         int                    rc;
1038         ENTRY;
1039
1040 restart_bulk:
1041         rc = osc_brw_prep_request(cmd, class_exp2cliimp(exp), oa, lsm,
1042                                   page_count, pga, &requested_nob, &niocount,
1043                                   &request);
1044         if (rc != 0)
1045                 return (rc);
1046
1047         rc = ptlrpc_queue_wait(request);
1048
1049         if (rc == -ETIMEDOUT && request->rq_resend) {
1050                 DEBUG_REQ(D_HA, request,  "BULK TIMEOUT");
1051                 ptlrpc_req_finished(request);
1052                 goto restart_bulk;
1053         }
1054
1055         rc = osc_brw_fini_request(request, oa, requested_nob, niocount,
1056                                   page_count, pga, rc);
1057
1058         ptlrpc_req_finished(request);
1059         RETURN (rc);
1060 }
1061
1062 static int brw_interpret(struct ptlrpc_request *request,
1063                          struct osc_brw_async_args *aa, int rc)
1064 {
1065         struct obdo *oa      = aa->aa_oa;
1066         int requested_nob    = aa->aa_requested_nob;
1067         int niocount         = aa->aa_nio_count;
1068         obd_count page_count = aa->aa_page_count;
1069         struct brw_page *pga = aa->aa_pga;
1070         ENTRY;
1071
1072         rc = osc_brw_fini_request(request, oa, requested_nob, niocount,
1073                                   page_count, pga, rc);
1074         RETURN (rc);
1075 }
1076
1077 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1078                           struct lov_stripe_md *lsm, obd_count page_count,
1079                           struct brw_page *pga, struct ptlrpc_request_set *set)
1080 {
1081         struct ptlrpc_request     *request;
1082         int                        requested_nob;
1083         int                        nio_count;
1084         struct osc_brw_async_args *aa;
1085         int                        rc;
1086         ENTRY;
1087
1088         rc = osc_brw_prep_request(cmd, class_exp2cliimp(exp), oa, lsm,
1089                                   page_count, pga, &requested_nob, &nio_count,
1090                                   &request);
1091
1092         if (rc == 0) {
1093                 LASSERT(sizeof(*aa) <= sizeof(request->rq_async_args));
1094                 aa = (struct osc_brw_async_args *)&request->rq_async_args;
1095                 aa->aa_oa = oa;
1096                 aa->aa_requested_nob = requested_nob;
1097                 aa->aa_nio_count = nio_count;
1098                 aa->aa_page_count = page_count;
1099                 aa->aa_pga = pga;
1100
1101                 request->rq_interpret_reply = brw_interpret;
1102                 ptlrpc_set_add_req(set, request);
1103         }
1104         RETURN (rc);
1105 }
1106
/* Fallback for builds that don't provide min_t: evaluate each argument
 * exactly once, cast both to 'type', and yield the smaller (GCC
 * statement-expression form, so it is safe inside expressions). */
#ifndef min_t
#define min_t(type,x,y) \
        ({ type __x = (x); type __y = (y); __x < __y ? __x: __y; })
#endif
1111
1112 /*
1113  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1114  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1115  * fine for our small page arrays and doesn't require allocation.  its an
1116  * insertion sort that swaps elements that are strides apart, shrinking the
1117  * stride down until its '1' and the array is sorted.
1118  */
1119 static void sort_brw_pages(struct brw_page *array, int num)
1120 {
1121         int stride, i, j;
1122         struct brw_page tmp;
1123
1124         if (num == 1)
1125                 return;
1126         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1127                 ;
1128
1129         do {
1130                 stride /= 3;
1131                 for (i = stride ; i < num ; i++) {
1132                         tmp = array[i];
1133                         j = i;
1134                         while (j >= stride && array[j - stride].off > tmp.off) {
1135                                 array[j] = array[j - stride];
1136                                 j -= stride;
1137                         }
1138                         array[j] = tmp;
1139                 }
1140         } while (stride > 1);
1141 }
1142
1143 static obd_count max_unfragmented_pages(struct brw_page *pg, obd_count pages)
1144 {
1145         int count = 1;
1146         int offset;
1147
1148         LASSERT (pages > 0);
1149         offset = pg->off & (PAGE_SIZE - 1);
1150
1151         for (;;) {
1152                 pages--;
1153                 if (pages == 0)         /* that's all */
1154                         return count;
1155
1156                 if (offset + pg->count < PAGE_SIZE)
1157                         return count;   /* doesn't end on page boundary */
1158
1159                 pg++;
1160                 offset = pg->off & (PAGE_SIZE - 1);
1161                 if (offset != 0)        /* doesn't start on page boundary */
1162                         return count;
1163
1164                 count++;
1165         }
1166 }
1167
1168 static int osc_brw(int cmd, struct obd_export *exp, struct obdo *oa,
1169                    struct lov_stripe_md *md, obd_count page_count,
1170                    struct brw_page *pga, struct obd_trans_info *oti)
1171 {
1172         ENTRY;
1173
1174         if (cmd & OBD_BRW_CHECK) {
1175                 /* The caller just wants to know if there's a chance that this
1176                  * I/O can succeed */
1177                 struct obd_import *imp = class_exp2cliimp(exp);
1178
1179                 if (imp == NULL || imp->imp_invalid)
1180                         RETURN(-EIO);
1181                 RETURN(0);
1182         }
1183
1184         while (page_count) {
1185                 obd_count pages_per_brw;
1186                 int rc;
1187
1188                 if (page_count > PTLRPC_MAX_BRW_PAGES)
1189                         pages_per_brw = PTLRPC_MAX_BRW_PAGES;
1190                 else
1191                         pages_per_brw = page_count;
1192
1193                 sort_brw_pages(pga, pages_per_brw);
1194                 pages_per_brw = max_unfragmented_pages(pga, pages_per_brw);
1195
1196                 rc = osc_brw_internal(cmd, exp, oa, md, pages_per_brw, pga);
1197
1198                 if (rc != 0)
1199                         RETURN(rc);
1200
1201                 page_count -= pages_per_brw;
1202                 pga += pages_per_brw;
1203         }
1204         RETURN(0);
1205 }
1206
1207 static int osc_brw_async(int cmd, struct obd_export *exp, struct obdo *oa,
1208                          struct lov_stripe_md *md, obd_count page_count,
1209                          struct brw_page *pga, struct ptlrpc_request_set *set,
1210                          struct obd_trans_info *oti)
1211 {
1212         ENTRY;
1213
1214         if (cmd & OBD_BRW_CHECK) {
1215                 /* The caller just wants to know if there's a chance that this
1216                  * I/O can succeed */
1217                 struct obd_import *imp = class_exp2cliimp(exp);
1218
1219                 if (imp == NULL || imp->imp_invalid)
1220                         RETURN(-EIO);
1221                 RETURN(0);
1222         }
1223
1224         while (page_count) {
1225                 obd_count pages_per_brw;
1226                 int rc;
1227
1228                 if (page_count > PTLRPC_MAX_BRW_PAGES)
1229                         pages_per_brw = PTLRPC_MAX_BRW_PAGES;
1230                 else
1231                         pages_per_brw = page_count;
1232
1233                 sort_brw_pages(pga, pages_per_brw);
1234                 pages_per_brw = max_unfragmented_pages(pga, pages_per_brw);
1235
1236                 rc = async_internal(cmd, exp, oa, md, pages_per_brw, pga, set);
1237
1238                 if (rc != 0)
1239                         RETURN(rc);
1240
1241                 page_count -= pages_per_brw;
1242                 pga += pages_per_brw;
1243         }
1244         RETURN(0);
1245 }
1246
1247 static void osc_check_rpcs(struct client_obd *cli);
1248 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1249                            int sent);
1250
1251 /* This maintains the lists of pending pages to read/write for a given object
1252  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1253  * to quickly find objects that are ready to send an RPC. */
1254 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1255                          int cmd)
1256 {
1257         int optimal;
1258         ENTRY;
1259
1260         if (lop->lop_num_pending == 0)
1261                 RETURN(0);
1262
1263         /* if we have an invalid import we want to drain the queued pages
1264          * by forcing them through rpcs that immediately fail and complete
1265          * the pages.  recovery relies on this to empty the queued pages
1266          * before canceling the locks and evicting down the llite pages */
1267         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1268                 RETURN(1);
1269
1270         /* stream rpcs in queue order as long as as there is an urgent page
1271          * queued.  this is our cheap solution for good batching in the case
1272          * where writepage marks some random page in the middle of the file
1273          * as urgent because of, say, memory pressure */
1274         if (!list_empty(&lop->lop_urgent))
1275                 RETURN(1);
1276
1277         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1278         optimal = cli->cl_max_pages_per_rpc;
1279         if (cmd & OBD_BRW_WRITE) {
1280                 /* trigger a write rpc stream as long as there are dirtiers
1281                  * waiting for space.  as they're waiting, they're not going to
1282                  * create more pages to coallesce with what's waiting.. */
1283                 if (!list_empty(&cli->cl_cache_waiters))
1284                         RETURN(1);
1285
1286                 /* +16 to avoid triggering rpcs that would want to include pages
1287                  * that are being queued but which can't be made ready until
1288                  * the queuer finishes with the page. this is a wart for
1289                  * llite::commit_write() */
1290                 optimal += 16;
1291         }
1292         if (lop->lop_num_pending >= optimal)
1293                 RETURN(1);
1294
1295         RETURN(0);
1296 }
1297
/* Make 'item's membership of 'list' match the boolean 'should_be_on':
 * add it when it should be listed but isn't, remove it when it is
 * listed but shouldn't be.  Idempotent in both directions. */
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        if (should_be_on) {
                if (list_empty(item))
                        list_add_tail(item, list);
        } else {
                if (!list_empty(item))
                        list_del_init(item);
        }
}
1306
1307 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1308  * can find pages to build into rpcs quickly */
1309 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1310 {
1311         on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1312                 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1313                 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1314
1315         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1316                 loi->loi_write_lop.lop_num_pending);
1317
1318         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1319                 loi->loi_read_lop.lop_num_pending);
1320 }
1321
/* Adjust 'lop's pending-page count by 'delta' (may be negative) and
 * mirror the change into the matching per-client read or write
 * counter.  Callers in this file invoke it under cl_loi_list_lock. */
static void lop_update_pending(struct client_obd *cli,
                               struct loi_oap_pages *lop, int cmd, int delta)
{
        lop->lop_num_pending += delta;
        if (cmd & OBD_BRW_WRITE)
                cli->cl_pending_w_pages += delta;
        else
                cli->cl_pending_r_pages += delta;
}
1331
/* this is called when a sync waiter receives an interruption.  Its job is to
 * get the caller woken as soon as possible.  If its page hasn't been put in an
 * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
 * desiring interruption which will forcefully complete the rpc once the rpc
 * has timed out */
static void osc_occ_interrupted(struct oig_callback_context *occ)
{
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        struct lov_oinfo *loi;
        ENTRY;

        /* XXX member_of() */
        oap = list_entry(occ, struct osc_async_page, oap_occ);

        /* the loi list lock covers the pending/urgent lists and the
         * oap_request pointer inspected below */
        spin_lock(&oap->oap_cli->cl_loi_list_lock);

        oap->oap_interrupted = 1;

        /* ok, it's been put in an rpc. */
        if (oap->oap_request != NULL) {
                /* flag the rpc and kick ptlrpcd so it notices promptly */
                ptlrpc_mark_interrupted(oap->oap_request);
                ptlrpcd_wake(oap->oap_request);
                GOTO(unlock, 0);
        }

        /* we don't get interruption callbacks until osc_trigger_sync_io()
         * has been called and put the sync oaps in the pending/urgent lists.*/
        if (!list_empty(&oap->oap_pending_item)) {
                list_del_init(&oap->oap_pending_item);
                if (oap->oap_async_flags & ASYNC_URGENT)
                        list_del_init(&oap->oap_urgent_item);

                loi = oap->oap_loi;
                lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
                        &loi->loi_write_lop : &loi->loi_read_lop;
                /* keep the pending counts and cli list membership in sync
                 * with the removal above */
                lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
                loi_list_maint(oap->oap_cli, oap->oap_loi);

                /* complete this page's slot in the group io so the sync
                 * waiter can wake */
                oig_complete_one(oap->oap_oig, &oap->oap_occ, 0);
                oap->oap_oig = NULL;
        }

unlock:
        spin_unlock(&oap->oap_cli->cl_loi_list_lock);
}
1378
1379 /* this is trying to propogate async writeback errors back up to the
1380  * application.  As an async write fails we record the error code for later if
1381  * the app does an fsync.  As long as errors persist we force future rpcs to be
1382  * sync so that the app can get a sync error and break the cycle of queueing
1383  * pages for which writeback will fail. */
1384 static void osc_process_ar(struct osc_async_rc *ar, struct ptlrpc_request *req,
1385                            int rc)
1386 {
1387         if (rc) {
1388                 if (!ar->ar_rc)
1389                         ar->ar_rc = rc;
1390
1391                 ar->ar_force_sync = 1;
1392                 ar->ar_min_xid = ptlrpc_sample_next_xid();
1393                 return;
1394
1395         }
1396
1397         if (ar->ar_force_sync && req && (ptlrpc_req_xid(req) >= ar->ar_min_xid))
1398                 ar->ar_force_sync = 0;
1399 }
1400
/* this must be called holding the loi list lock to give coverage to exit_cache,
 * async_flag maintenance, and oap_request */
static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
                              struct osc_async_page *oap, int sent, int rc)
{
        /* release any cache accounting this page held */
        osc_exit_cache(cli, oap, sent);
        oap->oap_async_flags = 0;
        oap->oap_interrupted = 0;

        /* record write errors (or clear forced-sync) for a later fsync,
         * both per-client and per-object */
        if (oap->oap_cmd & OBD_BRW_WRITE) {
                osc_process_ar(&cli->cl_ar, oap->oap_request, rc);
                osc_process_ar(&oap->oap_loi->loi_ar, oap->oap_request, rc);
        }

        /* drop our reference on the rpc that carried this page */
        if (oap->oap_request != NULL) {
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
        }

        /* on success, propagate returned block/mtime attributes to loi */
        if (rc == 0 && oa != NULL) {
                if (oa->o_valid & OBD_MD_FLBLOCKS)
                        oap->oap_loi->loi_blocks = oa->o_blocks;
                if (oa->o_valid & OBD_MD_FLMTIME)
                        oap->oap_loi->loi_mtime = oa->o_mtime;
        }

        /* group (sync) io completes through the oig, not the caller ops */
        if (oap->oap_oig) {
                oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
                oap->oap_oig = NULL;
                EXIT;
                return;
        }

        oap->oap_caller_ops->ap_completion(oap->oap_caller_data, oap->oap_cmd,
                                           oa, rc);
}
1437
/* Reply interpreter for rpcs built by osc_build_req(): finish the bulk,
 * update the in-flight counters, complete every oap carried by the rpc,
 * restart queued work, then free the obdo and page array owned by the
 * async args.  Always returns 0; per-page status goes via completion. */
static int brw_interpret_oap(struct ptlrpc_request *request,
                             struct osc_brw_async_args *aa, int rc)
{
        struct osc_async_page *oap;
        struct client_obd *cli;
        struct list_head *pos, *n;
        ENTRY;

        rc = osc_brw_fini_request(request, aa->aa_oa, aa->aa_requested_nob,
                                  aa->aa_nio_count, aa->aa_page_count,
                                  aa->aa_pga, rc);

        CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);

        cli = aa->aa_cli;

        spin_lock(&cli->cl_loi_list_lock);

        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (request->rq_reqmsg->opc == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;

        /* the caller may re-use the oap after the completion call so
         * we need to clean it up a little */
        list_for_each_safe(pos, n, &aa->aa_oaps) {
                oap = list_entry(pos, struct osc_async_page, oap_rpc_item);

                //CDEBUG(D_INODE, "page %p index %lu oap %p\n",
                       //oap->oap_page, oap->oap_page->index, oap);

                list_del_init(&oap->oap_rpc_item);
                osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
        }

        /* waiting dirtiers may proceed, and new rpcs may now be ready */
        osc_wake_cache_waiters(cli);
        osc_check_rpcs(cli);

        spin_unlock(&cli->cl_loi_list_lock);

        /* these were allocated by osc_build_req() for this rpc's lifetime */
        obdo_free(aa->aa_oa);
        OBD_FREE(aa->aa_pga, aa->aa_page_count * sizeof(struct brw_page));

        RETURN(0);
}
1486
1487 static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
1488                                             struct list_head *rpc_list,
1489                                             int page_count, int cmd)
1490 {
1491         struct ptlrpc_request *req;
1492         struct brw_page *pga = NULL;
1493         int requested_nob, nio_count;
1494         struct osc_brw_async_args *aa;
1495         struct obdo *oa = NULL;
1496         struct obd_async_page_ops *ops = NULL;
1497         void *caller_data = NULL;
1498         struct list_head *pos;
1499         int i, rc;
1500
1501         LASSERT(!list_empty(rpc_list));
1502
1503         OBD_ALLOC(pga, sizeof(*pga) * page_count);
1504         if (pga == NULL)
1505                 RETURN(ERR_PTR(-ENOMEM));
1506
1507         oa = obdo_alloc();
1508         if (oa == NULL)
1509                 GOTO(out, req = ERR_PTR(-ENOMEM));
1510
1511         i = 0;
1512         list_for_each(pos, rpc_list) {
1513                 struct osc_async_page *oap;
1514
1515                 oap = list_entry(pos, struct osc_async_page, oap_rpc_item);
1516                 if (ops == NULL) {
1517                         ops = oap->oap_caller_ops;
1518                         caller_data = oap->oap_caller_data;
1519                 }
1520                 pga[i].off = oap->oap_obj_off + oap->oap_page_off;
1521                 pga[i].pg = oap->oap_page;
1522                 pga[i].count = oap->oap_count;
1523                 pga[i].flag = oap->oap_brw_flags;
1524                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1525                        pga[i].pg, oap->oap_page->index, oap, pga[i].flag);
1526                 i++;
1527         }
1528
1529         /* always get the data for the obdo for the rpc */
1530         LASSERT(ops != NULL);
1531         ops->ap_fill_obdo(caller_data, cmd, oa);
1532
1533         sort_brw_pages(pga, page_count);
1534         rc = osc_brw_prep_request(cmd, cli->cl_import, oa, NULL, page_count,
1535                                   pga, &requested_nob, &nio_count, &req);
1536         if (rc != 0) {
1537                 CERROR("prep_req failed: %d\n", rc);
1538                 GOTO(out, req = ERR_PTR(rc));
1539         }
1540
1541         LASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1542         aa = (struct osc_brw_async_args *)&req->rq_async_args;
1543         aa->aa_oa = oa;
1544         aa->aa_requested_nob = requested_nob;
1545         aa->aa_nio_count = nio_count;
1546         aa->aa_page_count = page_count;
1547         aa->aa_pga = pga;
1548         aa->aa_cli = cli;
1549
1550 out:
1551         if (IS_ERR(req)) {
1552                 if (oa)
1553                         obdo_free(oa);
1554                 if (pga)
1555                         OBD_FREE(pga, sizeof(*pga) * page_count);
1556         }
1557         RETURN(req);
1558 }
1559
1560 /* the loi lock is held across this function but it's allowed to release
1561  * and reacquire it during its work */
1562 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
1563                             int cmd, struct loi_oap_pages *lop)
1564 {
1565         struct ptlrpc_request *request;
1566         obd_count page_count = 0;
1567         struct list_head *tmp, *pos;
1568         struct osc_async_page *oap = NULL;
1569         struct osc_brw_async_args *aa;
1570         struct obd_async_page_ops *ops;
1571         LIST_HEAD(rpc_list);
1572         unsigned int ending_offset;
1573         unsigned  starting_offset = 0;
1574         ENTRY;
1575
1576         /* first we find the pages we're allowed to work with */
1577         list_for_each_safe(pos, tmp, &lop->lop_pending) {
1578                 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
1579                 ops = oap->oap_caller_ops;
1580
1581                 LASSERT(oap->oap_magic == OAP_MAGIC);
1582
1583                 /* in llite being 'ready' equates to the page being locked
1584                  * until completion unlocks it.  commit_write submits a page
1585                  * as not ready because its unlock will happen unconditionally
1586                  * as the call returns.  if we race with commit_write giving
1587                  * us that page we dont' want to create a hole in the page
1588                  * stream, so we stop and leave the rpc to be fired by
1589                  * another dirtier or kupdated interval (the not ready page
1590                  * will still be on the dirty list).  we could call in
1591                  * at the end of ll_file_write to process the queue again. */
1592                 if (!(oap->oap_async_flags & ASYNC_READY)) {
1593                         int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
1594                         if (rc < 0)
1595                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
1596                                                 "instead of ready\n", oap,
1597                                                 oap->oap_page, rc);
1598                         switch (rc) {
1599                         case -EAGAIN:
1600                                 /* llite is telling us that the page is still
1601                                  * in commit_write and that we should try
1602                                  * and put it in an rpc again later.  we
1603                                  * break out of the loop so we don't create
1604                                  * a hole in the sequence of pages in the rpc
1605                                  * stream.*/
1606                                 pos = NULL;
1607                                 break;
1608                         case -EINTR:
1609                                 /* the io isn't needed.. tell the checks
1610                                  * below to complete the rpc with EINTR */
1611                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
1612                                 oap->oap_count = -EINTR;
1613                                 break;
1614                         case 0:
1615                                 oap->oap_async_flags |= ASYNC_READY;
1616                                 break;
1617                         default:
1618                                 LASSERTF(0, "oap %p page %p returned %d "
1619                                             "from make_ready\n", oap,
1620                                             oap->oap_page, rc);
1621                                 break;
1622                         }
1623                 }
1624                 if (pos == NULL)
1625                         break;
1626                 /*
1627                  * Page submitted for IO has to be locked. Either by
1628                  * ->ap_make_ready() or by higher layers.
1629                  *
1630                  * XXX nikita: this assertion should be adjusted when lustre
1631                  * starts using PG_writeback for pages being written out.
1632                  */
1633 #if defined(__KERNEL__)
1634                 LASSERT(PageLocked(oap->oap_page));
1635 #endif
1636                 /* If there is a gap at the start of this page, it can't merge
1637                  * with any previous page, so we'll hand the network a
1638                  * "fragmented" page array that it can't transfer in 1 RDMA */
1639                 if (page_count != 0 && oap->oap_page_off != 0)
1640                         break;
1641
1642                 /* take the page out of our book-keeping */
1643                 list_del_init(&oap->oap_pending_item);
1644                 lop_update_pending(cli, lop, cmd, -1);
1645                 list_del_init(&oap->oap_urgent_item);
1646
1647                 if (page_count == 0)
1648                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
1649                                           (PTLRPC_MAX_BRW_SIZE - 1);
1650
1651                 /* ask the caller for the size of the io as the rpc leaves. */
1652                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
1653                         oap->oap_count =
1654                                 ops->ap_refresh_count(oap->oap_caller_data,cmd);
1655                 if (oap->oap_count <= 0) {
1656                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
1657                                oap->oap_count);
1658                         osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
1659                         continue;
1660                 }
1661
1662                 /* now put the page back in our accounting */
1663                 list_add_tail(&oap->oap_rpc_item, &rpc_list);
1664                 if (++page_count >= cli->cl_max_pages_per_rpc)
1665                         break;
1666
1667                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
1668                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
1669                  * have the same alignment as the initial writes that allocated
1670                  * extents on the server. */
1671                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
1672                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
1673                 if (ending_offset == 0)
1674                         break;
1675
1676                 /* If there is a gap at the end of this page, it can't merge
1677                  * with any subsequent pages, so we'll hand the network a
1678                  * "fragmented" page array that it can't transfer in 1 RDMA */
1679                 if (oap->oap_page_off + oap->oap_count < PAGE_SIZE)
1680                         break;
1681         }
1682
1683         osc_wake_cache_waiters(cli);
1684
1685         if (page_count == 0)
1686                 RETURN(0);
1687
1688         loi_list_maint(cli, loi);
1689
1690         spin_unlock(&cli->cl_loi_list_lock);
1691
1692         request = osc_build_req(cli, &rpc_list, page_count, cmd);
1693         if (IS_ERR(request)) {
1694                 /* this should happen rarely and is pretty bad, it makes the
1695                  * pending list not follow the dirty order */
1696                 spin_lock(&cli->cl_loi_list_lock);
1697                 list_for_each_safe(pos, tmp, &rpc_list) {
1698                         oap = list_entry(pos, struct osc_async_page,
1699                                          oap_rpc_item);
1700                         list_del_init(&oap->oap_rpc_item);
1701
1702                         /* queued sync pages can be torn down while the pages
1703                          * were between the pending list and the rpc */
1704                         if (oap->oap_interrupted) {
1705                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
1706                                 osc_ap_completion(cli, NULL, oap, 0,
1707                                                   oap->oap_count);
1708                                 continue;
1709                         }
1710                         osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(request));
1711
1712                         /* put the page back in the loi/lop lists */
1713                         list_add_tail(&oap->oap_pending_item,
1714                                       &lop->lop_pending);
1715                         lop_update_pending(cli, lop, cmd, 1);
1716                         if (oap->oap_async_flags & ASYNC_URGENT)
1717                                 list_add(&oap->oap_urgent_item,
1718                                          &lop->lop_urgent);
1719                 }
1720                 loi_list_maint(cli, loi);
1721                 RETURN(PTR_ERR(request));
1722         }
1723
1724         LASSERT(sizeof(*aa) <= sizeof(request->rq_async_args));
1725         aa = (struct osc_brw_async_args *)&request->rq_async_args;
1726         INIT_LIST_HEAD(&aa->aa_oaps);
1727         list_splice(&rpc_list, &aa->aa_oaps);
1728         INIT_LIST_HEAD(&rpc_list);
1729
1730         if (cmd == OBD_BRW_READ) {
1731                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1732                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1733                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1734                                       starting_offset/PAGE_SIZE + 1);
1735         } else {
1736                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1737                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1738                                  cli->cl_w_in_flight);
1739                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1740                                       starting_offset/PAGE_SIZE + 1);
1741         }
1742
1743         spin_lock(&cli->cl_loi_list_lock);
1744
1745         if (cmd == OBD_BRW_READ)
1746                 cli->cl_r_in_flight++;
1747         else
1748                 cli->cl_w_in_flight++;
1749
1750         /* queued sync pages can be torn down while the pages
1751          * were between the pending list and the rpc */
1752         list_for_each(pos, &aa->aa_oaps) {
1753                 oap = list_entry(pos, struct osc_async_page, oap_rpc_item);
1754                 if (oap->oap_interrupted) {
1755                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
1756                                oap, request);
1757                         ptlrpc_mark_interrupted(request);
1758                         break;
1759                 }
1760         }
1761
1762         CDEBUG(D_INODE, "req %p: %d pages, aa %p.  now %dr/%dw in flight\n",
1763                         request, page_count, aa, cli->cl_r_in_flight,
1764                         cli->cl_w_in_flight);
1765
1766         oap->oap_request = ptlrpc_request_addref(request);
1767         request->rq_interpret_reply = brw_interpret_oap;
1768         ptlrpcd_add_req(request);
1769         RETURN(1);
1770 }
1771
/* Dump the rpc-readiness state of an loi: whether it is on the ready list
 * and the pending/urgent counts of its write and read queues. */
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !list_empty(&(LOI)->loi_cli_item),                        \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
               args)

1781 /* This is called by osc_check_rpcs() to find which objects have pages that
1782  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
1783 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
1784 {
1785         ENTRY;
1786         /* first return all objects which we already know to have
1787          * pages ready to be stuffed into rpcs */
1788         if (!list_empty(&cli->cl_loi_ready_list))
1789                 RETURN(list_entry(cli->cl_loi_ready_list.next,
1790                                   struct lov_oinfo, loi_cli_item));
1791
1792         /* then if we have cache waiters, return all objects with queued
1793          * writes.  This is especially important when many small files
1794          * have filled up the cache and not been fired into rpcs because
1795          * they don't pass the nr_pending/object threshhold */
1796         if (!list_empty(&cli->cl_cache_waiters) &&
1797             !list_empty(&cli->cl_loi_write_list))
1798                 RETURN(list_entry(cli->cl_loi_write_list.next,
1799                                   struct lov_oinfo, loi_write_item));
1800
1801         /* then return all queued objects when we have an invalid import
1802          * so that they get flushed */
1803         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
1804                 if (!list_empty(&cli->cl_loi_write_list))
1805                         RETURN(list_entry(cli->cl_loi_write_list.next,
1806                                           struct lov_oinfo, loi_write_item));
1807                 if (!list_empty(&cli->cl_loi_read_list))
1808                         RETURN(list_entry(cli->cl_loi_read_list.next,
1809                                           struct lov_oinfo, loi_read_item));
1810         }
1811         RETURN(NULL);
1812 }
1813
1814 /* called with the loi list lock held */
1815 static void osc_check_rpcs(struct client_obd *cli)
1816 {
1817         struct lov_oinfo *loi;
1818         int rc = 0, race_counter = 0;
1819         ENTRY;
1820
1821         while ((loi = osc_next_loi(cli)) != NULL) {
1822                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
1823
1824                 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
1825                         break;
1826
1827                 /* attempt some read/write balancing by alternating between
1828                  * reads and writes in an object.  The makes_rpc checks here
1829                  * would be redundant if we were getting read/write work items
1830                  * instead of objects.  we don't want send_oap_rpc to drain a
1831                  * partial read pending queue when we're given this object to
1832                  * do io on writes while there are cache waiters */
1833                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
1834                         rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
1835                                               &loi->loi_write_lop);
1836                         if (rc < 0)
1837                                 break;
1838                         if (rc > 0)
1839                                 race_counter = 0;
1840                         else
1841                                 race_counter++;
1842                 }
1843                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
1844                         rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
1845                                               &loi->loi_read_lop);
1846                         if (rc < 0)
1847                                 break;
1848                         if (rc > 0)
1849                                 race_counter = 0;
1850                         else
1851                                 race_counter++;
1852                 }
1853
1854                 /* attempt some inter-object balancing by issueing rpcs
1855                  * for each object in turn */
1856                 if (!list_empty(&loi->loi_cli_item))
1857                         list_del_init(&loi->loi_cli_item);
1858                 if (!list_empty(&loi->loi_write_item))
1859                         list_del_init(&loi->loi_write_item);
1860                 if (!list_empty(&loi->loi_read_item))
1861                         list_del_init(&loi->loi_read_item);
1862
1863                 loi_list_maint(cli, loi);
1864
1865                 /* send_oap_rpc fails with 0 when make_ready tells it to
1866                  * back off.  llite's make_ready does this when it tries
1867                  * to lock a page queued for write that is already locked.
1868                  * we want to try sending rpcs from many objects, but we
1869                  * don't want to spin failing with 0.  */
1870                 if (race_counter == 10)
1871                         break;
1872         }
1873         EXIT;
1874 }
1875
1876 /* we're trying to queue a page in the osc so we're subject to the
1877  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
1878  * If the osc's queued pages are already at that limit, then we want to sleep
1879  * until there is space in the osc's queue for us.  We also may be waiting for
1880  * write credits from the OST if there are RPCs in flight that may return some
1881  * before we fall back to sync writes.
1882  *
 * We need this to know our allocation was granted in the presence of signals */
1884 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
1885 {
1886         int rc;
1887         ENTRY;
1888         spin_lock(&cli->cl_loi_list_lock);
1889         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
1890         spin_unlock(&cli->cl_loi_list_lock);
1891         RETURN(rc);
1892 };
1893
/* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
 * grant or cache space. */
static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
                           struct osc_async_page *oap)
{
        struct osc_cache_waiter ocw;
        struct l_wait_info lwi = { 0 };

        CDEBUG(D_CACHE, "dirty: %ld dirty_max: %ld dropped: %lu grant: %lu\n",
               cli->cl_dirty, cli->cl_dirty_max, cli->cl_lost_grant,
               cli->cl_avail_grant);

        /* force the caller to try sync io.  this can jump the list
         * of queued writes and create a discontiguous rpc stream */
        if (cli->cl_dirty_max < PAGE_SIZE || cli->cl_ar.ar_force_sync ||
            loi->loi_ar.ar_force_sync)
                return(-EDQUOT);

        /* Hopefully normal case - cache space and write credits available */
        if (cli->cl_dirty + PAGE_SIZE <= cli->cl_dirty_max &&
            cli->cl_avail_grant >= PAGE_SIZE) {
                /* account for ourselves */
                osc_consume_write_grant(cli, oap);
                return(0);
        }

        /* Make sure that there are write rpcs in flight to wait for.  This
         * is a little silly as this object may not have any pending but
         * other objects sure might. */
        if (cli->cl_w_in_flight) {
                /* register ourselves as a cache waiter; a completing write
                 * rpc is expected to grant us space and remove ocw_entry */
                list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
                init_waitqueue_head(&ocw.ocw_waitq);
                ocw.ocw_oap = oap;
                ocw.ocw_rc = 0;

                /* kick rpc generation so the writes we'll wait on actually
                 * go out, then drop the lock before sleeping */
                loi_list_maint(cli, loi);
                osc_check_rpcs(cli);
                spin_unlock(&cli->cl_loi_list_lock);

                CDEBUG(D_CACHE, "sleeping for cache space\n");
                l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);

                spin_lock(&cli->cl_loi_list_lock);
                /* still on the waiter list means nobody granted us space:
                 * we woke because no rpcs remained in flight (see
                 * ocw_granted), so unhook ourselves and report -EINTR */
                if (!list_empty(&ocw.ocw_entry)) {
                        list_del(&ocw.ocw_entry);
                        RETURN(-EINTR);
                }
                RETURN(ocw.ocw_rc);
        }

        RETURN(-EDQUOT);
}
1946
/* the companion to enter_cache, called when an oap is no longer part of the
 * dirty accounting.. so writeback completes or truncate happens before writing
 * starts.  must be called with the loi lock held. */
static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
                           int sent)
{
        /* OST-side filesystem block size; fall back to 4k when unknown */
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        /* only pages that consumed a grant in osc_enter_cache() (via
         * osc_consume_write_grant) have accounting to give back */
        if (!(oap->oap_brw_flags & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        oap->oap_brw_flags &= ~OBD_BRW_FROM_GRANT;
        cli->cl_dirty -= PAGE_SIZE;
        if (!sent) {
                /* dropped without being written: the whole page's worth of
                 * grant is lost until the server resyncs our accounting */
                cli->cl_lost_grant += PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (PAGE_SIZE != blocksize && oap->oap_count != PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = (oap->oap_obj_off +oap->oap_page_off) & ~PAGE_MASK;
                int count = oap->oap_count + (offset & (blocksize - 1));
                int end = (offset + oap->oap_count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}
1985
1986 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
1987                         struct lov_oinfo *loi, struct page *page,
1988                         obd_off offset, struct obd_async_page_ops *ops,
1989                         void *data, void **res)
1990 {
1991         struct osc_async_page *oap;
1992         ENTRY;
1993
1994         if (!page)
1995                 return size_round(sizeof(*oap));
1996
1997         oap = *res;
1998         oap->oap_magic = OAP_MAGIC;
1999         oap->oap_cli = &exp->exp_obd->u.cli;
2000         oap->oap_loi = loi;
2001
2002         oap->oap_caller_ops = ops;
2003         oap->oap_caller_data = data;
2004
2005         oap->oap_page = page;
2006         oap->oap_obj_off = offset;
2007
2008         INIT_LIST_HEAD(&oap->oap_pending_item);
2009         INIT_LIST_HEAD(&oap->oap_urgent_item);
2010         INIT_LIST_HEAD(&oap->oap_rpc_item);
2011
2012         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2013
2014         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2015         RETURN(0);
2016 }
2017
2018 struct osc_async_page *oap_from_cookie(void *cookie)
2019 {
2020         struct osc_async_page *oap = cookie;
2021         if (oap->oap_magic != OAP_MAGIC)
2022                 return ERR_PTR(-EINVAL);
2023         return oap;
2024 };
2025
/* Queue a prepped page for async io on its object's pending (and possibly
 * urgent) lists, then see whether any rpcs should be generated. */
static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
                              struct lov_oinfo *loi, void *cookie,
                              int cmd, obd_off off, int count,
                              obd_flag brw_flags, enum async_flags async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        /* don't queue new io against a dead import */
        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* a page may only sit on one set of queues at a time */
        if (!list_empty(&oap->oap_pending_item) ||
            !list_empty(&oap->oap_urgent_item) ||
            !list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        /* check if the file's owner/group is over quota */
#ifdef HAVE_QUOTA_SUPPORT
        if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
                struct obd_async_page_ops *ops;
                struct obdo *oa;

                oa = obdo_alloc();
                if (oa == NULL)
                        RETURN(-ENOMEM);

                /* have the caller fill in the obdo so we can check the
                 * owner/group quota */
                ops = oap->oap_caller_ops;
                ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
                if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
                    NO_QUOTA)
                        rc = -EDQUOT;

                obdo_free(oa);
                if (rc)
                        RETURN(rc);
        }
#endif

        if (loi == NULL)
                loi = &lsm->lsm_oinfo[0];

        spin_lock(&cli->cl_loi_list_lock);

        oap->oap_cmd = cmd;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;
        oap->oap_async_flags = async_flags;

        if (cmd & OBD_BRW_WRITE) {
                /* writes are subject to dirty/grant limits; note this may
                 * drop and retake cl_loi_list_lock to wait for space */
                rc = osc_enter_cache(cli, loi, oap);
                if (rc) {
                        spin_unlock(&cli->cl_loi_list_lock);
                        RETURN(rc);
                }
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        if (oap->oap_async_flags & ASYNC_URGENT)
                list_add(&oap->oap_urgent_item, &lop->lop_urgent);
        list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
        lop_update_pending(cli, lop, cmd, 1);

        loi_list_maint(cli, loi);

        LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
                  cmd);

        /* see if this page pushed the object over an rpc threshold */
        osc_check_rpcs(cli);
        spin_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
2108
/* aka (~was & now & flag), but this is more clear :)
 * arguments are parenthesized so expression operands (e.g. a | b) don't
 * bind wrongly against the macro's own & operators */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
2111
/* Add async flags to an already-queued page, moving it onto the urgent
 * list if ASYNC_URGENT is newly set. */
static int osc_set_async_flags(struct obd_export *exp,
                               struct lov_stripe_md *lsm,
                               struct lov_oinfo *loi, void *cookie,
                               obd_flag async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        /*
         * bug 7311: OST-side locking is only supported for liblustre for now
         * (and liblustre never calls obd_set_async_flags(). I hope.), generic
         * implementation has to handle case where OST-locked page was picked
         * up by, e.g., ->writepage().
         */
        LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
        LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
                                     * tread here. */

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        if (loi == NULL)
                loi = &lsm->lsm_oinfo[0];

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        spin_lock(&cli->cl_loi_list_lock);

        /* flags only matter while the page is still queued for io */
        if (list_empty(&oap->oap_pending_item))
                GOTO(out, rc = -EINVAL);

        /* nothing new would be set; done */
        if ((oap->oap_async_flags & async_flags) == async_flags)
                GOTO(out, rc = 0);

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
                oap->oap_async_flags |= ASYNC_READY;

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
                /* only requeue as urgent if it hasn't entered an rpc yet.
                 * NOTE(review): ASYNC_URGENT is not or'd into
                 * oap_async_flags here, unlike ASYNC_READY above -- confirm
                 * that is intended */
                if (list_empty(&oap->oap_rpc_item)) {
                        list_add(&oap->oap_urgent_item, &lop->lop_urgent);
                        loi_list_maint(cli, loi);
                }
        }

        LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
                        oap->oap_async_flags);
out:
        /* even on error paths, see if rpcs should go out now */
        osc_check_rpcs(cli);
        spin_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
2174
2175 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2176                              struct lov_oinfo *loi,
2177                              struct obd_io_group *oig, void *cookie,
2178                              int cmd, obd_off off, int count,
2179                              obd_flag brw_flags,
2180                              obd_flag async_flags)
2181 {
2182         struct client_obd *cli = &exp->exp_obd->u.cli;
2183         struct osc_async_page *oap;
2184         struct loi_oap_pages *lop;
2185         ENTRY;
2186
2187         oap = oap_from_cookie(cookie);
2188         if (IS_ERR(oap))
2189                 RETURN(PTR_ERR(oap));
2190
2191         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2192                 RETURN(-EIO);
2193
2194         if (!list_empty(&oap->oap_pending_item) ||
2195             !list_empty(&oap->oap_urgent_item) ||
2196             !list_empty(&oap->oap_rpc_item))
2197                 RETURN(-EBUSY);
2198
2199         if (loi == NULL)
2200                 loi = &lsm->lsm_oinfo[0];
2201
2202         spin_lock(&cli->cl_loi_list_lock);
2203
2204         oap->oap_cmd = cmd;
2205         oap->oap_page_off = off;
2206         oap->oap_count = count;
2207         oap->oap_brw_flags = brw_flags;
2208         oap->oap_async_flags = async_flags;
2209
2210         if (cmd & OBD_BRW_WRITE)
2211                 lop = &loi->loi_write_lop;
2212         else
2213                 lop = &loi->loi_read_lop;
2214
2215         list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2216         if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2217                 oap->oap_oig = oig;
2218                 oig_add_one(oig, &oap->oap_occ);
2219         }
2220
2221         LOI_DEBUG(loi, "oap %p page %p on group pending\n", oap, oap->oap_page);
2222
2223         spin_unlock(&cli->cl_loi_list_lock);
2224
2225         RETURN(0);
2226 }
2227
2228 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2229                                  struct loi_oap_pages *lop, int cmd)
2230 {
2231         struct list_head *pos, *tmp;
2232         struct osc_async_page *oap;
2233
2234         list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2235                 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2236                 list_del(&oap->oap_pending_item);
2237                 list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2238                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2239                 lop_update_pending(cli, lop, cmd, 1);
2240         }
2241         loi_list_maint(cli, loi);
2242 }
2243
/* Fire a previously queued io group: promote its group-pending pages (both
 * directions) to the urgent pending lists and kick off rpc generation. */
static int osc_trigger_group_io(struct obd_export *exp,
                                struct lov_stripe_md *lsm,
                                struct lov_oinfo *loi,
                                struct obd_io_group *oig)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        ENTRY;

        if (loi == NULL)
                loi = &lsm->lsm_oinfo[0];

        spin_lock(&cli->cl_loi_list_lock);

        osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
        osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);

        /* start rpcs for the newly-pending pages right away */
        osc_check_rpcs(cli);
        spin_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
2265
2266 static int osc_teardown_async_page(struct obd_export *exp,
2267                                    struct lov_stripe_md *lsm,
2268                                    struct lov_oinfo *loi, void *cookie)
2269 {
2270         struct client_obd *cli = &exp->exp_obd->u.cli;
2271         struct loi_oap_pages *lop;
2272         struct osc_async_page *oap;
2273         int rc = 0;
2274         ENTRY;
2275
2276         oap = oap_from_cookie(cookie);
2277         if (IS_ERR(oap))
2278                 RETURN(PTR_ERR(oap));
2279
2280         if (loi == NULL)
2281                 loi = &lsm->lsm_oinfo[0];
2282
2283         if (oap->oap_cmd & OBD_BRW_WRITE) {
2284                 lop = &loi->loi_write_lop;
2285         } else {
2286                 lop = &loi->loi_read_lop;
2287         }
2288
2289         spin_lock(&cli->cl_loi_list_lock);
2290
2291         if (!list_empty(&oap->oap_rpc_item))
2292                 GOTO(out, rc = -EBUSY);
2293
2294         osc_exit_cache(cli, oap, 0);
2295         osc_wake_cache_waiters(cli);
2296
2297         if (!list_empty(&oap->oap_urgent_item)) {
2298                 list_del_init(&oap->oap_urgent_item);
2299                 oap->oap_async_flags &= ~ASYNC_URGENT;
2300         }
2301         if (!list_empty(&oap->oap_pending_item)) {
2302                 list_del_init(&oap->oap_pending_item);
2303                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2304         }
2305         loi_list_maint(cli, loi);
2306
2307         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2308 out:
2309         spin_unlock(&cli->cl_loi_list_lock);
2310         RETURN(rc);
2311 }
2312
2313 /* Note: caller will lock/unlock, and set uptodate on the pages */
2314 #if defined(__KERNEL__) && (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
/* SAN ("shared block device") read path, 2.4 kernels only.
 *
 * Sends an OST_SAN_READ RPC describing @page_count pages; the OST replies
 * with a niobuf_remote array whose 'offset' fields are reinterpreted here
 * as on-disk block numbers on the locally visible SAN device (cl_sandev).
 * Each page is then read synchronously from that device via buffer_heads.
 * An offset of 0 in the reply denotes a hole and the page is zero-filled.
 *
 * Caller holds the page locks and handles PageUptodate (see comment above).
 * Returns 0 on success or a negative errno.
 */
static int sanosc_brw_read(struct obd_export *exp, struct obdo *oa,
                           struct lov_stripe_md *lsm, obd_count page_count,
                           struct brw_page *pga)
{
        struct ptlrpc_request *request = NULL;
        struct ost_body *body;
        struct niobuf_remote *nioptr;
        struct obd_ioobj *iooptr;
        int rc, size[3] = {sizeof(*body)}, mapped = 0;
        struct obd_import *imp = class_exp2cliimp(exp);
        int swab;
        ENTRY;

        /* XXX does not handle 'new' brw protocol */

        /* Request layout: [0] ost_body, [1] obd_ioobj, [2] niobuf array. */
        size[1] = sizeof(struct obd_ioobj);
        size[2] = page_count * sizeof(*nioptr);

        request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                                  OST_SAN_READ, 3, size, NULL);
        if (!request)
                RETURN(-ENOMEM);

        /* FIXME bug 249 */
        /* See bug 7198 */
        if (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                request->rq_request_portal = OST_IO_PORTAL;

        body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof(*body));
        iooptr = lustre_msg_buf(request->rq_reqmsg, 1, sizeof(*iooptr));
        nioptr = lustre_msg_buf(request->rq_reqmsg, 2,
                                sizeof(*nioptr) * page_count);

        memcpy(&body->oa, oa, sizeof(body->oa));

        obdo_to_ioobj(oa, iooptr);
        iooptr->ioo_bufcnt = page_count;

        /* Pack one niobuf per page; pages must arrive locked and in
         * strictly ascending file-offset order. */
        for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
                LASSERT(PageLocked(pga[mapped].pg));
                LASSERT(mapped == 0 || pga[mapped].off > pga[mapped - 1].off);

                nioptr->offset = pga[mapped].off;
                nioptr->len    = pga[mapped].count;
                nioptr->flags  = pga[mapped].flag;
        }

        /* Reply layout: [0] ost_body, [1] niobuf array (block mapping). */
        size[1] = page_count * sizeof(*nioptr);
        request->rq_replen = lustre_msg_size(2, size);

        rc = ptlrpc_queue_wait(request);
        if (rc)
                GOTO(out_req, rc);

        body = lustre_swab_repbuf(request, 0, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR("Can't unpack body\n");
                GOTO(out_req, rc = -EPROTO);
        }

        memcpy(oa, &body->oa, sizeof(*oa));

        /* Reply buffer 1 is swabbed per-entry inside the loop below. */
        swab = lustre_msg_swabbed(request->rq_repmsg);
        LASSERT_REPSWAB(request, 1);
        nioptr = lustre_msg_buf(request->rq_repmsg, 1, size[1]);
        if (!nioptr) {
                /* nioptr missing or short */
                GOTO(out_req, rc = -EPROTO);
        }

        /* actual read */
        for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
                struct page *page = pga[mapped].pg;
                struct buffer_head *bh;
                kdev_t dev;

                if (swab)
                        lustre_swab_niobuf_remote (nioptr);

                /* got san device associated */
                LASSERT(exp->exp_obd != NULL);
                dev = exp->exp_obd->u.cli.cl_sandev;

                /* Offset 0 in the reply marks a hole: zero-fill the page
                 * instead of reading from disk. */
                if (!nioptr->offset) {
                        CDEBUG(D_PAGE, "hole at ino %lu; index %ld\n",
                                        page->mapping->host->i_ino,
                                        page->index);
                        memset(page_address(page), 0, PAGE_SIZE);
                        continue;
                }

                if (!page->buffers) {
                        /* First use of this page: create a single
                         * PAGE_SIZE buffer_head and map it to the block
                         * number the OST returned.
                         * NOTE(review): nioptr->offset is presumably in
                         * PAGE_SIZE-block units on cl_sandev — confirm. */
                        create_empty_buffers(page, dev, PAGE_SIZE);
                        bh = page->buffers;

                        clear_bit(BH_New, &bh->b_state);
                        set_bit(BH_Mapped, &bh->b_state);
                        bh->b_blocknr = (unsigned long)nioptr->offset;

                        clear_bit(BH_Uptodate, &bh->b_state);

                        ll_rw_block(READ, 1, &bh);
                } else {
                        bh = page->buffers;

                        /* if buffer already existed, it must be the
                         * one we mapped before, check it */
                        LASSERT(!test_bit(BH_New, &bh->b_state));
                        LASSERT(test_bit(BH_Mapped, &bh->b_state));
                        LASSERT(bh->b_blocknr == (unsigned long)nioptr->offset);

                        /* wait for any in-flight I/O to complete */
                        if (test_bit(BH_Lock, &bh->b_state))
                                wait_on_buffer(bh);

                        if (!test_bit(BH_Uptodate, &bh->b_state))
                                ll_rw_block(READ, 1, &bh);
                }


                /* synchronous read: wait for completion and check result */
                wait_on_buffer(bh);
                if (!buffer_uptodate(bh)) {
                        /* I/O error */
                        rc = -EIO;
                        goto out_req;
                }
        }

out_req:
        ptlrpc_req_finished(request);
        RETURN(rc);
}
2450
/* SAN ("shared block device") write path, 2.4 kernels only.
 *
 * Counterpart of sanosc_brw_read(): sends an OST_SAN_WRITE RPC so the OST
 * allocates blocks and returns their numbers in a niobuf_remote array,
 * then writes each page synchronously to those blocks on the local SAN
 * device (cl_sandev) through buffer_heads.
 *
 * Caller holds the page locks.  Returns 0 on success, negative errno on
 * RPC or I/O failure.
 */
static int sanosc_brw_write(struct obd_export *exp, struct obdo *oa,
                            struct lov_stripe_md *lsm, obd_count page_count,
                            struct brw_page *pga)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *request = NULL;
        struct ost_body *body;
        struct niobuf_remote *nioptr;
        struct obd_ioobj *iooptr;
        struct obd_import *imp = class_exp2cliimp(exp);
        int rc, size[3] = {sizeof(*body)}, mapped = 0;
        int swab;
        ENTRY;

        /* Request layout: [0] ost_body, [1] obd_ioobj, [2] niobuf array.
         * Writes draw from the pre-allocated request pool so they can
         * proceed under memory pressure. */
        size[1] = sizeof(struct obd_ioobj);
        size[2] = page_count * sizeof(*nioptr);

        request = ptlrpc_prep_req_pool(class_exp2cliimp(exp),
                                       LUSTRE_OST_VERSION, OST_SAN_WRITE,
                                       3, size, NULL, cli->cl_rq_pool);
        if (!request)
                RETURN(-ENOMEM);

        /* FIXME bug 249 */
        /* See bug 7198 */
        if (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                request->rq_request_portal = OST_IO_PORTAL;

        body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
        iooptr = lustre_msg_buf(request->rq_reqmsg, 1, sizeof (*iooptr));
        nioptr = lustre_msg_buf(request->rq_reqmsg, 2,
                                sizeof (*nioptr) * page_count);

        memcpy(&body->oa, oa, sizeof(body->oa));

        obdo_to_ioobj(oa, iooptr);
        iooptr->ioo_bufcnt = page_count;

        /* pack request: pages must be locked and in ascending offset order */
        for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
                LASSERT(PageLocked(pga[mapped].pg));
                LASSERT(mapped == 0 || pga[mapped].off > pga[mapped - 1].off);

                nioptr->offset = pga[mapped].off;
                nioptr->len    = pga[mapped].count;
                nioptr->flags  = pga[mapped].flag;
        }

        /* Reply layout: [0] ost_body, [1] niobuf array (block mapping). */
        size[1] = page_count * sizeof(*nioptr);
        request->rq_replen = lustre_msg_size(2, size);

        rc = ptlrpc_queue_wait(request);
        if (rc)
                GOTO(out_req, rc);

        /* Reply buffer 1 is swabbed per-entry inside the loop below. */
        swab = lustre_msg_swabbed (request->rq_repmsg);
        LASSERT_REPSWAB (request, 1);
        nioptr = lustre_msg_buf(request->rq_repmsg, 1, size[1]);
        if (!nioptr) {
                CERROR("absent/short niobuf array\n");
                GOTO(out_req, rc = -EPROTO);
        }

        /* actual write */
        for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
                struct page *page = pga[mapped].pg;
                struct buffer_head *bh;
                kdev_t dev;

                if (swab)
                        lustre_swab_niobuf_remote (nioptr);

                /* got san device associated */
                LASSERT(exp->exp_obd != NULL);
                dev = exp->exp_obd->u.cli.cl_sandev;

                if (!page->buffers) {
                        create_empty_buffers(page, dev, PAGE_SIZE);
                } else {
                        /* existing buffer must match the mapping the OST
                         * gave us earlier */
                        LASSERT(!test_bit(BH_New, &page->buffers->b_state));
                        LASSERT(test_bit(BH_Mapped, &page->buffers->b_state));
                        LASSERT(page->buffers->b_blocknr ==
                                (unsigned long)nioptr->offset);
                }
                bh = page->buffers;

                LASSERT(bh);

                /* if the buffer is locked, wait for its I/O to complete */
                if (test_bit(BH_Lock, &bh->b_state))
                        wait_on_buffer(bh);

                clear_bit(BH_New, &bh->b_state);
                set_bit(BH_Mapped, &bh->b_state);

                /* override the block nr with the one the OST allocated */
                bh->b_blocknr = (unsigned long)nioptr->offset;

                /* we are about to write it, so set it uptodate/dirty;
                 * the page lock should guarantee no race condition here */
                set_bit(BH_Uptodate, &bh->b_state);
                set_bit(BH_Dirty, &bh->b_state);

                ll_rw_block(WRITE, 1, &bh);

                /* synchronous write: wait for completion; a buffer left
                 * dirty or not uptodate means the write failed */
                wait_on_buffer(bh);
                if (!buffer_uptodate(bh) || test_bit(BH_Dirty, &bh->b_state)) {
                        /* I/O error */
                        rc = -EIO;
                        goto out_req;
                }
        }

out_req:
        ptlrpc_req_finished(request);
        RETURN(rc);
}
2571
2572 static int sanosc_brw(int cmd, struct obd_export *exp, struct obdo *oa,
2573                       struct lov_stripe_md *lsm, obd_count page_count,
2574                       struct brw_page *pga, struct obd_trans_info *oti)
2575 {
2576         ENTRY;
2577
2578         while (page_count) {
2579                 obd_count pages_per_brw;
2580                 int rc;
2581
2582                 if (page_count > PTLRPC_MAX_BRW_PAGES)
2583                         pages_per_brw = PTLRPC_MAX_BRW_PAGES;
2584                 else
2585                         pages_per_brw = page_count;
2586
2587                 if (cmd & OBD_BRW_WRITE)
2588                         rc = sanosc_brw_write(exp, oa, lsm, pages_per_brw,pga);
2589                 else
2590                         rc = sanosc_brw_read(exp, oa, lsm, pages_per_brw, pga);
2591
2592                 if (rc != 0)
2593                         RETURN(rc);
2594
2595                 page_count -= pages_per_brw;
2596                 pga += pages_per_brw;
2597         }
2598         RETURN(0);
2599 }
2600 #endif
2601
/* Attach @data (the client-side inode) to the lock named by @lockh as its
 * l_ast_data, after checking that any existing l_ast_data either matches
 * or belongs to an inode that is already being freed.  Also transfers the
 * LDLM_FL_NO_LRU bit from @flags onto the lock.
 * ldlm_handle2lock() takes a lock reference; LDLM_LOCK_PUT releases it. */
static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
                                    int flags)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);

        /* The handle may no longer resolve to a lock, e.g. after the
         * client was evicted and its locks cancelled. */
        if (lock == NULL) {
                CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
                return;
        }
        l_lock(&lock->l_resource->lr_namespace->ns_lock);
#ifdef __KERNEL__
        if (lock->l_ast_data && lock->l_ast_data != data) {
                struct inode *new_inode = data;
                struct inode *old_inode = lock->l_ast_data;
                /* A mismatched l_ast_data is only tolerable when the old
                 * inode is on its way out (I_FREEING); two live inodes on
                 * one lock would be a real inconsistency. */
                if (!(old_inode->i_state & I_FREEING))
                        LDLM_ERROR(lock, "inconsistent l_ast_data found");
                LASSERTF(old_inode->i_state & I_FREEING,
                         "Found existing inode %p/%lu/%u state %lu in lock: "
                         "setting data to %p/%lu/%u\n", old_inode,
                         old_inode->i_ino, old_inode->i_generation,
                         old_inode->i_state,
                         new_inode, new_inode->i_ino, new_inode->i_generation);
        }
#endif
        lock->l_ast_data = data;
        lock->l_flags |= (flags & LDLM_FL_NO_LRU);
        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
        LDLM_LOCK_PUT(lock);
}
2631
2632 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2633                              ldlm_iterator_t replace, void *data)
2634 {
2635         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2636         struct obd_device *obd = class_exp2obd(exp);
2637
2638         ldlm_change_cbdata(obd->obd_namespace, &res_id, replace, data);
2639         return 0;
2640 }
2641
/* Acquire an extent lock on the object described by @lsm.
 *
 * The requested extent is first widened to page boundaries.  We then try
 * to match an already-granted local lock of the requested mode (and, for
 * PR readers, a covering PW lock) before going to the server.  If the
 * caller set LDLM_FL_HAS_INTENT, an LDLM_ENQUEUE request with room for an
 * LVB reply is prepared so that even an aborted (intent-only) enqueue can
 * return up-to-date size/mtime/blocks, which are stored into lsm_oinfo.
 *
 * Returns ELDLM_OK on success (lock referenced in @lockh), or a DLM/errno
 * failure code. */
static int osc_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm,
                       __u32 type, ldlm_policy_data_t *policy, __u32 mode,
                       int *flags, void *bl_cb, void *cp_cb, void *gl_cb,
                       void *data, __u32 lvb_len, void *lvb_swabber,
                       struct lustre_handle *lockh)
{
        struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
        struct obd_device *obd = exp->exp_obd;
        struct ost_lvb lvb;
        struct ldlm_reply *rep;
        struct ptlrpc_request *req = NULL;
        int rc;
        ENTRY;

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother.  */
        policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
        policy->l_extent.end |= ~PAGE_MASK;

        /* No valid known-minimum-size yet: skip local matching and go
         * straight to the server enqueue. */
        if (lsm->lsm_oinfo->loi_kms_valid == 0)
                goto no_match;

        /* Next, search for already existing extent locks that will cover us */
        rc = ldlm_lock_match(obd->obd_namespace, 0, &res_id, type, policy, mode,
                             lockh);
        if (rc == 1) {
                osc_set_data_with_check(lockh, data, *flags);
                if (*flags & LDLM_FL_HAS_INTENT) {
                        /* I would like to be able to ASSERT here that rss <=
                         * kms, but I can't, for reasons which are explained in
                         * lov_enqueue() */
                }
                /* We already have a lock, and it's referenced */
                RETURN(ELDLM_OK);
        }

        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */

        if (mode == LCK_PR) {
                rc = ldlm_lock_match(obd->obd_namespace, 0, &res_id, type,
                                     policy, LCK_PW, lockh);
                if (rc == 1) {
                        /* FIXME: This is not incredibly elegant, but it might
                         * be more elegant than adding another parameter to
                         * lock_match.  I want a second opinion. */
                        /* Take a PR reference on the matched PW lock, then
                         * drop the PW reference lock_match added. */
                        ldlm_lock_addref(lockh, LCK_PR);
                        ldlm_lock_decref(lockh, LCK_PW);
                        osc_set_data_with_check(lockh, data, *flags);
                        RETURN(ELDLM_OK);
                }
        }

 no_match:
        if (*flags & LDLM_FL_HAS_INTENT) {
                /* Intent enqueue: request carries one ldlm_request buffer;
                 * the reply has room for the ldlm_reply plus the LVB. */
                int size[2] = {sizeof(struct ldlm_request), sizeof(lvb)};

                req = ptlrpc_prep_req(class_exp2cliimp(exp),
                                      LUSTRE_DLM_VERSION, LDLM_ENQUEUE, 1,
                                      size, NULL);
                if (req == NULL)
                        RETURN(-ENOMEM);

                size[0] = sizeof(*rep);
                req->rq_replen = lustre_msg_size(2, size);
        }

        rc = ldlm_cli_enqueue(exp, req, obd->obd_namespace, res_id, type,
                              policy, mode, flags, bl_cb, cp_cb, gl_cb, data,
                              &lvb, sizeof(lvb), lustre_swab_ost_lvb, lockh);

        if (req != NULL) {
                if (rc == ELDLM_LOCK_ABORTED) {
                        /* swabbed by ldlm_cli_enqueue() */
                        LASSERT_REPSWABBED(req, 0);
                        rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*rep));
                        LASSERT(rep != NULL);
                        if (rep->lock_policy_res1)
                                rc = rep->lock_policy_res1;
                }
                ptlrpc_req_finished(req);
        }

        /* On success, or on an intent-only enqueue the server aborted,
         * the LVB carries authoritative size/blocks/mtime — cache them. */
        if ((*flags & LDLM_FL_HAS_INTENT && rc == ELDLM_LOCK_ABORTED) || !rc) {
                CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
                       lvb.lvb_size, lvb.lvb_blocks, lvb.lvb_mtime);
                lsm->lsm_oinfo->loi_rss = lvb.lvb_size;
                lsm->lsm_oinfo->loi_mtime = lvb.lvb_mtime;
                lsm->lsm_oinfo->loi_blocks = lvb.lvb_blocks;
        }

        RETURN(rc);
}
2744
/* Match-only variant of osc_enqueue(): look for an already-granted local
 * lock covering the (page-aligned) extent without contacting the server.
 * For PR requests a covering PW lock is also acceptable; in that case a
 * PR reference is taken and the PW reference dropped.  Returns non-zero
 * (lock handle in @lockh) on a match, 0 otherwise. */
static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
                     __u32 type, ldlm_policy_data_t *policy, __u32 mode,
                     int *flags, void *data, struct lustre_handle *lockh)
{
        struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
        struct obd_device *obd = exp->exp_obd;
        int rc;
        ENTRY;

        OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother */
        policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
        policy->l_extent.end |= ~PAGE_MASK;

        /* Next, search for already existing extent locks that will cover us */
        rc = ldlm_lock_match(obd->obd_namespace, *flags, &res_id, type,
                             policy, mode, lockh);
        if (rc) {
                /* NOTE(review): unlike the PW branch below, l_ast_data is
                 * set here even for LDLM_FL_TEST_LOCK matches — the guard
                 * is deliberately(?) commented out; confirm intent. */
                //if (!(*flags & LDLM_FL_TEST_LOCK))
                        osc_set_data_with_check(lockh, data, *flags);
                RETURN(rc);
        }
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock. */
        if (mode == LCK_PR) {
                rc = ldlm_lock_match(obd->obd_namespace, *flags, &res_id, type,
                                     policy, LCK_PW, lockh);
                if (rc == 1 && !(*flags & LDLM_FL_TEST_LOCK)) {
                        /* FIXME: This is not incredibly elegant, but it might
                         * be more elegant than adding another parameter to
                         * lock_match.  I want a second opinion. */
                        osc_set_data_with_check(lockh, data, *flags);
                        ldlm_lock_addref(lockh, LCK_PR);
                        ldlm_lock_decref(lockh, LCK_PW);
                }
        }
        RETURN(rc);
}
2786
2787 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2788                       __u32 mode, struct lustre_handle *lockh)
2789 {
2790         ENTRY;
2791
2792         if (unlikely(mode == LCK_GROUP))
2793                 ldlm_lock_decref_and_cancel(lockh, mode);
2794         else
2795                 ldlm_lock_decref(lockh, mode);
2796
2797         RETURN(0);
2798 }
2799
2800 static int osc_cancel_unused(struct obd_export *exp,
2801                              struct lov_stripe_md *lsm, int flags, void *opaque)
2802 {
2803         struct obd_device *obd = class_exp2obd(exp);
2804         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2805
2806         return ldlm_cli_cancel_unused(obd->obd_namespace, &res_id, flags,
2807                                       opaque);
2808 }
2809
2810 static int osc_join_lru(struct obd_export *exp,
2811                         struct lov_stripe_md *lsm, int join)
2812 {
2813         struct obd_device *obd = class_exp2obd(exp);
2814         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2815
2816         return ldlm_cli_join_lru(obd->obd_namespace, &res_id, join);
2817 }
2818
/* Fetch filesystem statistics from the OST via an OST_STATFS RPC and copy
 * the (swabbed) reply into @osfs.  @max_age is currently not used in the
 * request — see the comment below.  Returns 0 or a negative errno. */
static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
                      unsigned long max_age)
{
        struct obd_statfs *msfs;
        struct ptlrpc_request *request;
        int rc, size = sizeof(*osfs);
        ENTRY;

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        request = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
                                  OST_STATFS,0,NULL,NULL);
        if (!request)
                RETURN(-ENOMEM);

        request->rq_replen = lustre_msg_size(1, &size);
        request->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249

        rc = ptlrpc_queue_wait(request);
        if (rc)
                GOTO(out, rc);

        msfs = lustre_swab_repbuf(request, 0, sizeof(*msfs),
                                  lustre_swab_obd_statfs);
        if (msfs == NULL) {
                CERROR("Can't unpack obd_statfs\n");
                GOTO(out, rc = -EPROTO);
        }

        memcpy(osfs, msfs, sizeof(*osfs));

        EXIT;
 out:
        ptlrpc_req_finished(request);
        return rc;
}
2859
2860 /* Retrieve object striping information.
2861  *
2862  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
2863  * the maximum number of OST indices which will fit in the user buffer.
2864  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
2865  */
/* Copy single-stripe striping information for @lsm out to the user buffer
 * @lump (see the block comment above: an OSC always reports exactly one
 * stripe).  Returns 0, -ENODATA (no striping), -EINVAL (bad magic),
 * -ENOMEM, or -EFAULT on copy failures. */
static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
{
        struct lov_user_md lum, *lumk;
        int rc = 0, lum_size;
        ENTRY;

        if (!lsm)
                RETURN(-ENODATA);

        /* Read the user's header to learn how many object slots the
         * buffer has room for. */
        if (copy_from_user(&lum, lump, sizeof(lum)))
                RETURN(-EFAULT);

        if (lum.lmm_magic != LOV_USER_MAGIC)
                RETURN(-EINVAL);

        if (lum.lmm_stripe_count > 0) {
                /* Room for at least one object entry: allocate a kernel
                 * copy with exactly one lov_user_ost_data slot.
                 * NOTE(review): only object_id/stripe_count are filled in;
                 * the remaining header fields (lmm_magic, pattern, ...) are
                 * presumably zeroed by OBD_ALLOC rather than echoing the
                 * user's values — confirm this is intended. */
                lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
                OBD_ALLOC(lumk, lum_size);
                if (!lumk)
                        RETURN(-ENOMEM);

                lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
        } else {
                /* No object slots: reuse the header the user sent. */
                lum_size = sizeof(lum);
                lumk = &lum;
        }

        lumk->lmm_object_id = lsm->lsm_object_id;
        lumk->lmm_stripe_count = 1;

        if (copy_to_user(lump, lumk, lum_size))
                rc = -EFAULT;

        if (lumk != &lum)
                OBD_FREE(lumk, lum_size);

        RETURN(rc);
}
2904
2905
/* ioctl dispatcher for the OSC device.  Pins the module for the duration
 * of the call (the early return on try_module_get() failure is correct:
 * nothing was taken, so there is nothing to put). */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void *uarg)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_ioctl_data *data = karg;
        int err = 0;
        ENTRY;

#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
        MOD_INC_USE_COUNT;
#else
        if (!try_module_get(THIS_MODULE)) {
                CERROR("Can't get module. Is it alive?");
                return -EINVAL;
        }
#endif
        switch (cmd) {
        case OBD_IOC_LOV_GET_CONFIG: {
                char *buf;
                struct lov_desc *desc;
                struct obd_uuid uuid;

                /* Pull the ioctl data from user space; obd_ioctl_getdata
                 * allocates @buf, freed on every exit path below. */
                buf = NULL;
                len = 0;
                if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
                        GOTO(out, err = -EINVAL);

                data = (struct obd_ioctl_data *)buf;

                if (sizeof(*desc) > data->ioc_inllen1) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                if (data->ioc_inllen2 < sizeof(uuid)) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                /* An OSC masquerades as a one-target LOV: report a single
                 * active target with default striping. */
                desc = (struct lov_desc *)data->ioc_inlbuf1;
                desc->ld_tgt_count = 1;
                desc->ld_active_tgt_count = 1;
                desc->ld_default_stripe_count = 1;
                desc->ld_default_stripe_size = 0;
                desc->ld_default_stripe_offset = 0;
                desc->ld_pattern = 0;
                memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));

                memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));

                err = copy_to_user((void *)uarg, buf, len);
                if (err)
                        err = -EFAULT;
                obd_ioctl_freedata(buf, len);
                GOTO(out, err);
        }
        case LL_IOC_LOV_SETSTRIPE:
                /* obd_alloc_memmd returns the md size on success;
                 * normalize positive results to 0. */
                err = obd_alloc_memmd(exp, karg);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case LL_IOC_LOV_GETSTRIPE:
                err = osc_getstripe(karg, uarg);
                GOTO(out, err);
        case OBD_IOC_CLIENT_RECOVER:
                err = ptlrpc_recover_import(obd->u.cli.cl_import,
                                            data->ioc_inlbuf1);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case IOC_OSC_SET_ACTIVE:
                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
                                               data->ioc_offset);
                GOTO(out, err);
        case OBD_IOC_POLL_QUOTACHECK:
                err = lquota_poll_check(quota_interface, exp,
                                        (struct if_quotacheck *)karg);
                GOTO(out, err);
        default:
                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
                       cmd, current->comm);
                GOTO(out, err = -ENOTTY);
        }
out:
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
        MOD_DEC_USE_COUNT;
#else
        module_put(THIS_MODULE);
#endif
        return err;
}
2997
/* obd_get_info handler.  Supports two keys:
 *   "lock_to_stripe" — an OSC has a single stripe, so always writes 0;
 *   "last_id"        — fetches the OST's last allocated object id via an
 *                      OST_GET_INFO RPC.
 * NOTE(review): the keylen comparisons are inconsistent ('>' for the
 * first key, '>=' for the second) and strcmp() assumes @key is
 * NUL-terminated within keylen — confirm callers guarantee that. */
static int osc_get_info(struct obd_export *exp, obd_count keylen,
                        void *key, __u32 *vallen, void *val)
{
        ENTRY;
        if (!vallen || !val)
                RETURN(-EFAULT);

        if (keylen > strlen("lock_to_stripe") &&
            strcmp(key, "lock_to_stripe") == 0) {
                __u32 *stripe = val;
                *vallen = sizeof(*stripe);
                *stripe = 0;
                RETURN(0);
        } else if (keylen >= strlen("last_id") && strcmp(key, "last_id") == 0) {
                struct ptlrpc_request *req;
                obd_id *reply;
                char *bufs[1] = {key};
                int rc;
                req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                                      OST_GET_INFO, 1, &keylen, bufs);
                if (req == NULL)
                        RETURN(-ENOMEM);

                /* caller's *vallen sizes the single reply buffer */
                req->rq_replen = lustre_msg_size(1, vallen);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out, rc);

                reply = lustre_swab_repbuf(req, 0, sizeof(*reply),
                                           lustre_swab_ost_last_id);
                if (reply == NULL) {
                        CERROR("Can't unpack OST last ID\n");
                        GOTO(out, rc = -EPROTO);
                }
                *((obd_id *)val) = *reply;
        out:
                ptlrpc_req_finished(req);
                RETURN(rc);
        }
        RETURN(-EINVAL);
}
3039
/* obd_set_info handler.  Keys handled locally: KEY_NEXT_ID (pre-created
 * object cursor), "unlinked" (clear no-space flag), "initial_recov"
 * (import recovery flag), "checksum" (wire checksum toggle).  Any other
 * accepted key (KEY_MDS_CONN / "evict_by_nid") is forwarded to the OST
 * via OST_SET_INFO, after which the MDS-OST llog connection is wired up
 * and the import is marked pingable. */
static int osc_set_info(struct obd_export *exp, obd_count keylen,
                        void *key, obd_count vallen, void *val)
{
        struct ptlrpc_request *req;
        struct obd_device  *obd = exp->exp_obd;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct llog_ctxt *ctxt;
        int rc, size[2] = {keylen, vallen};
        char *bufs[2] = {key, val};
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        if (KEY_IS(KEY_NEXT_ID)) {
                if (vallen != sizeof(obd_id))
                        RETURN(-EINVAL);
                /* next id to create is one past the last id the MDS saw */
                obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
                CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
                       exp->exp_obd->obd_name,
                       obd->u.cli.cl_oscc.oscc_next_id);

                RETURN(0);
        }

        if (KEY_IS("unlinked")) {
                /* an unlink freed space, so clear the no-space flag */
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;
                spin_lock(&oscc->oscc_lock);
                oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                spin_unlock(&oscc->oscc_lock);
                RETURN(0);
        }

        if (KEY_IS("initial_recov")) {
                /* NOTE(review): this local 'imp' shadows the outer one;
                 * presumably they refer to the same import — confirm.
                 * The CDEBUG text ("imp_no_init_recov") also does not match
                 * the field actually set (imp_initial_recov). */
                struct obd_import *imp = exp->exp_obd->u.cli.cl_import;
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                imp->imp_initial_recov = *(int *)val;
                CDEBUG(D_HA, "%s: set imp_no_init_recov = %d\n",
                       exp->exp_obd->obd_name,
                       imp->imp_initial_recov);
                RETURN(0);
        }

        if (KEY_IS("checksum")) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        if (!KEY_IS(KEY_MDS_CONN) && !KEY_IS("evict_by_nid"))
                RETURN(-EINVAL);


        /* Forward the key/value pair to the OST. */
        req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO,
                              2, size, bufs);
        if (req == NULL)
                RETURN(-ENOMEM);

        req->rq_replen = lustre_msg_size(0, NULL);
        rc = ptlrpc_queue_wait(req);
        ptlrpc_req_finished(req);

        /* Connect the MDS-OST originator llog context now that the OST
         * knows about us; only attempted when the RPC succeeded. */
        ctxt = llog_get_context(exp->exp_obd, LLOG_MDS_OST_ORIG_CTXT);
        if (ctxt) {
                if (rc == 0)
                        rc = llog_initiator_connect(ctxt);
                else
                        CERROR("cannot establish connection for ctxt %p: %d\n",
                               ctxt, rc);
        }

        imp->imp_server_timeout = 1;
        CDEBUG(D_HA, "pinging OST %s\n", imp->imp_target_uuid.uuid);
        imp->imp_pingable = 1;

        RETURN(rc);
}
3118
3119
/* Replicator-side ops for the size-changes (SIZE_REPL) log: the OSC only
 * ever cancels records that the originator has already committed, so a
 * lop_cancel method is all that is needed. */
static struct llog_operations osc_size_repl_logops = {
        lop_cancel: llog_obd_repl_cancel
};

/* Originator-side ops for the MDS->OST object log; populated from
 * llog_lvfs_ops (with origin-specific overrides) in osc_llog_init(). */
static struct llog_operations osc_mds_ost_orig_logops;
3125 static int osc_llog_init(struct obd_device *obd, struct obd_device *tgt,
3126                         int count, struct llog_catid *catid)
3127 {
3128         int rc;
3129         ENTRY;
3130
3131         osc_mds_ost_orig_logops = llog_lvfs_ops;
3132         osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3133         osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3134         osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3135         osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3136
3137         rc = llog_setup(obd, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3138                         &catid->lci_logid, &osc_mds_ost_orig_logops);
3139         if (rc)
3140                 RETURN(rc);
3141
3142         rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
3143                         &osc_size_repl_logops);
3144         RETURN(rc);
3145 }
3146
3147 static int osc_llog_finish(struct obd_device *obd, int count)
3148 {
3149         struct llog_ctxt *ctxt;
3150         int rc = 0, rc2 = 0;
3151         ENTRY;
3152
3153         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3154         if (ctxt)
3155                 rc = llog_cleanup(ctxt);
3156
3157         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3158         if (ctxt)
3159                 rc2 = llog_cleanup(ctxt);
3160         if (!rc)
3161                 rc = rc2;
3162
3163         RETURN(rc);
3164 }
3165
3166 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
3167                          struct obd_uuid *cluuid,
3168                          struct obd_connect_data *data)
3169 {
3170         struct client_obd *cli = &obd->u.cli;
3171
3172         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3173                 long lost_grant;
3174
3175                 spin_lock(&cli->cl_loi_list_lock);
3176                 data->ocd_grant = cli->cl_avail_grant ?:
3177                                 2 * cli->cl_max_pages_per_rpc << PAGE_SHIFT;
3178                 lost_grant = cli->cl_lost_grant;
3179                 cli->cl_lost_grant = 0;
3180                 spin_unlock(&cli->cl_loi_list_lock);
3181
3182                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3183                        "cl_lost_grant: %ld\n", data->ocd_grant,
3184                        cli->cl_avail_grant, lost_grant);
3185                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3186                        " ocd_grant: %d\n", data->ocd_connect_flags,
3187                        data->ocd_version, data->ocd_grant);
3188         }
3189
3190         RETURN(0);
3191 }
3192
3193 static int osc_disconnect(struct obd_export *exp)
3194 {
3195         struct obd_device *obd = class_exp2obd(exp);
3196         struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3197         int rc;
3198
3199         if (obd->u.cli.cl_conn_count == 1)
3200                 /* flush any remaining cancel messages out to the target */
3201                 llog_sync(ctxt, exp);
3202
3203         rc = client_disconnect_export(exp);
3204         return rc;
3205 }
3206
/* Dispatch import state-change events from the ptlrpc layer to the OSC.
 *
 * Adjusts OSC-local state (object-precreation flags, grant accounting,
 * the lock namespace) and forwards selected events to the obd observer
 * via obd_notify_observer().  An unrecognized event is a fatal bug.
 *
 * NOTE(review): uses RETURN() without a matching ENTRY — debug-trace
 * nesting may be off by one for this function; confirm intent. */
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        /* suspend object precreation until recovery ends */
                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
                        spin_unlock(&oscc->oscc_lock);
                }

                break;
        }
        case IMP_EVENT_INACTIVE: {
                /* just tell the observer; no local state to update */
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;

                /* Reset grants */
                cli = &obd->u.cli;
                spin_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                /* all pages go to failing rpcs due to the invalid import */
                /* osc_check_rpcs() is deliberately called with
                 * cl_loi_list_lock held here */
                osc_check_rpcs(cli);
                spin_unlock(&cli->cl_loi_list_lock);

                /* drop all locks; nothing on the invalid import is valid */
                ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);

                break;
        }
        case IMP_EVENT_ACTIVE: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        /* precreation may proceed again after recovery */
                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                        spin_unlock(&oscc->oscc_lock);
                }
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                /* connect data has been (re)negotiated with the server */
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;

                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}
3280
3281 int osc_setup(struct obd_device *obd, obd_count len, void *buf)
3282 {
3283         int rc;
3284
3285         rc = ptlrpcd_addref();
3286         if (rc)
3287                 return rc;
3288
3289         rc = client_obd_setup(obd, len, buf);
3290         if (rc) {
3291                 ptlrpcd_decref();
3292         } else {
3293                 struct lprocfs_static_vars lvars;
3294                 struct client_obd *cli = &obd->u.cli;
3295
3296                 lprocfs_init_vars(osc, &lvars);
3297                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3298                         lproc_osc_attach_seqstat(obd);
3299                         ptlrpc_lprocfs_register_obd(obd);
3300                 }
3301
3302                 oscc_init(obd);
3303                 /* We need to allocate a few requests more, because
3304                    brw_interpret_oap tries to create new requests before freeing
3305                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
3306                    reserved, but I afraid that might be too much wasted RAM
3307                    in fact, so 2 is just my guess and still should work. */
3308                 cli->cl_rq_pool = ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3309                                                       OST_MAXREQSIZE,
3310                                                       ptlrpc_add_rqs_to_pool);
3311         }
3312
3313         RETURN(rc);
3314 }
3315
3316 static int osc_precleanup(struct obd_device *obd, int stage)
3317 {
3318         int rc = 0;
3319         ENTRY;
3320
3321         switch (stage) {
3322         case OBD_CLEANUP_EARLY: {
3323                 struct obd_import *imp;
3324                 imp = obd->u.cli.cl_import;
3325                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3326                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3327                 ptlrpc_deactivate_import(imp);
3328                 break;
3329         }
3330         case OBD_CLEANUP_SELF_EXP:
3331                 rc = obd_llog_finish(obd, 0);
3332                 if (rc != 0)
3333                         CERROR("failed to cleanup llogging subsystems\n");
3334         }
3335         RETURN(rc);
3336 }
3337
3338 int osc_cleanup(struct obd_device *obd)
3339 {
3340         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3341         struct client_obd *cli = &obd->u.cli;
3342         int rc;
3343
3344         ptlrpc_lprocfs_unregister_obd(obd);
3345         lprocfs_obd_cleanup(obd);
3346
3347         spin_lock(&oscc->oscc_lock);
3348         oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3349         oscc->oscc_flags |= OSCC_FLAG_EXITING;
3350         spin_unlock(&oscc->oscc_lock);
3351
3352         /* free memory of osc quota cache */
3353         lquota_cleanup(quota_interface, obd);
3354
3355         rc = client_obd_cleanup(obd);
3356
3357         ptlrpc_free_rq_pool(cli->cl_rq_pool);
3358
3359         ptlrpcd_decref();
3360         RETURN(rc);
3361 }
3362
3363
/* Method table for the regular OSC obd type: local osc_* implementations
 * plus the generic client_* helpers for connection management. */
struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_packmd               = osc_packmd,
        .o_unpackmd             = osc_unpackmd,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        .o_brw                  = osc_brw,
        .o_brw_async            = osc_brw_async,
        .o_prep_async_page      = osc_prep_async_page,
        .o_queue_async_io       = osc_queue_async_io,
        .o_set_async_flags      = osc_set_async_flags,
        .o_queue_group_io       = osc_queue_group_io,
        .o_trigger_group_io     = osc_trigger_group_io,
        .o_teardown_async_page  = osc_teardown_async_page,
        .o_punch                = osc_punch,
        .o_sync                 = osc_sync,
        .o_enqueue              = osc_enqueue,
        .o_match                = osc_match,
        .o_change_cbdata        = osc_change_cbdata,
        .o_cancel               = osc_cancel,
        .o_cancel_unused        = osc_cancel_unused,
        .o_join_lru             = osc_join_lru,
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info             = osc_set_info,
        .o_import_event         = osc_import_event,
        .o_llog_init            = osc_llog_init,
        .o_llog_finish          = osc_llog_finish,
};
3406
#if defined(__KERNEL__) && (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
/* Method table for the SAN OSC variant (2.4 kernels only): shares most
 * handlers with osc_obd_ops but uses sanobd setup, sanosc_brw for I/O,
 * osc_real_create, and generic disconnect; no async page / llog-finish
 * differences beyond what is listed here. */
struct obd_ops sanosc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_cleanup              = client_obd_cleanup,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = client_disconnect_export,
        .o_statfs               = osc_statfs,
        .o_packmd               = osc_packmd,
        .o_unpackmd             = osc_unpackmd,
        .o_create               = osc_real_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setup                = client_sanobd_setup,
        .o_brw                  = sanosc_brw,
        .o_punch                = osc_punch,
        .o_sync                 = osc_sync,
        .o_enqueue              = osc_enqueue,
        .o_match                = osc_match,
        .o_change_cbdata        = osc_change_cbdata,
        .o_cancel               = osc_cancel,
        .o_cancel_unused        = osc_cancel_unused,
        .o_join_lru             = osc_join_lru,
        .o_iocontrol            = osc_iocontrol,
        .o_import_event         = osc_import_event,
        .o_llog_init            = osc_llog_init,
        .o_llog_finish          = osc_llog_finish,
};
#endif
3440
3441 static quota_interface_t *quota_interface;
3442 extern quota_interface_t osc_quota_interface;
3443
3444 int __init osc_init(void)
3445 {
3446         struct lprocfs_static_vars lvars;
3447 #if defined(__KERNEL__) && (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3448         struct lprocfs_static_vars sanlvars;
3449 #endif
3450         int rc;
3451         ENTRY;
3452
3453         lprocfs_init_vars(osc, &lvars);
3454 #if defined(__KERNEL__) && (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3455         lprocfs_init_vars(osc, &sanlvars);
3456 #endif
3457
3458         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
3459         lquota_init(quota_interface);
3460         init_obd_quota_ops(quota_interface, &osc_obd_ops);
3461
3462         rc = class_register_type(&osc_obd_ops, lvars.module_vars,
3463                                  LUSTRE_OSC_NAME);
3464         if (rc) {
3465                 if (quota_interface)
3466                         PORTAL_SYMBOL_PUT(osc_quota_interface);
3467                 RETURN(rc);
3468         }
3469
3470 #if defined(__KERNEL__) && (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3471         rc = class_register_type(&sanosc_obd_ops, sanlvars.module_vars,
3472                                  LUSTRE_SANOSC_NAME);
3473         if (rc) {
3474                 class_unregister_type(LUSTRE_OSC_NAME);
3475                 if (quota_interface)
3476                         PORTAL_SYMBOL_PUT(osc_quota_interface);
3477                 RETURN(rc);
3478         }
3479 #endif
3480
3481         RETURN(rc);
3482 }
3483
3484 #ifdef __KERNEL__
/* Module exit: release the quota interface and unregister the obd types
 * registered in osc_init().  Marked non-__exit (see commented attribute)
 * — presumably so it can also be called from an error path; confirm. */
static void /*__exit*/ osc_exit(void)
{
        lquota_exit(quota_interface);
        if (quota_interface)
                PORTAL_SYMBOL_PUT(osc_quota_interface);

/* NOTE(review): the __KERNEL__ test is redundant inside the enclosing
 * #ifdef __KERNEL__ block, but matches the guard used at registration */
#if defined(__KERNEL__) && (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
        class_unregister_type(LUSTRE_SANOSC_NAME);
#endif
        class_unregister_type(LUSTRE_OSC_NAME);
}
3496
3497 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
3498 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3499 MODULE_LICENSE("GPL");
3500
3501 module_init(osc_init);
3502 module_exit(osc_exit);
3503 #endif