Whamcloud - gitweb
LU-8900 snapshot: simulate readonly device
[fs/lustre-release.git] / lustre / osp / osp_sync.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/osp/osp_sync.c
33  *
34  * Lustre OST Proxy Device
35  *
36  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
37  * Author: Mikhail Pershin <mike.pershin@intel.com>
38  */
39
40 #define DEBUG_SUBSYSTEM S_MDS
41
42 #include <linux/kthread.h>
43 #include <lustre_log.h>
44 #include <lustre_update.h>
45 #include "osp_internal.h"
46
47 static int osp_sync_id_traction_init(struct osp_device *d);
48 static void osp_sync_id_traction_fini(struct osp_device *d);
49 static __u64 osp_sync_id_get(struct osp_device *d, __u64 id);
50 static void osp_sync_remove_from_tracker(struct osp_device *d);
51
52 /*
 * this is a component of OSP implementing synchronization between MDS and OST
 * it llogs all interesting changes (currently it's uid/gid change and object
 * destroy) atomically, then makes sure changes hit OST storage
56  *
57  * we have 4 queues of work:
58  *
59  * the first queue is llog itself, once read a change is stored in 2nd queue
60  * in form of RPC (but RPC isn't fired yet).
61  *
62  * the second queue (opd_syn_waiting_for_commit) holds changes awaiting local
63  * commit. once change is committed locally it migrates onto 3rd queue.
64  *
65  * the third queue (opd_syn_committed_here) holds changes committed locally,
66  * but not sent to OST (as the pipe can be full). once pipe becomes non-full
67  * we take a change from the queue and fire corresponded RPC.
68  *
69  * once RPC is reported committed by OST (using regular last_committed mech.)
70  * the change jumps into 4th queue (opd_syn_committed_there), now we can
71  * cancel corresponded llog record and release RPC
72  *
73  * opd_syn_changes is a number of unread llog records (to be processed).
74  * notice this number doesn't include llog records from previous boots.
75  * with OSP_SYN_THRESHOLD we try to batch processing a bit (TO BE IMPLEMENTED)
76  *
77  * opd_syn_rpc_in_progress is a number of requests in 2-4 queues.
78  * we control this with OSP_MAX_IN_PROGRESS so that OSP don't consume
79  * too much memory -- how to deal with 1000th OSTs ? batching could help?
80  *
81  * opd_syn_rpc_in_flight is a number of RPC in flight.
82  * we control this with OSP_MAX_IN_FLIGHT
83  */
84
85 /* XXX: do math to learn reasonable threshold
86  * should it be ~ number of changes fitting bulk? */
87
88 #define OSP_SYN_THRESHOLD       10
89 #define OSP_MAX_IN_FLIGHT       8
90 #define OSP_MAX_IN_PROGRESS     4096
91
92 #define OSP_JOB_MAGIC           0x26112005
93
/* Per-request state kept in ptlrpc_request::rq_async_args. */
struct osp_job_req_args {
	/** bytes reserved for ptlrpc_replay_req() */
	struct ptlrpc_replay_async_args jra_raa;
	/* linkage on opd_syn_committed_there once target commit is seen */
	struct list_head                jra_committed_link;
	/* linkage on opd_syn_inflight_list while the RPC is in flight */
	struct list_head                jra_inflight_link;
	/* sanity marker, must be OSP_JOB_MAGIC */
	__u32                           jra_magic;
};
101
102 static inline int osp_sync_running(struct osp_device *d)
103 {
104         return !!(d->opd_syn_thread.t_flags & SVC_RUNNING);
105 }
106
107 /**
108  * Check status: whether OSP thread has stopped
109  *
110  * \param[in] d         OSP device
111  *
112  * \retval 0            still running
113  * \retval 1            stopped
114  */
115 static inline int osp_sync_stopped(struct osp_device *d)
116 {
117         return !!(d->opd_syn_thread.t_flags & SVC_STOPPED);
118 }
119
120 /*
121  ** Check for new changes to sync
122  *
123  * \param[in] d         OSP device
124  *
125  * \retval 1            there are changes
126  * \retval 0            there are no changes
127  */
128 static inline int osp_sync_has_new_job(struct osp_device *d)
129 {
130         return ((d->opd_syn_last_processed_id < d->opd_syn_last_used_id) &&
131                 (d->opd_syn_last_processed_id < d->opd_syn_last_committed_id))
132                 || (d->opd_syn_prev_done == 0);
133 }
134
/**
 * Check whether a not-yet-sent llog record targets the same object as
 * some RPC already in flight.  The caller (osp_sync_can_process_new())
 * defers the record while a conflict exists, so changes to a single
 * object are not reordered on the wire.
 *
 * \param[in] d		OSP device
 * \param[in] h		llog record to check, may be NULL
 *
 * \retval 1		an in-flight RPC addresses the same object
 * \retval 0		no conflict
 */
static inline int osp_sync_inflight_conflict(struct osp_device *d,
					     struct llog_rec_hdr *h)
{
	struct osp_job_req_args *jra;
	struct ost_id            ostid;
	int                      conflict = 0;

	/* LLOG_GEN_REC carries no object id, so it cannot conflict */
	if (h == NULL || h->lrh_type == LLOG_GEN_REC ||
	    list_empty(&d->opd_syn_inflight_list))
		return conflict;

	/* extract the target object id from the type-specific record body */
	memset(&ostid, 0, sizeof(ostid));
	switch (h->lrh_type) {
	case MDS_UNLINK_REC:
		ostid_set_seq(&ostid, ((struct llog_unlink_rec *)h)->lur_oseq);
		ostid_set_id(&ostid, ((struct llog_unlink_rec *)h)->lur_oid);
		break;
	case MDS_UNLINK64_REC:
		fid_to_ostid(&((struct llog_unlink64_rec *)h)->lur_fid, &ostid);
		break;
	case MDS_SETATTR64_REC:
		ostid = ((struct llog_setattr64_rec *)h)->lsr_oi;
		break;
	default:
		LBUG();
	}

	/* opd_syn_lock protects opd_syn_inflight_list */
	spin_lock(&d->opd_syn_lock);
	list_for_each_entry(jra, &d->opd_syn_inflight_list, jra_inflight_link) {
		struct ptlrpc_request   *req;
		struct ost_body         *body;

		LASSERT(jra->jra_magic == OSP_JOB_MAGIC);

		/* jra is embedded in rq_async_args of the owning request */
		req = container_of((void *)jra, struct ptlrpc_request,
				   rq_async_args);
		body = req_capsule_client_get(&req->rq_pill,
					      &RMF_OST_BODY);
		LASSERT(body);

		if (memcmp(&ostid, &body->oa.o_oi, sizeof(ostid)) == 0) {
			conflict = 1;
			break;
		}
	}
	spin_unlock(&d->opd_syn_lock);

	return conflict;
}
184
185 static inline int osp_sync_low_in_progress(struct osp_device *d)
186 {
187         return atomic_read(&d->opd_syn_rpc_in_progress) <
188                 d->opd_syn_max_rpc_in_progress;
189 }
190
191 /**
192  * Check for room in the network pipe to OST
193  *
194  * \param[in] d         OSP device
195  *
196  * \retval 1            there is room
197  * \retval 0            no room, the pipe is full
198  */
199 static inline int osp_sync_low_in_flight(struct osp_device *d)
200 {
201         return atomic_read(&d->opd_syn_rpc_in_flight) <
202                 d->opd_syn_max_rpc_in_flight;
203 }
204
205 /**
206  * Wake up check for the main sync thread
207  *
208  * \param[in] d         OSP device
209  *
210  * \retval 1            time to wake up
211  * \retval 0            no need to wake up
212  */
213 static inline int osp_sync_has_work(struct osp_device *d)
214 {
215         /* has new/old changes and low in-progress? */
216         if (osp_sync_has_new_job(d) && osp_sync_low_in_progress(d) &&
217             osp_sync_low_in_flight(d) && d->opd_imp_connected)
218                 return 1;
219
220         /* has remotely committed? */
221         if (!list_empty(&d->opd_syn_committed_there))
222                 return 1;
223
224         return 0;
225 }
226
/* Wake the sync thread if there is work for it.  Wrapped in
 * do { } while (0) so the macro expands to a single statement and is
 * safe in unbraced if/else bodies (the bare-{ } form left a stray ';'
 * after expansion, which breaks if/else nesting). */
#define osp_sync_check_for_work(d)                      \
do {                                                    \
	if (osp_sync_has_work(d))                       \
		wake_up(&d->opd_syn_waitq);             \
} while (0)
233
/* Out-of-line wrapper so code outside this file can trigger the
 * wake-up check without seeing the macro/static helpers. */
void __osp_sync_check_for_work(struct osp_device *d)
{
	osp_sync_check_for_work(d);
}
238
/**
 * Reconstruct the full 64-bit sync id for a llog record from its
 * 32-bit lrh_id, using the last locally-committed id as the reference.
 *
 * \param[in] d		OSP device
 * \param[in] rec	llog record whose id should be expanded
 *
 * \retval		64-bit id of the record
 */
static inline __u64 osp_sync_correct_id(struct osp_device *d,
					struct llog_rec_hdr *rec)
{
	/*
	 * llog use cyclic store with 32 bit lrh_id
	 * so overflow lrh_id is possible. Range between
	 * last_processed and last_committed is less than
	 * 64745 ^ 2 and less than 2^32 - 1
	 */
	__u64 correct_id = d->opd_syn_last_committed_id;

	/* record id is ahead of the committed id's low word, so the
	 * record belongs to the previous 2^32 epoch */
	if ((correct_id & 0xffffffffULL) < rec->lrh_id)
		correct_id -= 0x100000000ULL;

	/* splice the record's 32-bit id onto the epoch's high word */
	correct_id &= ~0xffffffffULL;
	correct_id |= rec->lrh_id;

	return correct_id;
}
/**
 * Check and return ready-for-new status.
 *
 * The thread processing llog record uses this function to check whether
 * it's time to take another record and process it. The number of conditions
 * must be met: the connection should be ready, RPCs in flight not exceeding
 * the limit, the record is committed locally, etc (see the lines below).
 *
 * \param[in] d		OSP device
 * \param[in] rec	next llog record to process
 *
 * \retval 0		not ready
 * \retval 1		ready
 */
static inline int osp_sync_can_process_new(struct osp_device *d,
					   struct llog_rec_hdr *rec)
{
	LASSERT(d);

	/* a sync barrier is in progress - hold off new work */
	if (unlikely(atomic_read(&d->opd_syn_barrier) > 0))
		return 0;
	/* don't race two RPCs against the same object */
	if (unlikely(osp_sync_inflight_conflict(d, rec)))
		return 0;
	if (!osp_sync_low_in_progress(d))
		return 0;
	if (!osp_sync_low_in_flight(d))
		return 0;
	if (!d->opd_imp_connected)
		return 0;
	/* backlog from a previous boot is always processable */
	if (d->opd_syn_prev_done == 0)
		return 1;
	if (atomic_read(&d->opd_syn_changes) == 0)
		return 0;
	/* a record may only be sent once it is committed locally */
	if (rec == NULL ||
	    osp_sync_correct_id(d, rec) <= d->opd_syn_last_committed_id)
		return 1;
	return 0;
}
296
/**
 * Declare intention to add a new change.
 *
 * With regard to OSD API, we have to declare any changes ahead. In this
 * case we declare an intention to add a llog record representing the
 * change on the local storage.
 *
 * \param[in] env	LU environment provided by the caller
 * \param[in] o		OSP object
 * \param[in] type	type of change: MDS_UNLINK64_REC or MDS_SETATTR64_REC
 * \param[in] th	transaction handle (local)
 *
 * \retval 0		on success
 * \retval negative	negated errno on error
 */
int osp_sync_declare_add(const struct lu_env *env, struct osp_object *o,
			 llog_op_type type, struct thandle *th)
{
	struct osp_thread_info	*osi = osp_env_info(env);
	struct osp_device	*d = lu2osp_dev(o->opo_obj.do_lu.lo_dev);
	struct llog_ctxt	*ctxt;
	struct thandle		*storage_th;
	int			 rc;

	ENTRY;

	/* it's a layering violation, to access internals of th,
	 * but we can do this as a sanity check, for a while */
	LASSERT(th->th_top != NULL);
	storage_th = thandle_get_sub_by_dt(env, th->th_top, d->opd_storage);
	if (IS_ERR(storage_th))
		RETURN(PTR_ERR(storage_th));

	/* only the record length depends on the change type here */
	switch (type) {
	case MDS_UNLINK64_REC:
		osi->osi_hdr.lrh_len = sizeof(struct llog_unlink64_rec);
		break;
	case MDS_SETATTR64_REC:
		osi->osi_hdr.lrh_len = sizeof(struct llog_setattr64_rec);
		break;
	default:
		LBUG();
	}

	/* we want ->dt_trans_start() to allocate per-thandle structure */
	storage_th->th_tags |= LCT_OSP_THREAD;

	ctxt = llog_get_context(d->opd_obd, LLOG_MDS_OST_ORIG_CTXT);
	LASSERT(ctxt);

	rc = llog_declare_add(env, ctxt->loc_handle, &osi->osi_hdr,
			      storage_th);
	llog_ctxt_put(ctxt);

	RETURN(rc);
}
353
/**
 * Generate a llog record for a given change.
 *
 * Generates a llog record for the change passed. The change can be of two
 * types: unlink and setattr. The record gets an ID which later will be
 * used to track commit status of the change. For unlink changes, the caller
 * can supply a starting FID and the count of the objects to destroy. For
 * setattr the caller should supply the attributes to apply.
 *
 * \param[in] env	LU environment provided by the caller
 * \param[in] d		OSP device
 * \param[in] fid	fid of the object the change should be applied to
 * \param[in] type	type of change: MDS_UNLINK64_REC or MDS_SETATTR64_REC
 * \param[in] count	count of objects to destroy
 * \param[in] th	transaction handle (local)
 * \param[in] attr	attributes for setattr
 *
 * \retval 0		on success
 * \retval negative	negated errno on error
 */
static int osp_sync_add_rec(const struct lu_env *env, struct osp_device *d,
			    const struct lu_fid *fid, llog_op_type type,
			    int count, struct thandle *th,
			    const struct lu_attr *attr)
{
	struct osp_thread_info	*osi = osp_env_info(env);
	struct llog_ctxt	*ctxt;
	struct osp_txn_info	*txn;
	struct thandle		*storage_th;
	int			 rc;

	ENTRY;

	/* it's a layering violation, to access internals of th,
	 * but we can do this as a sanity check, for a while */
	LASSERT(th->th_top != NULL);
	storage_th = thandle_get_sub_by_dt(env, th->th_top, d->opd_storage);
	if (IS_ERR(storage_th))
		RETURN(PTR_ERR(storage_th));

	/* fill the record header and the type-specific payload */
	switch (type) {
	case MDS_UNLINK64_REC:
		osi->osi_hdr.lrh_len = sizeof(osi->osi_unlink);
		osi->osi_hdr.lrh_type = MDS_UNLINK64_REC;
		osi->osi_unlink.lur_fid  = *fid;
		osi->osi_unlink.lur_count = count;
		break;
	case MDS_SETATTR64_REC:
		rc = fid_to_ostid(fid, &osi->osi_oi);
		LASSERT(rc == 0);
		osi->osi_hdr.lrh_len = sizeof(osi->osi_setattr);
		osi->osi_hdr.lrh_type = MDS_SETATTR64_REC;
		osi->osi_setattr.lsr_oi  = osi->osi_oi;
		LASSERT(attr);
		osi->osi_setattr.lsr_uid = attr->la_uid;
		osi->osi_setattr.lsr_gid = attr->la_gid;
		/* only uid/gid are recorded; map LA_* to OBD_MD_* flags */
		osi->osi_setattr.lsr_valid =
			((attr->la_valid & LA_UID) ? OBD_MD_FLUID : 0) |
			((attr->la_valid & LA_GID) ? OBD_MD_FLGID : 0);
		break;
	default:
		LBUG();
	}

	txn = osp_txn_info(&storage_th->th_ctx);
	LASSERT(txn);

	/* llog stores only the low 32 bits of the 64-bit sync id */
	txn->oti_current_id = osp_sync_id_get(d, txn->oti_current_id);
	osi->osi_hdr.lrh_id = (txn->oti_current_id & 0xffffffffULL);
	ctxt = llog_get_context(d->opd_obd, LLOG_MDS_OST_ORIG_CTXT);
	if (ctxt == NULL)
		RETURN(-ENOMEM);

	rc = llog_add(env, ctxt->loc_handle, &osi->osi_hdr, &osi->osi_cookie,
		      storage_th);
	llog_ctxt_put(ctxt);

	if (likely(rc >= 0)) {
		CDEBUG(D_OTHER, "%s: new record "DOSTID":%lu/%lu: %d\n",
		       d->opd_obd->obd_name,
		       POSTID(&osi->osi_cookie.lgc_lgl.lgl_oi),
		       (unsigned long)osi->osi_cookie.lgc_lgl.lgl_ogen,
		       (unsigned long)osi->osi_cookie.lgc_index, rc);
		atomic_inc(&d->opd_syn_changes);
	}
	/* return 0 always here, error case just cause no llog record */
	RETURN(0);
}
443
444 int osp_sync_add(const struct lu_env *env, struct osp_object *o,
445                  llog_op_type type, struct thandle *th,
446                  const struct lu_attr *attr)
447 {
448         return osp_sync_add_rec(env, lu2osp_dev(o->opo_obj.do_lu.lo_dev),
449                                 lu_object_fid(&o->opo_obj.do_lu), type, 1,
450                                 th, attr);
451 }
452
/* Log 'lost' objects starting at fid as one unlink record — presumably
 * to clean up a gap in object allocation; confirm against callers. */
int osp_sync_gap(const struct lu_env *env, struct osp_device *d,
			struct lu_fid *fid, int lost, struct thandle *th)
{
	return osp_sync_add_rec(env, d, fid, MDS_UNLINK64_REC, lost, th, NULL);
}
458
459 /*
460  * it's quite obvious we can't maintain all the structures in the memory:
461  * while OST is down, MDS can be processing thousands and thousands of unlinks
 * filling persistent llogs and in-core representation
463  *
464  * this doesn't scale at all. so we need basically the following:
465  * a) destroy/setattr append llog records
466  * b) once llog has grown to X records, we process first Y committed records
467  *
468  *  once record R is found via llog_process(), it becomes committed after any
469  *  subsequent commit callback (at the most)
470  */
471
/**
 * ptlrpc commit callback.
 *
 * The callback is called by PTLRPC when a RPC is reported committed by the
 * target (OST). We register the callback for the every RPC applying a change
 * from the llog. This way we know then the llog records can be cancelled.
 * Notice the callback can be called when OSP is finishing. We can detect this
 * checking that actual transno in the request is less or equal of known
 * committed transno (see osp_sync_process_committed() for the details).
 * XXX: this is pretty expensive and can be improved later using batching.
 *
 * \param[in] req	request
 */
static void osp_sync_request_commit_cb(struct ptlrpc_request *req)
{
	struct osp_device *d = req->rq_cb_data;
	struct osp_job_req_args *jra;

	CDEBUG(D_HA, "commit req %p, transno %llu\n", req, req->rq_transno);

	/* transno 0 means the target committed nothing for this request */
	if (unlikely(req->rq_transno == 0))
		return;

	/* do not do any opd_dyn_rpc_* accounting here
	 * it's done in osp_sync_interpret sooner or later */
	LASSERT(d);

	jra = ptlrpc_req_async_args(req);
	LASSERT(jra->jra_magic == OSP_JOB_MAGIC);
	LASSERT(list_empty(&jra->jra_committed_link));

	/* keep the request alive while it sits on the committed list;
	 * the sync thread drops this reference after cancelling the
	 * llog record (see osp_sync_process_committed()) */
	ptlrpc_request_addref(req);

	spin_lock(&d->opd_syn_lock);
	list_add(&jra->jra_committed_link, &d->opd_syn_committed_there);
	spin_unlock(&d->opd_syn_lock);

	/* XXX: some batching wouldn't hurt */
	wake_up(&d->opd_syn_waitq);
}
512
/**
 * RPC interpretation callback.
 *
 * The callback is called by ptlrpc when RPC is replied. Now we have to decide
 * whether we should:
 *  - put request on a special list to wait until it's committed by the target,
 *    if the request is successful
 *  - schedule llog record cancel if no target object is found
 *  - try later (essentially after reboot) in case of unexpected error
 *
 * \param[in] env	LU environment provided by the caller
 * \param[in] req	request replied
 * \param[in] aa	callback data
 * \param[in] rc	result of RPC
 *
 * \retval 0		always
 */
static int osp_sync_interpret(const struct lu_env *env,
			      struct ptlrpc_request *req, void *aa, int rc)
{
	struct osp_device *d = req->rq_cb_data;
	struct osp_job_req_args *jra = aa;

	if (jra->jra_magic != OSP_JOB_MAGIC) {
		DEBUG_REQ(D_ERROR, req, "bad magic %u\n", jra->jra_magic);
		LBUG();
	}
	LASSERT(d);

	CDEBUG(D_HA, "reply req %p/%d, rc %d, transno %u\n", req,
	       atomic_read(&req->rq_refcount),
	       rc, (unsigned) req->rq_transno);
	LASSERT(rc || req->rq_transno);

	if (rc == -ENOENT) {
		/*
		 * we tried to destroy object or update attributes,
		 * but object doesn't exist anymore - cancel llog record
		 */
		LASSERT(req->rq_transno == 0);
		LASSERT(list_empty(&jra->jra_committed_link));

		/* reference dropped once the llog record is cancelled */
		ptlrpc_request_addref(req);

		spin_lock(&d->opd_syn_lock);
		list_add(&jra->jra_committed_link, &d->opd_syn_committed_there);
		spin_unlock(&d->opd_syn_lock);

		wake_up(&d->opd_syn_waitq);
	} else if (rc) {
		struct obd_import *imp = req->rq_import;
		/*
		 * error happened, we'll try to repeat on next boot ?
		 */
		LASSERTF(req->rq_transno == 0 ||
			 req->rq_import_generation < imp->imp_generation,
			 "transno %llu, rc %d, gen: req %d, imp %d\n",
			 req->rq_transno, rc, req->rq_import_generation,
			 imp->imp_generation);
		if (req->rq_transno == 0) {
			/* this is the last time we see the request
			 * if transno is not zero, then commit cb
			 * will be called at some point */
			LASSERT(atomic_read(&d->opd_syn_rpc_in_progress) > 0);
			atomic_dec(&d->opd_syn_rpc_in_progress);
		}

		wake_up(&d->opd_syn_waitq);
	} else if (d->opd_pre != NULL &&
		   unlikely(d->opd_pre_status == -ENOSPC)) {
		/*
		 * if current status is -ENOSPC (lack of free space on OST)
		 * then we should poll OST immediately once object destroy
		 * is replied
		 */
		osp_statfs_need_now(d);
	}

	/* in every case the RPC is no longer in flight */
	spin_lock(&d->opd_syn_lock);
	list_del_init(&jra->jra_inflight_link);
	spin_unlock(&d->opd_syn_lock);
	LASSERT(atomic_read(&d->opd_syn_rpc_in_flight) > 0);
	atomic_dec(&d->opd_syn_rpc_in_flight);
	/* let a barrier waiter re-check the in-flight counter */
	if (unlikely(atomic_read(&d->opd_syn_barrier) > 0))
		wake_up(&d->opd_syn_barrier_waitq);
	CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n",
	       d->opd_obd->obd_name, atomic_read(&d->opd_syn_rpc_in_flight),
	       atomic_read(&d->opd_syn_rpc_in_progress));

	osp_sync_check_for_work(d);

	return 0;
}
606
/**
 * Add request to ptlrpc queue.
 *
 * This is just a tiny helper function to put the request on the sending list
 *
 * \param[in] d		OSP device
 * \param[in] req	request
 */
static void osp_sync_send_new_rpc(struct osp_device *d,
				  struct ptlrpc_request *req)
{
	struct osp_job_req_args *jra;

	LASSERT(atomic_read(&d->opd_syn_rpc_in_flight) <=
		d->opd_syn_max_rpc_in_flight);

	jra = ptlrpc_req_async_args(req);
	jra->jra_magic = OSP_JOB_MAGIC;
	INIT_LIST_HEAD(&jra->jra_committed_link);
	/* jra_inflight_link needs no INIT_LIST_HEAD: list_add_tail()
	 * below fully initializes its pointers */
	spin_lock(&d->opd_syn_lock);
	list_add_tail(&jra->jra_inflight_link, &d->opd_syn_inflight_list);
	spin_unlock(&d->opd_syn_lock);

	ptlrpcd_add_req(req);
}
632
633
634 /**
635  * Allocate and prepare RPC for a new change.
636  *
637  * The function allocates and initializes an RPC which will be sent soon to
638  * apply the change to the target OST. The request is initialized from the
639  * llog record passed. Notice only the fields common to all type of changes
640  * are initialized.
641  *
642  * \param[in] d         OSP device
643  * \param[in] llh       llog handle where the record is stored
644  * \param[in] h         llog record
645  * \param[in] op        type of the change
646  * \param[in] format    request format to be used
647  *
648  * \retval pointer              new request on success
649  * \retval ERR_PTR(errno)       on error
650  */
651 static struct ptlrpc_request *osp_sync_new_job(struct osp_device *d,
652                                                struct llog_handle *llh,
653                                                struct llog_rec_hdr *h,
654                                                ost_cmd_t op,
655                                                const struct req_format *format)
656 {
657         struct ptlrpc_request   *req;
658         struct ost_body         *body;
659         struct obd_import       *imp;
660         int                      rc;
661
662         /* Prepare the request */
663         imp = d->opd_obd->u.cli.cl_import;
664         LASSERT(imp);
665
666         if (OBD_FAIL_CHECK(OBD_FAIL_OSP_CHECK_ENOMEM))
667                 RETURN(ERR_PTR(-ENOMEM));
668
669         req = ptlrpc_request_alloc(imp, format);
670         if (req == NULL)
671                 RETURN(ERR_PTR(-ENOMEM));
672
673         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, op);
674         if (rc) {
675                 ptlrpc_req_finished(req);
676                 return ERR_PTR(rc);
677         }
678
679         /*
680          * this is a trick: to save on memory allocations we put cookie
681          * into the request, but don't set corresponded flag in o_valid
682          * so that OST doesn't interpret this cookie. once the request
683          * is committed on OST we take cookie from the request and cancel
684          */
685         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
686         LASSERT(body);
687         body->oa.o_lcookie.lgc_lgl = llh->lgh_id;
688         body->oa.o_lcookie.lgc_subsys = LLOG_MDS_OST_ORIG_CTXT;
689         body->oa.o_lcookie.lgc_index = h->lrh_index;
690
691         req->rq_interpret_reply = osp_sync_interpret;
692         req->rq_commit_cb = osp_sync_request_commit_cb;
693         req->rq_cb_data = d;
694
695         ptlrpc_request_set_replen(req);
696
697         return req;
698 }
699
/**
 * Generate a request for setattr change.
 *
 * The function prepares a new RPC, initializes it with setattr specific
 * bits and send the RPC.
 *
 * \param[in] d		OSP device
 * \param[in] llh	llog handle where the record is stored
 * \param[in] h		llog record
 *
 * \retval 0		on success
 * \retval 1		on invalid record
 * \retval negative	negated errno on error
 */
static int osp_sync_new_setattr_job(struct osp_device *d,
				    struct llog_handle *llh,
				    struct llog_rec_hdr *h)
{
	struct llog_setattr64_rec	*rec = (struct llog_setattr64_rec *)h;
	struct ptlrpc_request		*req;
	struct ost_body			*body;

	ENTRY;
	LASSERT(h->lrh_type == MDS_SETATTR64_REC);

	if (OBD_FAIL_CHECK(OBD_FAIL_OSP_CHECK_INVALID_REC))
		RETURN(1);
	/* lsr_valid can only be 0 or have OBD_MD_{FLUID,FLGID} set,
	 * so no bits other than these should be set. */
	if ((rec->lsr_valid & ~(OBD_MD_FLUID | OBD_MD_FLGID)) != 0) {
		CERROR("%s: invalid setattr record, lsr_valid:%llu\n",
		       d->opd_obd->obd_name, rec->lsr_valid);
		/* return 1 on invalid record */
		RETURN(1);
	}

	req = osp_sync_new_job(d, llh, h, OST_SETATTR, &RQF_OST_SETATTR);
	if (IS_ERR(req))
		RETURN(PTR_ERR(req));

	/* fill in the object id and the uid/gid to apply */
	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	body->oa.o_oi = rec->lsr_oi;
	body->oa.o_uid = rec->lsr_uid;
	body->oa.o_gid = rec->lsr_gid;
	body->oa.o_valid = OBD_MD_FLGROUP | OBD_MD_FLID;
	/* old setattr record (prior 2.6.0) doesn't have 'valid' stored,
	 * we assume that both UID and GID are valid in that case. */
	if (rec->lsr_valid == 0)
		body->oa.o_valid |= (OBD_MD_FLUID | OBD_MD_FLGID);
	else
		body->oa.o_valid |= rec->lsr_valid;

	osp_sync_send_new_rpc(d, req);
	RETURN(0);
}
756
/**
 * Generate a request for unlink change.
 *
 * The function prepares a new RPC, initializes it with unlink(destroy)
 * specific bits and sends the RPC. The function is used to handle
 * llog_unlink_rec which were used in the older versions of Lustre.
 * Current version uses llog_unlink_rec64.
 *
 * \param[in] d		OSP device
 * \param[in] llh	llog handle where the record is stored
 * \param[in] h		llog record
 *
 * \retval 0		on success
 * \retval negative	negated errno on error
 */
static int osp_sync_new_unlink_job(struct osp_device *d,
				   struct llog_handle *llh,
				   struct llog_rec_hdr *h)
{
	struct llog_unlink_rec	*rec = (struct llog_unlink_rec *)h;
	struct ptlrpc_request	*req;
	struct ost_body		*body;

	ENTRY;
	LASSERT(h->lrh_type == MDS_UNLINK_REC);

	req = osp_sync_new_job(d, llh, h, OST_DESTROY, &RQF_OST_DESTROY);
	if (IS_ERR(req))
		RETURN(PTR_ERR(req));

	/* old records carry seq/oid separately rather than a FID */
	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	ostid_set_seq(&body->oa.o_oi, rec->lur_oseq);
	ostid_set_id(&body->oa.o_oi, rec->lur_oid);
	body->oa.o_misc = rec->lur_count;
	body->oa.o_valid = OBD_MD_FLGROUP | OBD_MD_FLID;
	if (rec->lur_count)
		body->oa.o_valid |= OBD_MD_FLOBJCOUNT;

	osp_sync_send_new_rpc(d, req);
	RETURN(0);
}
799
800 /**
801  * Generate a request for unlink change.
802  *
803  * The function prepares a new RPC, initializes it with unlink(destroy)
804  * specific bits and sends the RPC. Depending on the target (MDT or OST)
805  * two different protocols are used. For MDT we use OUT (basically OSD API
806  * updates transferred via a network). For OST we still use the old
807  * protocol (OBD?), originally for compatibility. Later we can start to
808  * use OUT for OST as well, this will allow batching and better code
809  * unification.
810  *
811  * \param[in] d         OSP device
812  * \param[in] llh       llog handle where the record is stored
813  * \param[in] h         llog record
814  *
815  * \retval 0            on success
816  * \retval negative     negated errno on error
817  */
818 static int osp_sync_new_unlink64_job(struct osp_device *d,
819                                      struct llog_handle *llh,
820                                      struct llog_rec_hdr *h)
821 {
822         struct llog_unlink64_rec        *rec = (struct llog_unlink64_rec *)h;
823         struct ptlrpc_request           *req = NULL;
824         struct ost_body                 *body;
825         int                              rc;
826
827         ENTRY;
828         LASSERT(h->lrh_type == MDS_UNLINK64_REC);
829         req = osp_sync_new_job(d, llh, h, OST_DESTROY,
830                                &RQF_OST_DESTROY);
831         if (IS_ERR(req))
832                 RETURN(PTR_ERR(req));
833
834         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
835         if (body == NULL)
836                 RETURN(-EFAULT);
837         rc = fid_to_ostid(&rec->lur_fid, &body->oa.o_oi);
838         if (rc < 0)
839                 RETURN(rc);
840         body->oa.o_misc = rec->lur_count;
841         body->oa.o_valid = OBD_MD_FLGROUP | OBD_MD_FLID |
842                            OBD_MD_FLOBJCOUNT;
843         osp_sync_send_new_rpc(d, req);
844         RETURN(0);
845 }
846
/**
 * Process llog records.
 *
 * This function is called to process the llog records committed locally.
 * In the recovery model used by OSP we can apply a change to a remote
 * target once corresponding transaction (like posix unlink) is committed
 * locally so can't revert.
 * Depending on the llog record type, a given handler is called that is
 * responsible for preparing and sending the RPC to apply the change.
 * Special record type LLOG_GEN_REC marking a reboot is cancelled right away.
 *
 * \param[in] env	LU environment provided by the caller
 * \param[in] d		OSP device
 * \param[in] llh	llog handle where the record is stored
 * \param[in] rec	llog record
 */
static void osp_sync_process_record(const struct lu_env *env,
				    struct osp_device *d,
				    struct llog_handle *llh,
				    struct llog_rec_hdr *rec)
{
	struct llog_handle	*cathandle = llh->u.phd.phd_cat_handle;
	struct llog_cookie	 cookie;
	int			 rc = 0;

	ENTRY;

	/* build a cookie identifying this record within the catalog so
	 * it can be cancelled below (generation or invalid records) */
	cookie.lgc_lgl = llh->lgh_id;
	cookie.lgc_subsys = LLOG_MDS_OST_ORIG_CTXT;
	cookie.lgc_index = rec->lrh_index;

	if (unlikely(rec->lrh_type == LLOG_GEN_REC)) {
		struct llog_gen_rec *gen = (struct llog_gen_rec *)rec;

		/* we're waiting for the record generated by this instance */
		LASSERT(d->opd_syn_prev_done == 0);
		if (!memcmp(&d->opd_syn_generation, &gen->lgr_gen,
			    sizeof(gen->lgr_gen))) {
			/* this is the mark written at this boot (see
			 * osp_sync_llog_init()): everything before it
			 * is from previous boots */
			CDEBUG(D_HA, "processed all old entries\n");
			d->opd_syn_prev_done = 1;
		}

		/* cancel any generation record */
		rc = llog_cat_cancel_records(env, cathandle, 1, &cookie);

		RETURN_EXIT;
	}

	/*
	 * now we prepare and fill requests to OST, put them on the queue
	 * and fire after next commit callback
	 */

	/* notice we increment counters before sending RPC, to be consistent
	 * in RPC interpret callback which may happen very quickly */
	atomic_inc(&d->opd_syn_rpc_in_flight);
	atomic_inc(&d->opd_syn_rpc_in_progress);

	switch (rec->lrh_type) {
	/* case MDS_UNLINK_REC is kept for compatibility */
	case MDS_UNLINK_REC:
		rc = osp_sync_new_unlink_job(d, llh, rec);
		break;
	case MDS_UNLINK64_REC:
		rc = osp_sync_new_unlink64_job(d, llh, rec);
		break;
	case MDS_SETATTR64_REC:
		rc = osp_sync_new_setattr_job(d, llh, rec);
		break;
	default:
		CERROR("%s: unknown record type: %x\n", d->opd_obd->obd_name,
		       rec->lrh_type);
		/* treat "unknown record type" as "invalid" */
		rc = 1;
		break;
	}

	/* For all kinds of records, no matter successful or not,
	 * we should decrease changes and bump last_processed_id.
	 */
	if (d->opd_syn_prev_done) {
		__u64 correct_id = osp_sync_correct_id(d, rec);
		LASSERT(atomic_read(&d->opd_syn_changes) > 0);
		LASSERT(correct_id <= d->opd_syn_last_committed_id);
		/* NOTE: it's possible to meet same id if
		 * OST stores few stripes of same file
		 */
		while (1) {
			/* another thread may be trying to set new value */
			rmb();
			if (correct_id > d->opd_syn_last_processed_id) {
				/* advance the high-water mark and wake
				 * waiters blocked on the sync barrier */
				d->opd_syn_last_processed_id = correct_id;
				wake_up(&d->opd_syn_barrier_waitq);
			} else
				break;
		}
		atomic_dec(&d->opd_syn_changes);
	}
	if (rc != 0) {
		/* no RPC was queued: undo the increments done above */
		atomic_dec(&d->opd_syn_rpc_in_flight);
		atomic_dec(&d->opd_syn_rpc_in_progress);
	}

	CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n",
	       d->opd_obd->obd_name, atomic_read(&d->opd_syn_rpc_in_flight),
	       atomic_read(&d->opd_syn_rpc_in_progress));

	/* Delete the invalid record */
	if (rc == 1) {
		rc = llog_cat_cancel_records(env, cathandle, 1, &cookie);
		if (rc != 0)
			CERROR("%s: can't delete invalid record: "
			       "fid = "DFID", rec_id = %u, rc = %d\n",
			       d->opd_obd->obd_name,
			       PFID(lu_object_fid(&cathandle->lgh_obj->do_lu)),
			       rec->lrh_id, rc);
	}

	CDEBUG(D_HA, "found record %x, %d, idx %u, id %u\n",
	       rec->lrh_type, rec->lrh_len, rec->lrh_index, rec->lrh_id);

	RETURN_EXIT;
}
970
/**
 * Cancel llog records for the committed changes.
 *
 * The function walks through the list of the committed RPCs and cancels
 * corresponding llog records. see osp_sync_request_commit_cb() for the
 * details.
 *
 * \param[in] env	LU environment provided by the caller
 * \param[in] d		OSP device
 */
static void osp_sync_process_committed(const struct lu_env *env,
				       struct osp_device *d)
{
	struct obd_device	*obd = d->opd_obd;
	struct obd_import	*imp = obd->u.cli.cl_import;
	struct ost_body		*body;
	struct ptlrpc_request	*req;
	struct llog_ctxt	*ctxt;
	struct llog_handle	*llh;
	struct list_head	 list;
	int			 rc, done = 0;

	ENTRY;

	/* lockless peek: presumably a request added concurrently will be
	 * picked up on a later call — TODO confirm against commit_cb */
	if (list_empty(&d->opd_syn_committed_there))
		return;

	/*
	 * if current status is -ENOSPC (lack of free space on OST)
	 * then we should poll OST immediately once object destroy
	 * is committed.
	 * notice: we do this upon commit as well because some backends
	 * (like DMU) do not release space right away.
	 */
	if (d->opd_pre != NULL && unlikely(d->opd_pre_status == -ENOSPC))
		osp_statfs_need_now(d);

	/*
	 * now cancel them all
	 * XXX: can we improve this using some batching?
	 *      with batch RPC that'll happen automatically?
	 * XXX: can we store ctxt in lod_device and save few cycles ?
	 */
	ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
	LASSERT(ctxt);

	llh = ctxt->loc_handle;
	LASSERT(llh);

	/* detach the whole committed list under the lock, then walk the
	 * private copy without holding the lock */
	INIT_LIST_HEAD(&list);
	spin_lock(&d->opd_syn_lock);
	list_splice(&d->opd_syn_committed_there, &list);
	INIT_LIST_HEAD(&d->opd_syn_committed_there);
	spin_unlock(&d->opd_syn_lock);

	while (!list_empty(&list)) {
		struct osp_job_req_args *jra;

		jra = list_entry(list.next, struct osp_job_req_args,
				 jra_committed_link);
		LASSERT(jra->jra_magic == OSP_JOB_MAGIC);
		list_del_init(&jra->jra_committed_link);

		/* jra is embedded in the request's rq_async_args, so the
		 * owning request is recovered with container_of() */
		req = container_of((void *)jra, struct ptlrpc_request,
				   rq_async_args);
		body = req_capsule_client_get(&req->rq_pill,
					      &RMF_OST_BODY);
		LASSERT(body);
		/* import can be closing, thus all commit cb's are
		 * called we can check committness directly */
		if (req->rq_import_generation == imp->imp_generation) {
			rc = llog_cat_cancel_records(env, llh, 1,
						     &body->oa.o_lcookie);
			if (rc)
				CERROR("%s: can't cancel record: %d\n",
				       obd->obd_name, rc);
		} else {
			/* stale generation: just log, don't cancel */
			DEBUG_REQ(D_OTHER, req, "imp_committed = %llu",
				  imp->imp_peer_committed_transno);
		}
		/* drop the reference held since the RPC was queued */
		ptlrpc_req_finished(req);
		done++;
	}

	llog_ctxt_put(ctxt);

	/* these requests were accounted in in_progress when sent */
	LASSERT(atomic_read(&d->opd_syn_rpc_in_progress) >= done);
	atomic_sub(done, &d->opd_syn_rpc_in_progress);
	CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n",
	       d->opd_obd->obd_name, atomic_read(&d->opd_syn_rpc_in_flight),
	       atomic_read(&d->opd_syn_rpc_in_progress));

	osp_sync_check_for_work(d);

	/* wake up the thread if requested to stop:
	 * it might be waiting for in-progress to complete */
	if (unlikely(osp_sync_running(d) == 0))
		wake_up(&d->opd_syn_waitq);

	EXIT;
}
1072
/**
 * The core of the syncing mechanism.
 *
 * This is a callback called by the llog processing function. Essentially it
 * suspends llog processing until there is a record to process (it's supposed
 * to be committed locally). The function handles RPCs committed by the target
 * and cancels corresponding llog records.
 *
 * \param[in] env	LU environment provided by the caller
 * \param[in] llh	llog handle we're processing
 * \param[in] rec	current llog record
 * \param[in] data	callback data containing a pointer to the device
 *
 * \retval 0			to ask the caller (llog_process()) to continue
 * \retval LLOG_PROC_BREAK	to ask the caller to break
 */
static int osp_sync_process_queues(const struct lu_env *env,
				   struct llog_handle *llh,
				   struct llog_rec_hdr *rec,
				   void *data)
{
	struct osp_device	*d = data;

	do {
		struct l_wait_info lwi = { 0 };

		/* device is stopping: tell llog_process() to bail out */
		if (!osp_sync_running(d)) {
			CDEBUG(D_HA, "stop llog processing\n");
			return LLOG_PROC_BREAK;
		}

		/* process requests committed by OST */
		osp_sync_process_committed(env, d);

		/* if we there are changes to be processed and we have
		 * resources for this ... do now */
		if (osp_sync_can_process_new(d, rec)) {
			if (llh == NULL) {
				/* ask llog for another record */
				CDEBUG(D_HA, "%u changes, %u in progress,"
				       " %u in flight\n",
				       atomic_read(&d->opd_syn_changes),
				       atomic_read(&d->opd_syn_rpc_in_progress),
				       atomic_read(&d->opd_syn_rpc_in_flight));
				return 0;
			}
			osp_sync_process_record(env, d, llh, rec);
			/* mark the record consumed; the next iteration with
			 * capacity will then return 0 so llog_process()
			 * delivers the next record */
			llh = NULL;
			rec = NULL;
		}

		/* everything generated so far has been processed: no need
		 * to be woken by the commit tracker any more */
		if (d->opd_syn_last_processed_id == d->opd_syn_last_used_id)
			osp_sync_remove_from_tracker(d);

		/* sleep until we can make progress again: stop request,
		 * room for new RPCs, or newly committed requests */
		l_wait_event(d->opd_syn_waitq,
			     !osp_sync_running(d) ||
			     osp_sync_can_process_new(d, rec) ||
			     !list_empty(&d->opd_syn_committed_there),
			     &lwi);
	} while (1);
}
1134
/**
 * OSP sync thread.
 *
 * This thread runs llog_cat_process() scanner calling our callback
 * to process llog records. in the callback we implement tricky
 * state machine as we don't want to start scanning of the llog again
 * and again, also we don't want to process too many records and send
 * too many RPCs a time. so, depending on current load (num of changes
 * being synced to OST) the callback can suspend awaiting for some
 * new conditions, like syncs completed.
 *
 * In order to process llog records left by previous boots and to allow
 * llog_process_thread() to find something (otherwise it'd just exit
 * immediately) we add a special GENERATION record on each boot.
 *
 * \param[in] _arg	a pointer to thread's arguments
 *
 * \retval 0		on success
 * \retval negative	negated errno on error
 */
static int osp_sync_thread(void *_arg)
{
	struct osp_device	*d = _arg;
	struct ptlrpc_thread	*thread = &d->opd_syn_thread;
	struct l_wait_info	 lwi = { 0 };
	struct llog_ctxt	*ctxt;
	struct obd_device	*obd = d->opd_obd;
	struct llog_handle	*llh;
	struct lu_env		 env;
	int			 rc, count;

	ENTRY;

	rc = lu_env_init(&env, LCT_LOCAL);
	if (rc) {
		CERROR("%s: can't initialize env: rc = %d\n",
		       obd->obd_name, rc);

		/* report ourselves stopped so osp_sync_init() waiting on
		 * t_ctl_waitq doesn't hang */
		spin_lock(&d->opd_syn_lock);
		thread->t_flags = SVC_STOPPED;
		spin_unlock(&d->opd_syn_lock);
		wake_up(&thread->t_ctl_waitq);

		RETURN(rc);
	}

	/* announce we're running: unblocks osp_sync_init() */
	spin_lock(&d->opd_syn_lock);
	thread->t_flags = SVC_RUNNING;
	spin_unlock(&d->opd_syn_lock);
	wake_up(&thread->t_ctl_waitq);

	ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
	if (ctxt == NULL) {
		CERROR("can't get appropriate context\n");
		GOTO(out, rc = -EINVAL);
	}

	llh = ctxt->loc_handle;
	if (llh == NULL) {
		CERROR("can't get llh\n");
		llog_ctxt_put(ctxt);
		GOTO(out, rc = -EINVAL);
	}

	/* blocks until the device stops; osp_sync_process_queues() is the
	 * per-record callback implementing the state machine */
	rc = llog_cat_process(&env, llh, osp_sync_process_queues, d, 0, 0);
	if (rc < 0) {
		CERROR("%s: llog process with osp_sync_process_queues "
		       "failed: %d\n", d->opd_obd->obd_name, rc);
		GOTO(close, rc);
	}
	LASSERTF(rc == 0 || rc == LLOG_PROC_BREAK,
		 "%u changes, %u in progress, %u in flight: %d\n",
		 atomic_read(&d->opd_syn_changes),
		 atomic_read(&d->opd_syn_rpc_in_progress),
		 atomic_read(&d->opd_syn_rpc_in_flight), rc);

	/* we don't expect llog_process_thread() to exit till umount */
	LASSERTF(thread->t_flags != SVC_RUNNING,
		 "%u changes, %u in progress, %u in flight\n",
		 atomic_read(&d->opd_syn_changes),
		 atomic_read(&d->opd_syn_rpc_in_progress),
		 atomic_read(&d->opd_syn_rpc_in_flight));

	/* wait till all the requests are completed */
	count = 0;
	while (atomic_read(&d->opd_syn_rpc_in_progress) > 0) {
		osp_sync_process_committed(&env, d);

		/* poll every 5 seconds; assert after 10 timeouts (~50s)
		 * rather than hang forever with stuck requests */
		lwi = LWI_TIMEOUT(cfs_time_seconds(5), NULL, NULL);
		rc = l_wait_event(d->opd_syn_waitq,
				  atomic_read(&d->opd_syn_rpc_in_progress) == 0,
				  &lwi);
		if (rc == -ETIMEDOUT)
			count++;
		LASSERTF(count < 10, "%s: %d %d %sempty\n",
			 d->opd_obd->obd_name,
			 atomic_read(&d->opd_syn_rpc_in_progress),
			 atomic_read(&d->opd_syn_rpc_in_flight),
			 list_empty(&d->opd_syn_committed_there) ? "" : "!");

	}

close:
	llog_cat_close(&env, llh);
	rc = llog_cleanup(&env, ctxt);
	if (rc)
		CERROR("can't cleanup llog: %d\n", rc);
out:
	LASSERTF(atomic_read(&d->opd_syn_rpc_in_progress) == 0,
		 "%s: %d %d %sempty\n",
		 d->opd_obd->obd_name, atomic_read(&d->opd_syn_rpc_in_progress),
		 atomic_read(&d->opd_syn_rpc_in_flight),
		 list_empty(&d->opd_syn_committed_there) ? "" : "!");

	thread->t_flags = SVC_STOPPED;

	wake_up(&thread->t_ctl_waitq);

	lu_env_fini(&env);

	RETURN(0);
}
1257
/**
 * Initialize llog.
 *
 * Initializes the llog. Specific llog to be used depends on the type of the
 * target OSP represents (OST or MDT). The function appends a new llog
 * record to mark the place where the records associated with this boot
 * start.
 *
 * \param[in] env	LU environment provided by the caller
 * \param[in] d		OSP device
 *
 * \retval 0		on success
 * \retval negative	negated errno on error
 */
static int osp_sync_llog_init(const struct lu_env *env, struct osp_device *d)
{
	struct osp_thread_info	*osi = osp_env_info(env);
	struct lu_fid		*fid = &osi->osi_fid;
	struct llog_handle	*lgh = NULL;
	struct obd_device	*obd = d->opd_obd;
	struct llog_ctxt	*ctxt;
	int			rc;

	ENTRY;

	LASSERT(obd);

	/*
	 * open llog corresponding to our OST
	 */
	OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
	obd->obd_lvfs_ctxt.dt = d->opd_storage;

	lu_local_obj_fid(fid, LLOG_CATALOGS_OID);

	/* look up the catalog id recorded for this target index */
	rc = llog_osd_get_cat_list(env, d->opd_storage, d->opd_index, 1,
				   &osi->osi_cid, fid);
	if (rc < 0) {
		if (rc != -EFAULT) {
			CERROR("%s: can't get id from catalogs: rc = %d\n",
			       obd->obd_name, rc);
			RETURN(rc);
		}

		/* After sparse OST indices is supported, the CATALOG file
		 * may become a sparse file that results in failure on
		 * reading. Skip this error as the llog will be created
		 * later */
		memset(&osi->osi_cid, 0, sizeof(osi->osi_cid));
		rc = 0;
	}

	CDEBUG(D_INFO, "%s: Init llog for %d - catid "DOSTID":%x\n",
	       obd->obd_name, d->opd_index,
	       POSTID(&osi->osi_cid.lci_logid.lgl_oi),
	       osi->osi_cid.lci_logid.lgl_ogen);

	rc = llog_setup(env, obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT,
			d->opd_storage->dd_lu_dev.ld_obd,
			&osp_mds_ost_orig_logops);
	if (rc)
		RETURN(rc);

	ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
	LASSERT(ctxt);

	/* a known catalog id: try to open the existing llog */
	if (likely(logid_id(&osi->osi_cid.lci_logid) != 0)) {
		rc = llog_open(env, ctxt, &lgh, &osi->osi_cid.lci_logid, NULL,
			       LLOG_OPEN_EXISTS);
		/* re-create llog if it is missing */
		if (rc == -ENOENT)
			logid_set_id(&osi->osi_cid.lci_logid, 0);
		else if (rc < 0)
			GOTO(out_cleanup, rc);
	}

	/* no llog yet (fresh target or missing on disk): create one and
	 * remember its id */
	if (unlikely(logid_id(&osi->osi_cid.lci_logid) == 0)) {
		rc = llog_open_create(env, ctxt, &lgh, NULL, NULL);
		if (rc < 0)
			GOTO(out_cleanup, rc);
		osi->osi_cid.lci_logid = lgh->lgh_id;
	}

	LASSERT(lgh != NULL);
	ctxt->loc_handle = lgh;

	rc = llog_init_handle(env, lgh, LLOG_F_IS_CAT, NULL);
	if (rc)
		GOTO(out_close, rc);

	/* persist the (possibly new) catalog id */
	rc = llog_osd_put_cat_list(env, d->opd_storage, d->opd_index, 1,
				   &osi->osi_cid, fid);
	if (rc)
		GOTO(out_close, rc);

	/*
	 * put a mark in the llog till which we'll be processing
	 * old records restless
	 */
	d->opd_syn_generation.mnt_cnt = cfs_time_current();
	d->opd_syn_generation.conn_cnt = cfs_time_current();

	osi->osi_hdr.lrh_type = LLOG_GEN_REC;
	osi->osi_hdr.lrh_len = sizeof(osi->osi_gen);

	memcpy(&osi->osi_gen.lgr_gen, &d->opd_syn_generation,
	       sizeof(osi->osi_gen.lgr_gen));

	/* the generation record osp_sync_process_record() matches against */
	rc = llog_cat_add(env, lgh, &osi->osi_gen.lgr_hdr, &osi->osi_cookie);
	if (rc < 0)
		GOTO(out_close, rc);
	llog_ctxt_put(ctxt);
	RETURN(0);
out_close:
	llog_cat_close(env, lgh);
out_cleanup:
	llog_cleanup(env, ctxt);
	RETURN(rc);
}
1377
1378 /**
1379  * Cleanup llog used for syncing.
1380  *
1381  * Closes and cleanups the llog. The function is called when the device is
1382  * shutting down.
1383  *
1384  * \param[in] env       LU environment provided by the caller
1385  * \param[in] d         OSP device
1386  */
1387 static void osp_sync_llog_fini(const struct lu_env *env, struct osp_device *d)
1388 {
1389         struct llog_ctxt *ctxt;
1390
1391         ctxt = llog_get_context(d->opd_obd, LLOG_MDS_OST_ORIG_CTXT);
1392         if (ctxt) {
1393                 llog_cat_close(env, ctxt->loc_handle);
1394                 llog_cleanup(env, ctxt);
1395         }
1396 }
1397
/**
 * Initialization of the sync component of OSP.
 *
 * Initializes the llog and starts a new thread to handle the changes to
 * the remote target (OST or MDT).
 *
 * \param[in] env	LU environment provided by the caller
 * \param[in] d		OSP device
 *
 * \retval 0		on success
 * \retval negative	negated errno on error
 */
int osp_sync_init(const struct lu_env *env, struct osp_device *d)
{
	struct l_wait_info	 lwi = { 0 };
	struct task_struct	*task;
	int			 rc;

	ENTRY;

	/* basic state must be set up even when we return early below,
	 * so osp_sync_fini() can run unconditionally */
	d->opd_syn_max_rpc_in_flight = OSP_MAX_IN_FLIGHT;
	d->opd_syn_max_rpc_in_progress = OSP_MAX_IN_PROGRESS;
	spin_lock_init(&d->opd_syn_lock);
	init_waitqueue_head(&d->opd_syn_waitq);
	init_waitqueue_head(&d->opd_syn_barrier_waitq);
	thread_set_flags(&d->opd_syn_thread, SVC_INIT);
	init_waitqueue_head(&d->opd_syn_thread.t_ctl_waitq);
	INIT_LIST_HEAD(&d->opd_syn_inflight_list);
	INIT_LIST_HEAD(&d->opd_syn_committed_there);

	/* read-only backend (e.g. snapshot mount): no changes will be
	 * logged, skip llog setup and don't start the sync thread */
	if (d->opd_storage->dd_rdonly)
		RETURN(0);

	rc = osp_sync_id_traction_init(d);
	if (rc)
		RETURN(rc);

	/*
	 * initialize llog storing changes
	 */
	rc = osp_sync_llog_init(env, d);
	if (rc) {
		CERROR("%s: can't initialize llog: rc = %d\n",
		       d->opd_obd->obd_name, rc);
		GOTO(err_id, rc);
	}

	/*
	 * Start synchronization thread
	 */
	task = kthread_run(osp_sync_thread, d, "osp-syn-%u-%u",
			   d->opd_index, d->opd_group);
	if (IS_ERR(task)) {
		rc = PTR_ERR(task);
		CERROR("%s: cannot start sync thread: rc = %d\n",
		       d->opd_obd->obd_name, rc);
		GOTO(err_llog, rc);
	}

	/* wait until the thread reports SVC_RUNNING or SVC_STOPPED */
	l_wait_event(d->opd_syn_thread.t_ctl_waitq,
		     osp_sync_running(d) || osp_sync_stopped(d), &lwi);

	RETURN(0);
err_llog:
	osp_sync_llog_fini(env, d);
err_id:
	osp_sync_id_traction_fini(d);
	return rc;
}
1467
1468 /**
1469  * Stop the syncing thread.
1470  *
1471  * Asks the syncing thread to stop and wait until it's stopped.
1472  *
1473  * \param[in] d         OSP device
1474  *
1475  * \retval              0
1476  */
1477 int osp_sync_fini(struct osp_device *d)
1478 {
1479         struct ptlrpc_thread *thread = &d->opd_syn_thread;
1480
1481         ENTRY;
1482
1483         if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
1484                 thread->t_flags = SVC_STOPPING;
1485                 wake_up(&d->opd_syn_waitq);
1486                 wait_event(thread->t_ctl_waitq, thread_is_stopped(thread));
1487         }
1488
1489         /*
1490          * unregister transaction callbacks only when sync thread
1491          * has finished operations with llog
1492          */
1493         osp_sync_id_traction_fini(d);
1494
1495         RETURN(0);
1496 }
1497
/* protects the global list of commit trackers below */
static DEFINE_MUTEX(osp_id_tracker_sem);
/* all registered trackers; one tracker per local OSD device,
 * shared by every OSP running on top of that device */
static struct list_head osp_id_tracker_list =
		LIST_HEAD_INIT(osp_id_tracker_list);
1501
/**
 * OSD commit callback.
 *
 * The function is used as a local OSD commit callback to track the highest
 * committed llog record id. see osp_sync_id_traction_init() for the details.
 *
 * \param[in] th	local transaction handle committed
 * \param[in] cookie	commit callback data (our private structure)
 */
static void osp_sync_tracker_commit_cb(struct thandle *th, void *cookie)
{
	struct osp_id_tracker	*tr = cookie;
	struct osp_device	*d;
	struct osp_txn_info	*txn;

	LASSERT(tr);

	/* cheap lockless filter: skip transactions that carry no id or
	 * an id not ahead of what's already recorded */
	txn = osp_txn_info(&th->th_ctx);
	if (txn == NULL || txn->oti_current_id < tr->otr_committed_id)
		return;

	spin_lock(&tr->otr_lock);
	/* recheck under the lock: another commit may have advanced it */
	if (likely(txn->oti_current_id > tr->otr_committed_id)) {
		CDEBUG(D_OTHER, "committed: %llu -> %llu\n",
		       tr->otr_committed_id, txn->oti_current_id);
		tr->otr_committed_id = txn->oti_current_id;

		/* propagate the new committed id to every OSP registered
		 * on this tracker and wake their sync threads */
		list_for_each_entry(d, &tr->otr_wakeup_list,
				    opd_syn_ontrack) {
			d->opd_syn_last_committed_id = tr->otr_committed_id;
			wake_up(&d->opd_syn_waitq);
		}
	}
	spin_unlock(&tr->otr_lock);
}
1537
1538 /**
1539  * Initialize commit tracking mechanism.
1540  *
1541  * Some setups may have thousands of OSTs and each will be represented by OSP.
1542  * Meaning order of magnitute many more changes to apply every second. In order
1543  * to keep the number of commit callbacks low this mechanism was introduced.
1544  * The mechanism is very similar to transno used by MDT service: it's an single
1545  * ID stream which can be assigned by any OSP to its llog records. The tricky
1546  * part is that ID is stored in per-transaction data and re-used by all the OSPs
1547  * involved in that transaction. Then all these OSPs are woken up utilizing a single OSD commit callback.
1548  *
1549  * The function initializes the data used by the tracker described above.
1550  * A singler tracker per OSD device is created.
1551  *
1552  * \param[in] d         OSP device
1553  *
1554  * \retval 0            on success
1555  * \retval negative     negated errno on error
1556  */
1557 static int osp_sync_id_traction_init(struct osp_device *d)
1558 {
1559         struct osp_id_tracker   *tr, *found = NULL;
1560         int                      rc = 0;
1561
1562         LASSERT(d);
1563         LASSERT(d->opd_storage);
1564         LASSERT(d->opd_syn_tracker == NULL);
1565         INIT_LIST_HEAD(&d->opd_syn_ontrack);
1566
1567         mutex_lock(&osp_id_tracker_sem);
1568         list_for_each_entry(tr, &osp_id_tracker_list, otr_list) {
1569                 if (tr->otr_dev == d->opd_storage) {
1570                         LASSERT(atomic_read(&tr->otr_refcount));
1571                         atomic_inc(&tr->otr_refcount);
1572                         d->opd_syn_tracker = tr;
1573                         found = tr;
1574                         break;
1575                 }
1576         }
1577
1578         if (found == NULL) {
1579                 rc = -ENOMEM;
1580                 OBD_ALLOC_PTR(tr);
1581                 if (tr) {
1582                         d->opd_syn_tracker = tr;
1583                         spin_lock_init(&tr->otr_lock);
1584                         tr->otr_dev = d->opd_storage;
1585                         tr->otr_next_id = 1;
1586                         tr->otr_committed_id = 0;
1587                         atomic_set(&tr->otr_refcount, 1);
1588                         INIT_LIST_HEAD(&tr->otr_wakeup_list);
1589                         list_add(&tr->otr_list, &osp_id_tracker_list);
1590                         tr->otr_tx_cb.dtc_txn_commit =
1591                                                 osp_sync_tracker_commit_cb;
1592                         tr->otr_tx_cb.dtc_cookie = tr;
1593                         tr->otr_tx_cb.dtc_tag = LCT_MD_THREAD;
1594                         dt_txn_callback_add(d->opd_storage, &tr->otr_tx_cb);
1595                         rc = 0;
1596                 }
1597         }
1598         mutex_unlock(&osp_id_tracker_sem);
1599
1600         return rc;
1601 }
1602
1603 /**
1604  * Release commit tracker.
1605  *
1606  * Decrease a refcounter on the tracker used by the given OSP device \a d.
1607  * If no more users left, then the tracker is released.
1608  *
1609  * \param[in] d         OSP device
1610  */
1611 static void osp_sync_id_traction_fini(struct osp_device *d)
1612 {
1613         struct osp_id_tracker *tr;
1614
1615         ENTRY;
1616
1617         LASSERT(d);
1618         tr = d->opd_syn_tracker;
1619         if (tr == NULL) {
1620                 EXIT;
1621                 return;
1622         }
1623
1624         osp_sync_remove_from_tracker(d);
1625
1626         mutex_lock(&osp_id_tracker_sem);
1627         if (atomic_dec_and_test(&tr->otr_refcount)) {
1628                 dt_txn_callback_del(d->opd_storage, &tr->otr_tx_cb);
1629                 LASSERT(list_empty(&tr->otr_wakeup_list));
1630                 list_del(&tr->otr_list);
1631                 OBD_FREE_PTR(tr);
1632                 d->opd_syn_tracker = NULL;
1633         }
1634         mutex_unlock(&osp_id_tracker_sem);
1635
1636         EXIT;
1637 }
1638
/**
 * Generate a new ID on a tracker.
 *
 * Generates a new ID using the tracker associated with the given OSP device
 * \a d, if the given ID \a id is non-zero. Unconditionally adds the OSP
 * device to the tracker's wakeup list, so the OSP won't miss when a
 * transaction using the ID is committed.
 *
 * Aborts (LBUG) if the tracker's ID stream has fallen behind the last ID
 * this OSP already used — that would mean IDs are no longer monotonic.
 *
 * \param[in] d		OSP device
 * \param[in] id	0 or ID generated previously
 *
 * \retval		ID the caller should use
 */
static __u64 osp_sync_id_get(struct osp_device *d, __u64 id)
{
	struct osp_id_tracker *tr;

	tr = d->opd_syn_tracker;
	LASSERT(tr);

	/* XXX: we can improve this introducing per-cpu preallocated ids? */
	spin_lock(&tr->otr_lock);
	/* fault injection: push the counter near the 32-bit boundary to
	 * exercise the overflow check below */
	if (OBD_FAIL_CHECK(OBD_FAIL_MDS_TRACK_OVERFLOW))
		tr->otr_next_id = 0xfffffff0;

	/* invariant: the next ID to hand out must be ahead of everything
	 * this OSP has used already; a violation is unrecoverable */
	if (unlikely(tr->otr_next_id <= d->opd_syn_last_used_id)) {
		spin_unlock(&tr->otr_lock);
		CERROR("%s: next %llu, last synced %llu\n",
		       d->opd_obd->obd_name, tr->otr_next_id,
		       d->opd_syn_last_used_id);
		LBUG();
	}

	if (id == 0)
		id = tr->otr_next_id++;
	if (id > d->opd_syn_last_used_id)
		d->opd_syn_last_used_id = id;
	/* subscribe for the commit callback covering this ID, if not
	 * subscribed already */
	if (list_empty(&d->opd_syn_ontrack))
		list_add(&d->opd_syn_ontrack, &tr->otr_wakeup_list);
	spin_unlock(&tr->otr_lock);
	CDEBUG(D_OTHER, "new id %llu\n", id);

	return id;
}
1683
1684 /**
1685  * Stop to propagate commit status to OSP.
1686  *
1687  * If the OSP does not have any llog records she's waiting to commit, then
1688  * it is possible to unsubscribe from wakeups from the tracking using this
1689  * method.
1690  *
1691  * \param[in] d         OSP device not willing to wakeup
1692  */
1693 static void osp_sync_remove_from_tracker(struct osp_device *d)
1694 {
1695         struct osp_id_tracker *tr;
1696
1697         tr = d->opd_syn_tracker;
1698         LASSERT(tr);
1699
1700         if (list_empty(&d->opd_syn_ontrack))
1701                 return;
1702
1703         spin_lock(&tr->otr_lock);
1704         list_del_init(&d->opd_syn_ontrack);
1705         spin_unlock(&tr->otr_lock);
1706 }
1707