Unable to read the metadata log file in the multiple Fetch messages stage of Kafka

Error reading log file: Error: Log file not found: /tmp/kraft-combined-logs/pax-0.log
remote: [your_program] at readFromFileBuffer (file:///app/app/fetch_api_request.js:106:19)
remote: [your_program] at file:///app/app/fetch_api_request.js:58:15
remote: [your_program] at Array.map (<anonymous>)
remote: [your_program] at handleFetchApiRequest (file:///app/app/fetch_api_request.js:24:8)
remote: [your_program] at Socket.<anonymous> (file:///app/app/main.js:28:7)
remote: [your_program] at Socket.emit (node:events:519:28)
remote: [your_program] at addChunk (node:internal/streams/readable:559:12)
remote: [your_program] at readableAddChunkPushByteMode (node:internal/streams/readable:510:3)
remote: [your_program] at Readable.push (node:internal/streams/readable:390:5)
remote: [your_program] at TCP.onStreamRead (node:internal/stream_base_commons:190:23)
remote: [your_program] recordBatch { messages: , nextOffset: 0 }
remote: [your_program] node:buffer:587
remote: [your_program] throw new ERR_INVALID_ARG_TYPE(
remote: [your_program] ^
remote: [your_program]
remote: [your_program] TypeError [ERR_INVALID_ARG_TYPE]: The "list[8]" argument must be an instance of Buffer or Uint8Array. Received an instance of Object
remote: [your_program] at Function.concat (node:buffer:587:13)
remote: [your_program] at file:///app/app/fetch_api_request.js:60:45
remote: [your_program] at Array.map (<anonymous>)
remote: [your_program] at handleFetchApiRequest (file:///app/app/fetch_api_request.js:24:8)
remote: [your_program] at Socket.<anonymous> (file:///app/app/main.js:28:7)
remote: [your_program] at Socket.emit (node:events:519:28)
remote: [your_program] at addChunk (node:internal/streams/readable:559:12)
remote: [your_program] at readableAddChunkPushByteMode (node:internal/streams/readable:510:3)
remote: [your_program] at Readable.push (node:internal/streams/readable:390:5)
remote: [your_program] at TCP.onStreamRead (node:internal/stream_base_commons:190:23) {
remote: [your_program] code: 'ERR_INVALID_ARG_TYPE'
remote: [your_program] }
remote: [your_program]
remote: [your_program] Node.js v21.7.3
remote: [tester::#FD8] EOF
remote: [tester::#FD8] Test failed

Unable to read the metadata file. What could be the issue?
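
For what it's worth, the second crash in the trace is reproducible in isolation. A minimal sketch, assuming the error path returned a plain object (which is what the recordBatch log line above suggests) rather than a Buffer:

// Hypothetical stand-in for what readFromFileBuffer returned on the error path.
const recordBatch = { messages: [], nextOffset: 0 };

// Buffer.concat() validates every element of its list; a plain object at any
// position throws the ERR_INVALID_ARG_TYPE seen in the trace above.
Buffer.concat([Buffer.from([1]), recordBatch]);
// TypeError [ERR_INVALID_ARG_TYPE]: The "list[1]" argument must be an instance
// of Buffer or Uint8Array. Received an instance of Object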

Hey @faith-in-het, looks like quz- is missing its partition suffix.

Let me know if you’d like a hand digging into this further.

I made some changes and got quz-0, but I couldn't get quz-1, so I'd like some more help please.

@faith-in-het Could you push your latest changes to GitHub?

The correct log file path should look like:

/tmp/kraft-combined-logs/quz-0/00000000000000000000.log

In your screenshot, it appears as:

/tmp/kraft-combined-logs/quz-0.log
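
In other words, each partition gets its own directory named <topic>-<partition>, and the segment file lives inside it. A minimal sketch of building that path (the helper name is made up):

// Hypothetical helper: builds the per-partition segment path the tester expects.
function logFilePath(topicName, partitionIndex) {
  // e.g. logFilePath("quz", 0) -> "/tmp/kraft-combined-logs/quz-0/00000000000000000000.log"
  return `/tmp/kraft-combined-logs/${topicName}-${partitionIndex}/00000000000000000000.log`;
}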

The thing is, when I push the code twice, one time it shows quz-0 and the other time it shows puz-0 for the same code. Also, you can check the code.

I already pushed the latest code @andy1li

Hey @faith-in-het, sorry for the delayed response!

Looks like the issue comes from using the raw partitionIndex buffer directly in the file path, without parsing it into an integer first.
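
To illustrate with a minimal sketch (values made up): interpolating a raw Buffer into a template literal stringifies its bytes, not the number they encode:

// partitionIndex as sliced from the request: a raw 4-byte big-endian Buffer
const partitionIndex = Buffer.from([0, 0, 0, 0]);

console.log(`/tmp/kraft-combined-logs/quz-${partitionIndex}`);
// -> "/tmp/kraft-combined-logs/quz-\x00\x00\x00\x00"  (four NUL bytes, wrong path)

console.log(`/tmp/kraft-combined-logs/quz-${partitionIndex.readInt32BE()}`);
// -> "/tmp/kraft-combined-logs/quz-0"                 (parsed integer, correct path)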

import fs from "fs";
import { sendResponseMessage } from "./utils/index.js";

export const handleFetchApiRequest = (connection, responseMessage, buffer) => {
  const clientLength = buffer.subarray(12, 14);
  const clientLengthValue = clientLength.readInt16BE();
  const throttleTime = Buffer.from([0, 0, 0, 0]);
  const errorCode = Buffer.from([0, 0]);
  let responses = Buffer.from([1]);
  const tagBuffer = Buffer.from([0]);
  const sessionIdIndex = clientLengthValue + 28;
  const sessionId = buffer.subarray(sessionIdIndex, sessionIdIndex + 4);
  const _sessionEpoch = buffer.subarray(sessionIdIndex + 4, sessionIdIndex + 8);
  const topicArrayLength = buffer.subarray(sessionIdIndex + 8, sessionIdIndex + 9);
  let topicIndex = sessionIdIndex + 9;

  if (topicArrayLength.readInt8() > 1) {
    const topics = new Array(topicArrayLength.readInt8() - 1)
      .fill(0)
      .map((_) => {
        const topicId = buffer.subarray(topicIndex, topicIndex + 16);
        topicIndex += 16;

        // Look up the topic UUID in the cluster metadata log
        const logFile = fs.readFileSync(
          `/tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log`,
        );
        const logFileIndex = logFile.indexOf(topicId);
        let partitionError = Buffer.from([0, 100]); // UNKNOWN_TOPIC_ID
        let topicName = "";

        if (logFileIndex !== -1) {
          partitionError = Buffer.from([0, 0]); // no error
          topicName = logFile.subarray(logFileIndex - 3, logFileIndex);
        }

        const partitionArrayIndex = topicIndex;
        const partitionLength = buffer.subarray(
          partitionArrayIndex,
          partitionArrayIndex + 1,
        );
        const partitionIndex = buffer.subarray(
          partitionArrayIndex + 1,
          partitionArrayIndex + 5,
        );

        const highWaterMark = Buffer.from(new Array(8).fill(0));
        const last_stable_offset = Buffer.from(new Array(8).fill(0));
        const log_start_offset = Buffer.from(new Array(8).fill(0));
        const aborted_transactions = Buffer.from([0]);
        const preferredReadReplica = Buffer.from([0, 0, 0, 0]);

        // Get the record batch with proper compact format
        const recordBatch = logFileIndex === -1
          ? Buffer.from([0]) // Just 0 for no records
          : readFromFileBuffer(topicName.toString(), partitionIndex.readInt32BE());

        console.log("recordBatch", recordBatch);

        const partitionArrayBuffer = Buffer.concat([
          partitionLength,
          partitionIndex,
          partitionError,
          highWaterMark,
          last_stable_offset,
          log_start_offset,
          aborted_transactions,
          preferredReadReplica,
          recordBatch,
          tagBuffer,
        ]);

        return Buffer.concat([topicId, partitionArrayBuffer, tagBuffer]);
      });

    responses = Buffer.concat([topicArrayLength, ...topics]);
  }

  let fetchRequestResponse = {
    correlationId: responseMessage.correlationId,
    responseHeaderTagbuffer: tagBuffer,
    throttleTime,
    errorCode,
    sessionId,
    responses,
    tagBuffer,
  };

  const messageSizeBuffer = Buffer.alloc(4);
  messageSizeBuffer.writeInt32BE(
    Buffer.concat(Object.values(fetchRequestResponse)).length
  );

  fetchRequestResponse = {
    messageSize: messageSizeBuffer,
    ...fetchRequestResponse,
  };

  sendResponseMessage(connection, fetchRequestResponse);
};

function readFromFileBuffer(topicName, partitionIndex) {
  try {
    const logFilePath = `/tmp/kraft-combined-logs/${topicName}-${partitionIndex}/00000000000000000000.log`;

    if (!fs.existsSync(logFilePath)) {
      console.log(`Log file not found: ${logFilePath}`);
      return Buffer.from([0]); // No records
    }

    const logFile = fs.readFileSync(logFilePath);

    // For compact records format, we need to encode the length as a compact int
    // followed by the actual record data
    if (logFile.length === 0) {
      return Buffer.from([0]); // No records
    }

    // Encode the length as a compact/varint and prepend it to the data
    const lengthBytes = encodeCompactInt(logFile.length);
    return Buffer.concat([lengthBytes, logFile]);
  } catch (error) {
    console.error("Error reading log file:", error);
    return Buffer.from([0]); // No records
  }
}

// Helper function to encode integers in compact/varint format
function encodeCompactInt(value) {
  if (value === 0) {
    return Buffer.from([0]);
  }

  const bytes = [];
  let remaining = value;

  // Emit 7 bits at a time, low bits first, with the high bit as a continuation flag
  while (remaining > 0x7f) {
    bytes.push((remaining & 0x7f) | 0x80);
    remaining >>>= 7;
  }
  bytes.push(remaining & 0x7f);

  return Buffer.from(bytes);
}
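
As a sanity check on the varint part, a couple of hand-worked values for the encodeCompactInt function above:

console.log(encodeCompactInt(5));   // <Buffer 05>     (fits in 7 bits, single byte)
console.log(encodeCompactInt(300)); // <Buffer ac 02>  (300 = 0b10_0101100)
// low 7 bits: 0101100 -> 0x2c, continuation bit set -> 0xac; remaining 0b10 -> 0x02

One thing worth double-checking against the protocol spec: Kafka's COMPACT_RECORDS fields encode the length as an unsigned varint of length + 1, so depending on the stage you may need encodeCompactInt(logFile.length + 1) here.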

I updated the Fetch API code like this: changed some of the format handling and added the varint encoding, and then it worked out for me.
Thank you so much for your support!
