Fifo Linux non bloccante (registrazione su richiesta)

Questo è un thread (molto) vecchio, ma ultimamente mi sono imbattuto in un problema simile. In effetti, ciò di cui avevo bisogno è una clonazione di stdin in stdout con una copia in una pipe che non blocchi. il ftee proposto nella prima risposta mi ha davvero aiutato, ma era (per il mio caso d'uso) troppo volatile. Significa che ho perso i dati che avrei potuto elaborare se fossi arrivato in tempo.

Lo scenario che mi sono trovato di fronte è che ho un processo (some_process) che aggrega alcuni dati e scrive i suoi risultati ogni tre secondi su stdout. La configurazione (semplificata) era simile a questa (nella configurazione reale sto usando una named pipe):

some_process | ftee >( > results) | gzip > raw_data.gz

Ora, raw_data.gz deve essere compresso e deve essere completo. ftee fa questo lavoro molto bene. Ma il tubo che sto usando nel mezzo era troppo lento per raccogliere i dati scaricati, ma era abbastanza veloce da elaborare tutto se poteva arrivarci, che è stato testato con un normale tee. Tuttavia, un normale tee blocca se succede qualcosa al tubo senza nome, e poiché voglio essere in grado di collegarmi su richiesta, il tee non è un'opzione. Tornando all'argomento:è migliorato quando ho inserito un buffer in mezzo, risultando in:

some_process | ftee >(mbuffer -m 32M| > results) | gzip > raw_data.gz

Ma stava ancora perdendo dati che avrei potuto elaborare. Quindi sono andato avanti ed ho esteso il ftee proposto prima a una versione bufferizzata (bftee). Ha ancora tutte le stesse proprietà, ma utilizza un buffer interno (inefficiente?) Nel caso in cui una scrittura fallisca. Perde comunque i dati se il buffer è pieno, ma funziona magnificamente per il mio caso. Come sempre, c'è molto margine di miglioramento, ma poiché ho copiato il codice da qui, vorrei condividerlo con le persone che potrebbero esserne utili.

/* bftee - clone stdin to stdout and to a buffered, non-blocking pipe 
    (c) [email protected]
    (c) [email protected]
    WTFPL Licence */

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <sys/types.h>
    #include <sys/stat.h>
    #include <fcntl.h>
    #include <errno.h>
    #include <signal.h>
    #include <unistd.h>

    // the number of sBuffers that are being held at a maximum
    #define BUFFER_SIZE 4096
    #define BLOCK_SIZE 2048

    typedef struct {
      char data[BLOCK_SIZE];
      int bytes;
    } sBuffer;

    typedef struct {
      sBuffer *data;  //array of buffers
      int bufferSize; // number of buffer in data
      int start;      // index of the current start buffer
      int end;        // index of the current end buffer
      int active;     // number of active buffer (currently in use)
      int maxUse;     // maximum number of buffers ever used
      int drops;      // number of discarded buffer due to overflow
      int sWrites;    // number of buffer written to stdout
      int pWrites;    // number of buffers written to pipe
    } sQueue;

    void InitQueue(sQueue*, int);              // initialized the Queue
    void PushToQueue(sQueue*, sBuffer*, int);  // pushes a buffer into Queue at the end 
    sBuffer *RetrieveFromQueue(sQueue*);       // returns the first entry of the buffer and removes it or NULL is buffer is empty
    sBuffer *PeakAtQueue(sQueue*);             // returns the first entry of the buffer but does not remove it. Returns NULL on an empty buffer
    void ShrinkInQueue(sQueue *queue, int);    // shrinks the first entry of the buffer by n-bytes. Buffer is removed if it is empty
    void DelFromQueue(sQueue *queue);          // removes the first entry of the queue

    static void sigUSR1(int);                  // signal handled for SUGUSR1 - used for stats output to stderr
    static void sigINT(int);                   // signla handler for SIGKILL/SIGTERM - allows for a graceful stop ?

    sQueue queue;                              // Buffer storing the overflow
    volatile int quit;                         // for quiting the main loop

    int main(int argc, char *argv[])
        int readfd, writefd;
        struct stat status;
        char *fifonam;
        sBuffer buffer;
        ssize_t bytes;
        int bufferSize = BUFFER_SIZE;

        signal(SIGPIPE, SIG_IGN);
        signal(SIGUSR1, sigUSR1);
        signal(SIGTERM, sigINT);
        signal(SIGINT,  sigINT);

        /** Handle commandline args and open the pipe for non blocking writing **/

        if(argc < 2 || argc > 3)
            printf("Usage:\n someprog 2>&1 | %s FIFO [BufferSize]\n"
                   "FIFO - path to a named pipe, required argument\n"
                   "BufferSize - temporary Internal buffer size in case write to FIFO fails\n", argv[0]);

        fifonam = argv[1];
        if (argc == 3) {
          bufferSize = atoi(argv[2]);
          if (bufferSize == 0) bufferSize = BUFFER_SIZE;

        readfd = open(fifonam, O_RDONLY | O_NONBLOCK);
            perror("bftee: readfd: open()");

        if(-1==fstat(readfd, &status))
            perror("bftee: fstat");

            printf("bftee: %s in not a fifo!\n", fifonam);

        writefd = open(fifonam, O_WRONLY | O_NONBLOCK);
            perror("bftee: writefd: open()");


        InitQueue(&queue, bufferSize);
        quit = 0;

            // read from STDIN
            bytes = read(STDIN_FILENO,, sizeof(;

            // if read failed due to interrupt, then retry, otherwise STDIN has closed and we should stop reading
            if (bytes < 0 && errno == EINTR) continue;
            if (bytes <= 0) break;

            // save the number if read bytes in the current buffer to be processed
            buffer.bytes = bytes;

            // this is a blocking write. As long as buffer is smaller than 4096 Bytes, the write is atomic to a pipe in Linux
            // thus, this cannot be interrupted. however, to be save this should handle the error cases of partial or interrupted write none the less.
            bytes = write(STDOUT_FILENO,, buffer.bytes);

            if(-1==bytes) {
                perror("ftee: writing to stdout");

            sBuffer *tmpBuffer = NULL;

            // if the queue is empty (tmpBuffer gets set to NULL) the this does nothing - otherwise it tries to write
            // the buffered data to the pipe. This continues until the Buffer is empty or the write fails.
            // NOTE: bytes cannot be -1  (that would have failed just before) when the loop is entered. 
            while ((bytes != -1) && (tmpBuffer = PeakAtQueue(&queue)) != NULL) {
               // write the oldest buffer to the pipe
               bytes = write(writefd, tmpBuffer->data, tmpBuffer->bytes);

               // the  written bytes are equal to the buffer size, the write is successful - remove the buffer and continue
               if (bytes == tmpBuffer->bytes) {
               } else if (bytes > 0) {
                 // on a positive bytes value there was a partial write. we shrink the current buffer
                 //  and handle this as a write failure
                 ShrinkInQueue(&queue, bytes);
                 bytes = -1;
            // There are several cases here:
            // 1.) The Queue is empty -> bytes is still set from the write to STDOUT. in this case, we try to write the read data directly to the pipe
            // 2.) The Queue was not empty but is now -> bytes is set from the last write (which was successful) and is bigger 0. also try to write the data
            // 3.) The Queue was not empty and still is not -> there was a write error before (even partial), and bytes is -1. Thus this line is skipped.
            if (bytes != -1) bytes = write(writefd,, buffer.bytes);

            // again, there are several cases what can happen here
            // 1.) the write before was successful -> in this case bytes is equal to buffer.bytes and nothing happens
            // 2.) the write just before is partial or failed all together - bytes is either -1 or smaller than buffer.bytes -> add the remaining data to the queue
            // 3.) the write before did not happen as the buffer flush already had an error. In this case bytes is -1 -> add the remaining data to the queue
            if (bytes != buffer.bytes)
              PushToQueue(&queue, &buffer, bytes);

        // once we are done with STDIN, try to flush the buffer to the named pipe
        if ( > 0) {
           //set output buffer to block - here we wait until we can write everything to the named pipe
           // --> this does not seem to work - just in case there is a busy loop that waits for buffer flush aswell. 
           int saved_flags = fcntl(writefd, F_GETFL);
           int new_flags = saved_flags & ~O_NONBLOCK;
           int res = fcntl(writefd, F_SETFL, new_flags);

           sBuffer *tmpBuffer = NULL;
           //TODO: this does not handle partial writes yet
           while ((tmpBuffer = PeakAtQueue(&queue)) != NULL) {
             int bytes = write(writefd, tmpBuffer->data, tmpBuffer->bytes);
             if (bytes != -1) DelFromQueue(&queue);



    /** init a given Queue **/
    void InitQueue (sQueue *queue, int bufferSize) {
      queue->data = calloc(bufferSize, sizeof(sBuffer));
      queue->bufferSize = bufferSize;
      queue->start = 0;
      queue->end = 0;
      queue->active = 0;
      queue->maxUse = 0;
      queue->drops = 0;
      queue->sWrites = 0;
      queue->pWrites = 0;

    /** push a buffer into the Queue**/
    void PushToQueue(sQueue *queue, sBuffer *p, int offset)

        if (offset < 0) offset = 0;      // offset cannot be smaller than 0 - if that is the case, we were given an error code. Set it to 0 instead
        if (offset == p->bytes) return;  // in this case there are 0 bytes to add to the queue. Nothing to write

        // this should never happen - offset cannot be bigger than the buffer itself. Panic action
        if (offset > p->bytes) {perror("got more bytes to buffer than we read\n"); exit(EXIT_FAILURE);}

        // debug output on a partial write. TODO: remove this line
        // if (offset > 0 ) fprintf(stderr, "partial write to buffer\n");

        // copy the data from the buffer into the queue and remember its size
        memcpy(queue->data[queue->end].data, p->data + offset , p->bytes-offset);
        queue->data[queue->end].bytes = p->bytes - offset;

        // move the buffer forward
        queue->end = (queue->end + 1) % queue->bufferSize;

        // there is still space in the buffer
        if (queue->active < queue->bufferSize)
            if (queue->active > queue->maxUse) queue->maxUse = queue->active;
        } else {
            // Overwriting the oldest. Move start to next-oldest
            queue->start = (queue->start + 1) % queue->bufferSize;

    /** return the oldest entry in the Queue and remove it or return NULL in case the Queue is empty **/
    sBuffer *RetrieveFromQueue(sQueue *queue)
        if (!queue->active) { return NULL; }

        queue->start = (queue->start + 1) % queue->bufferSize;
        return &(queue->data[queue->start]);

    /** return the oldest entry in the Queue or NULL if the Queue is empty. Does not remove the entry **/
    sBuffer *PeakAtQueue(sQueue *queue)
        if (!queue->active) { return NULL; }
        return &(queue->data[queue->start]);

    /*** Shrinks the oldest entry i the Queue by bytes. Removes the entry if buffer of the oldest entry runs empty*/
    void ShrinkInQueue(sQueue *queue, int bytes) {

      // cannot remove negative amount of bytes - this is an error case. Ignore it
      if (bytes <= 0) return;

      // remove the entry if the offset is equal to the buffer size
      if (queue->data[queue->start].bytes == bytes) {

      // this is a partial delete
      if (queue->data[queue->start].bytes > bytes) {
        //shift the memory by the offset
        memmove(queue->data[queue->start].data, queue->data[queue->start].data + bytes, queue->data[queue->start].bytes - bytes);
        queue->data[queue->start].bytes = queue->data[queue->start].bytes - bytes;

      // panic is the are to remove more than we have the buffer
      if (queue->data[queue->start].bytes < bytes) {
        perror("we wrote more than we had - this should never happen\n");

    /** delete the oldest entry from the queue. Do nothing if the Queue is empty **/
    void DelFromQueue(sQueue *queue)
        if (queue->active > 0) {
          queue->start = (queue->start + 1) % queue->bufferSize;

    /** Stats output on SIGUSR1 **/
    static void sigUSR1(int signo) {
      fprintf(stderr, "Buffer use: %i (%i/%i), STDOUT: %i PIPE: %i:%i\n",, queue.maxUse, queue.bufferSize, queue.sWrites, queue.pWrites, queue.drops);

    /** handle signal for terminating **/
    static void sigINT(int signo) {
      if (quit > 1) exit(EXIT_FAILURE);

Questa versione accetta un altro argomento (facoltativo) che specifica il numero di blocchi che devono essere bufferizzati per la pipe. La mia chiamata di esempio ora ha questo aspetto:

some_process | bftee >( > results) 16384 | gzip > raw_data.gz

risultando in 16384 blocchi da bufferizzare prima che avvengano gli scarti. questo utilizza circa 32 Mbyte in più di memoria, ma... chi se ne frega?

Ovviamente, nell'ambiente reale sto usando una pipe con nome in modo da poterla attaccare e staccare secondo necessità. C'è un aspetto simile a questo:

mkfifo named_pipe
some_process | bftee named_pipe 16384 | gzip > raw_data.gz &
cat named_pipe | > results

Inoltre, il processo reagisce ai segnali come segue:SIGUSR1 -> stampa i contatori su STDERRSIGTERM, SIGINT -> il primo esce dal ciclo principale e scarica il buffer nella pipe, il secondo termina immediatamente il programma.

Forse questo aiuta qualcuno in futuro... Buon divertimento

Ispirato dalla tua domanda, ho scritto un semplice programma che ti permetterà di fare questo:

$ myprogram 2>&1 | ftee /tmp/mylog

Si comporta in modo simile a tee ma clona lo stdin su stdout e su una named pipe (un requisito per ora) senza bloccare. Ciò significa che se vuoi accedere in questo modo potrebbe succedere che perderai i tuoi dati di registro, ma immagino sia accettabile nel tuo scenario.Il trucco è bloccare SIGPIPE segnale e per ignorare l'errore durante la scrittura su un fifo rotto. Questo esempio può essere ottimizzato in vari modi, naturalmente, ma finora, immagino, fa il suo lavoro.

/* ftee - clone stdin to stdout and to a named pipe 
(c) [email protected]
WTFPL Licence */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <signal.h>
#include <unistd.h>

int main(int argc, char *argv[])
    int readfd, writefd;
    struct stat status;
    char *fifonam;
    char buffer[BUFSIZ];
    ssize_t bytes;
    signal(SIGPIPE, SIG_IGN);

        printf("Usage:\n someprog 2>&1 | %s FIFO\n FIFO - path to a"
            " named pipe, required argument\n", argv[0]);
    fifonam = argv[1];

    readfd = open(fifonam, O_RDONLY | O_NONBLOCK);
        perror("ftee: readfd: open()");

    if(-1==fstat(readfd, &status))
        perror("ftee: fstat");

        printf("ftee: %s in not a fifo!\n", fifonam);

    writefd = open(fifonam, O_WRONLY | O_NONBLOCK);
        perror("ftee: writefd: open()");


        bytes = read(STDIN_FILENO, buffer, sizeof(buffer));
        if (bytes < 0 && errno == EINTR)
        if (bytes <= 0)

        bytes = write(STDOUT_FILENO, buffer, bytes);
            perror("ftee: writing to stdout");
        bytes = write(writefd, buffer, bytes);
        if(-1==bytes);//Ignoring the errors

Puoi compilarlo con questo comando standard:

$ gcc ftee.c -o ftee

Puoi verificarlo rapidamente eseguendo ad esempio:

$ ping | ftee /tmp/mylog

$ cat /tmp/mylog

Nota anche:questo non è un multiplexer. Puoi avere un solo processo che esegue $ cat /tmp/mylog alla volta.

Sembra bash <> L'operatore di reindirizzamento (3.6.10 Opening File Descriptors for Reading and WritingSee) rende la scrittura su file/fifo aperta con esso non bloccante. Dovrebbe funzionare:

$ mkfifo /tmp/mylog
$ exec 4<>/tmp/mylog
$ myprogram 2>&1 | tee >&4
$ cat /tmp/mylog # on demend

Soluzione data da gniourf_gniourf sul canale IRC #bash.

Tuttavia, ciò creerebbe un file di registro in continua crescita anche se non utilizzato fino a quando l'unità non esaurisce lo spazio.

Perché non ruotare periodicamente i registri? C'è persino un programma che lo fa per te logrotate .

C'è anche un sistema per generare messaggi di registro e fare cose diverse con loro a seconda del tipo. Si chiama syslog .

Potresti anche combinare i due. Fai in modo che il tuo programma generi messaggi syslog, configura syslog per inserirli in un file e usa logrotate per assicurarti che non riempiano il disco.

Se si scopre che stavi scrivendo per un piccolo sistema embedded e l'output del programma è pesante, ci sono una varietà di tecniche che potresti prendere in considerazione.

  • Syslog remoto:invia i messaggi syslog a un server syslog sulla rete.
  • Usa i livelli di gravità disponibili in syslog per fare cose diverse con i messaggi. Per esempio. scartare "INFO" ma registrare e inoltrare "ERR" o superiore. Per esempio. consolare
  • Utilizza un gestore di segnale nel tuo programma per rileggere la configurazione su HUP e variare la generazione di log "su richiesta" in questo modo.
  • Fai ascoltare il tuo programma su un socket unix e scrivi i messaggi quando è aperto. Potresti persino implementare una console interattiva nel tuo programma in questo modo.
  • Utilizzando un file di configurazione, fornisci un controllo granulare dell'output di registrazione.

