diff --git a/packages/utils/src/lib/file-sink-jsonl.int.test.ts b/packages/utils/src/lib/file-sink-jsonl.int.test.ts new file mode 100644 index 000000000..e0f57bbaa --- /dev/null +++ b/packages/utils/src/lib/file-sink-jsonl.int.test.ts @@ -0,0 +1,138 @@ +import * as fs from 'node:fs'; +import * as os from 'node:os'; +import * as path from 'node:path'; +import { afterAll, beforeAll, describe, expect, it } from 'vitest'; +import { teardownTestFolder } from '@code-pushup/test-utils'; +import { JsonlFileSink, recoverJsonlFile } from './file-sink-jsonl.js'; + +describe('JsonlFileSink integration', () => { + const baseDir = path.join(os.tmpdir(), 'file-sink-json-int-tests'); + const testFile = path.join(baseDir, 'test-data.jsonl'); + + beforeAll(async () => { + await fs.promises.mkdir(baseDir, { recursive: true }); + }); + + beforeEach(async () => { + try { + await fs.promises.unlink(testFile); + } catch { + // File doesn't exist, which is fine + } + }); + + afterAll(async () => { + await teardownTestFolder(baseDir); + }); + + describe('file operations', () => { + const testData = [ + { id: 1, name: 'Alice', active: true }, + { id: 2, name: 'Bob', active: false }, + { id: 3, name: 'Charlie', active: true }, + ]; + + it('should write and read JSONL files', async () => { + const sink = new JsonlFileSink({ filePath: testFile }); + + // Open and write data + sink.open(); + testData.forEach(item => sink.write(item)); + sink.close(); + + expect(fs.existsSync(testFile)).toBe(true); + const fileContent = fs.readFileSync(testFile, 'utf8'); + const lines = fileContent.trim().split('\n'); + expect(lines).toStrictEqual([ + '{"id":1,"name":"Alice","active":true}', + '{"id":2,"name":"Bob","active":false}', + '{"id":3,"name":"Charlie","active":true}', + ]); + + lines.forEach((line, index) => { + const parsed = JSON.parse(line); + expect(parsed).toStrictEqual(testData[index]); + }); + }); + + it('should recover data from JSONL files', async () => { + const jsonlContent = `${testData.map(item => JSON.stringify(item)).join('\n')}\n`; + fs.writeFileSync(testFile, jsonlContent); + + expect(recoverJsonlFile(testFile)).toStrictEqual({ + records: testData, + errors: [], + partialTail: null, + }); + }); + + it('should handle JSONL files with parse errors', async () => { + const mixedContent = + '{"id":1,"name":"Alice"}\n' + + 'invalid json line\n' + + '{"id":2,"name":"Bob"}\n' + + '{"id":3,"name":"Charlie","incomplete":\n'; + + fs.writeFileSync(testFile, mixedContent); + + expect(recoverJsonlFile(testFile)).toStrictEqual({ + records: [ + { id: 1, name: 'Alice' }, + { id: 2, name: 'Bob' }, + ], + errors: [ + expect.objectContaining({ line: 'invalid json line' }), + expect.objectContaining({ + line: '{"id":3,"name":"Charlie","incomplete":', + }), + ], + partialTail: '{"id":3,"name":"Charlie","incomplete":', + }); + }); + + it('should recover data using JsonlFileSink.recover()', async () => { + const sink = new JsonlFileSink({ filePath: testFile }); + sink.open(); + testData.forEach(item => sink.write(item)); + sink.close(); + + expect(sink.recover()).toStrictEqual({ + records: testData, + errors: [], + partialTail: null, + }); + }); + + describe('edge cases', () => { + it('should handle empty files', async () => { + fs.writeFileSync(testFile, ''); + + expect(recoverJsonlFile(testFile)).toStrictEqual({ + records: [], + errors: [], + partialTail: null, + }); + }); + + it('should handle files with only whitespace', async () => { + fs.writeFileSync(testFile, ' \n \n\t\n'); + + expect(recoverJsonlFile(testFile)).toStrictEqual({ + records: [], + errors: [], + partialTail: null, + }); + }); + + it('should handle non-existent files', async () => { + const nonExistentFile = path.join(baseDir, 'does-not-exist.jsonl'); + + expect(recoverJsonlFile(nonExistentFile)).toStrictEqual({ + records: [], + errors: [], + partialTail: null, + }); + }); + }); + }); +}); diff --git a/packages/utils/src/lib/file-sink-jsonl.ts b/packages/utils/src/lib/file-sink-jsonl.ts new file mode 100644 index 000000000..646cd82b1 --- /dev/null +++ b/packages/utils/src/lib/file-sink-jsonl.ts @@ -0,0 +1,60 @@ +import * as fs from 'node:fs'; +import { + type FileOutput, + FileSink, + type FileSinkOptions, + stringDecode, + stringEncode, + stringRecover, +} from './file-sink-text.js'; +import type { RecoverOptions, RecoverResult } from './sink-source.types.js'; + +export const jsonlEncode = < + T extends Record = Record, +>( + input: T, +): FileOutput => JSON.stringify(input); + +export const jsonlDecode = < + T extends Record = Record, +>( + output: FileOutput, +): T => JSON.parse(stringDecode(output)) as T; + +export function recoverJsonlFile< + T extends Record = Record, +>(filePath: string, opts: RecoverOptions = {}): RecoverResult { + return stringRecover(filePath, jsonlDecode, opts); +} + +export class JsonlFileSink< + T extends Record = Record, +> extends FileSink { + constructor(options: FileSinkOptions) { + const { filePath, ...fileOptions } = options; + super({ + ...fileOptions, + filePath, + recover: () => recoverJsonlFile(filePath), + finalize: () => { + // No additional finalization needed for JSONL files + }, + }); + } + + override encode(input: T): FileOutput { + return stringEncode(jsonlEncode(input)); + } + + override decode(output: FileOutput): T { + return jsonlDecode(stringDecode(output)); + } + + override repack(outputPath?: string): void { + const { records } = this.recover(); + fs.writeFileSync( + outputPath ?? this.getFilePath(), + records.map(this.encode).join(''), + ); + } +} diff --git a/packages/utils/src/lib/file-sink-jsonl.unit.test.ts b/packages/utils/src/lib/file-sink-jsonl.unit.test.ts new file mode 100644 index 000000000..75f981cb0 --- /dev/null +++ b/packages/utils/src/lib/file-sink-jsonl.unit.test.ts @@ -0,0 +1,242 @@ +import { vol } from 'memfs'; +import * as fs from 'node:fs'; +import { beforeEach, describe, expect, it } from 'vitest'; +import { MEMFS_VOLUME } from '@code-pushup/test-utils'; +import { + JsonlFileSink, + jsonlDecode, + jsonlEncode, + recoverJsonlFile, +} from './file-sink-jsonl.js'; + +describe('jsonlEncode', () => { + it('should encode object to JSON string', () => { + const obj = { key: 'value', number: 42 }; + expect(jsonlEncode(obj)).toBe(JSON.stringify(obj)); + }); + + it('should handle nested objects', () => { + const obj = { nested: { deep: 'value' }, array: [1, 2, 3] }; + expect(jsonlEncode(obj)).toBe(JSON.stringify(obj)); + }); + + it('should handle empty object', () => { + expect(jsonlEncode({})).toBe('{}'); + }); +}); + +describe('jsonlDecode', () => { + it('should decode JSON string to object', () => { + const obj = { key: 'value', number: 42 }; + const jsonStr = `${JSON.stringify(obj)}\n`; + expect(jsonlDecode(jsonStr)).toStrictEqual(obj); + }); + + it('should handle nested objects', () => { + const obj = { nested: { deep: 'value' }, array: [1, 2, 3] }; + const jsonStr = `${JSON.stringify(obj)}\n`; + expect(jsonlDecode(jsonStr)).toStrictEqual(obj); + }); + + it('should trim whitespace before parsing', () => { + const obj = { key: 'value' }; + const jsonStr = ` ${JSON.stringify(obj)} \n`; + expect(jsonlDecode(jsonStr)).toStrictEqual(obj); + }); + + it('should throw on invalid JSON', () => { + expect(() => jsonlDecode('invalid json\n')).toThrow('Unexpected token'); + }); + + it('should handle Buffer input', () => { + const obj = { key: 'value', number: 42 }; + const jsonStr = `${JSON.stringify(obj)}\n`; + expect(jsonlDecode(Buffer.from(jsonStr))).toStrictEqual(obj); + }); + + it('should handle primitive JSON values', () => { + expect(jsonlDecode('"string"\n')).toBe('string'); + expect(jsonlDecode('42\n')).toBe(42); + expect(jsonlDecode('true\n')).toBe(true); + expect(jsonlDecode('null\n')).toBeNull(); + }); +}); + +describe('recoverJsonlFile', () => { + beforeEach(() => { + vol.fromJSON( + { + '/tmp': null, + }, + MEMFS_VOLUME, + ); + }); + + it('should recover JSONL file with single object', () => { + const filePath = '/tmp/recover-single.jsonl'; + const obj = { key: 'value', number: 42 }; + fs.writeFileSync(filePath, `${JSON.stringify(obj)}\n`); + + expect(recoverJsonlFile(filePath)).toStrictEqual({ + records: [obj], + errors: [], + partialTail: null, + }); + }); + + it('should recover JSONL file with multiple objects', () => { + const filePath = '/tmp/recover-multi.jsonl'; + const obj1 = { id: 1, name: 'first' }; + const obj2 = { id: 2, name: 'second' }; + fs.writeFileSync( + filePath, + `${JSON.stringify(obj1)}\n${JSON.stringify(obj2)}\n`, + ); + + expect(recoverJsonlFile(filePath)).toStrictEqual({ + records: [obj1, obj2], + errors: [], + partialTail: null, + }); + }); + + it('should handle JSON parsing errors', () => { + const filePath = '/tmp/recover-error.jsonl'; + fs.writeFileSync( + filePath, + '{"valid": "json"}\ninvalid json line\n{"id":3,"name":"Charlie","incomplete":\n', + ); + + const result = recoverJsonlFile(filePath); + expect(result.records).toStrictEqual([{ valid: 'json' }]); + expect(result.errors).toStrictEqual([ + expect.objectContaining({ line: 'invalid json line' }), + expect.objectContaining({ + line: '{"id":3,"name":"Charlie","incomplete":', + }), + ]); + expect(result.partialTail).toBe('{"id":3,"name":"Charlie","incomplete":'); + }); + + it('should support keepInvalid option', () => { + const filePath = '/tmp/recover-keep-invalid.jsonl'; + fs.writeFileSync(filePath, '{"valid": "json"}\ninvalid json\n'); + + const result = recoverJsonlFile(filePath, { keepInvalid: true }); + expect(result.records).toStrictEqual([ + { valid: 'json' }, + { __invalid: true, lineNo: 2, line: 'invalid json' }, + ]); + expect(result.errors).toHaveLength(1); + }); + + it('should handle empty files', () => { + const filePath = '/tmp/recover-empty.jsonl'; + fs.writeFileSync(filePath, ''); + + expect(recoverJsonlFile(filePath)).toStrictEqual({ + records: [], + errors: [], + partialTail: null, + }); + }); + + it('should handle file read errors gracefully', () => { + expect(recoverJsonlFile('/nonexistent/file.jsonl')).toStrictEqual({ + records: [], + errors: [], + partialTail: null, + }); + }); +}); + +describe('JsonlFileSink', () => { + beforeEach(() => { + vol.fromJSON( + { + '/tmp': null, + }, + MEMFS_VOLUME, + ); + }); + + type JsonObj = { key: string; number: number }; + + it('should encode objects as JSON', () => { + const sink = new JsonlFileSink({ + filePath: '/tmp/jsonl-test.jsonl', + }); + const obj = { key: 'value', number: 42 }; + expect(sink.encode(obj)).toBe(`${JSON.stringify(obj)}\n`); + }); + + it('should decode JSON strings to objects', () => { + const sink = new JsonlFileSink({ + filePath: '/tmp/jsonl-test.jsonl', + }); + const obj = { key: 'value', number: 42 }; + const jsonStr = `${JSON.stringify(obj)}\n`; + expect(sink.decode(jsonStr)).toStrictEqual(obj); + }); + + it('should handle file operations with JSONL format', () => { + const filePath = '/tmp/jsonl-file-ops-test.jsonl'; + const sink = new JsonlFileSink({ filePath }); + sink.open(); + + const obj1 = { key: 'value', number: 42 }; + const obj2 = { key: 'value', number: 42 }; + sink.write(obj1); + sink.write(obj2); + sink.close(); + + const recovered = sink.recover(); + expect(recovered.records).toStrictEqual([obj1, obj2]); + }); + + it('repack() should recover records and write them to output path', () => { + const filePath = '/tmp/jsonl-repack-test.jsonl'; + const sink = new JsonlFileSink({ filePath }); + const records = [ + { key: 'value', number: 42 }, + { key: 'value', number: 42 }, + ]; + + fs.writeFileSync( + filePath, + `${records.map(record => JSON.stringify(record)).join('\n')}\n`, + ); + + sink.repack(); + expect(fs.readFileSync(filePath, 'utf8')).toBe( + `${JSON.stringify(records[0])}\n${JSON.stringify(records[1])}\n`, + ); + }); + + it('repack() should accept output path', () => { + const filePath = '/tmp/jsonl-repack-test.jsonl'; + const sink = new JsonlFileSink({ filePath }); + const records = [ + { key: 'value', number: 42 }, + { key: 'value', number: 42 }, + ]; + + fs.writeFileSync( + filePath, + `${records.map(record => JSON.stringify(record)).join('\n')}\n`, + ); + + const outputPath = '/tmp/jsonl-repack-output.jsonl'; + sink.repack(outputPath); + expect(fs.readFileSync(outputPath, 'utf8')).toBe( + `${JSON.stringify(records[0])}\n${JSON.stringify(records[1])}\n`, + ); + }); + + it('should do nothing on finalize()', () => { + const sink = new JsonlFileSink({ + filePath: '/tmp/jsonl-finalize-test.jsonl', + }); + expect(() => sink.finalize()).not.toThrow(); + }); +}); diff --git a/packages/utils/src/lib/file-sink-text.int.test.ts b/packages/utils/src/lib/file-sink-text.int.test.ts new file mode 100644 index 000000000..19ea34fb0 --- /dev/null +++ b/packages/utils/src/lib/file-sink-text.int.test.ts @@ -0,0 +1,184 @@ +import * as fs from 'node:fs'; +import * as os from 'node:os'; +import * as path from 'node:path'; +import { afterAll, beforeAll, describe, expect, it } from 'vitest'; +import { teardownTestFolder } from '@code-pushup/test-utils'; +import { FileSink, stringRecover } from './file-sink-text.js'; + +describe('FileSink integration', () => { + const baseDir = path.join(os.tmpdir(), 'file-sink-text-int-tests'); + const testFile = path.join(baseDir, 'test-data.txt'); + + beforeAll(async () => { + await fs.promises.mkdir(baseDir, { recursive: true }); + }); + + beforeEach(async () => { + try { + await fs.promises.unlink(testFile); + } catch { + // File doesn't exist, which is fine + } + }); + + afterAll(async () => { + await teardownTestFolder(baseDir); + }); + + describe('file operations', () => { + const testData = ['line1', 'line2', 'line3']; + + it('should write and read text files', async () => { + const sink = new FileSink({ + filePath: testFile, + recover: () => stringRecover(testFile, (line: string) => line), + }); + + // Open and write data + sink.open(); + testData.forEach(item => sink.write(item)); + sink.close(); + + expect(fs.existsSync(testFile)).toBe(true); + const fileContent = fs.readFileSync(testFile, 'utf8'); + const lines = fileContent.trim().split('\n'); + expect(lines).toStrictEqual(testData); + + lines.forEach((line, index) => { + expect(line).toStrictEqual(testData[index]); + }); + }); + + it('should recover data from text files', async () => { + const content = `${testData.join('\n')}\n`; + fs.writeFileSync(testFile, content); + + expect(stringRecover(testFile, (line: string) => line)).toStrictEqual({ + records: testData, + errors: [], + partialTail: null, + }); + }); + + it('should handle text files with parse errors', async () => { + const mixedContent = 'valid\ninvalid\nanother\n'; + fs.writeFileSync(testFile, mixedContent); + + expect( + stringRecover(testFile, (line: string) => { + if (line === 'invalid') throw new Error('Invalid line'); + return line.toUpperCase(); + }), + ).toStrictEqual({ + records: ['VALID', 'ANOTHER'], + errors: [ + expect.objectContaining({ + lineNo: 2, + line: 'invalid', + error: expect.any(Error), + }), + ], + partialTail: 'invalid', + }); + }); + + it('should repack file with recovered data', async () => { + const sink = new FileSink({ + filePath: testFile, + recover: () => stringRecover(testFile, (line: string) => line), + }); + + // Write initial data + sink.open(); + testData.forEach(item => sink.write(item)); + sink.close(); + + // Repack to the same file + sink.repack(); + + // Verify the content is still correct + const fileContent = fs.readFileSync(testFile, 'utf8'); + const lines = fileContent + .trim() + .split('\n') + .filter(line => line.length > 0); + expect(lines).toStrictEqual(testData); + }); + + it('should repack file to different output path', async () => { + const outputPath = path.join(baseDir, 'repacked.txt'); + const sink = new FileSink({ + filePath: testFile, + recover: () => stringRecover(testFile, (line: string) => line), + }); + + // Write initial data + sink.open(); + testData.forEach(item => sink.write(item)); + sink.close(); + + // Repack to different file + sink.repack(outputPath); + + // Verify the original file is unchanged + expect(fs.existsSync(testFile)).toBe(true); + + // Verify the repacked file has correct content + expect(fs.existsSync(outputPath)).toBe(true); + const fileContent = fs.readFileSync(outputPath, 'utf8'); + const lines = fileContent + .trim() + .split('\n') + .filter(line => line.length > 0); + expect(lines).toStrictEqual(testData); + }); + + it('should call finalize function when provided', async () => { + let finalized = false; + const sink = new FileSink({ + filePath: testFile, + recover: () => stringRecover(testFile, (line: string) => line), + finalize: () => { + finalized = true; + }, + }); + + sink.finalize(); + expect(finalized).toBe(true); + }); + }); + + describe('edge cases', () => { + it('should handle empty files', async () => { + fs.writeFileSync(testFile, ''); + + expect(stringRecover(testFile, (line: string) => line)).toStrictEqual({ + records: [], + errors: [], + partialTail: null, + }); + }); + + it('should handle files with only whitespace', async () => { + fs.writeFileSync(testFile, ' \n \n\t\n'); + + expect(stringRecover(testFile, (line: string) => line)).toStrictEqual({ + records: [], + errors: [], + partialTail: null, + }); + }); + + it('should handle non-existent files', async () => { + const nonExistentFile = path.join(baseDir, 'does-not-exist.txt'); + + expect( + stringRecover(nonExistentFile, (line: string) => line), + ).toStrictEqual({ + records: [], + errors: [], + partialTail: null, + }); + }); + }); +}); diff --git a/packages/utils/src/lib/file-sink-text.ts b/packages/utils/src/lib/file-sink-text.ts new file mode 100644 index 000000000..3cafacbe4 --- /dev/null +++ b/packages/utils/src/lib/file-sink-text.ts @@ -0,0 +1,147 @@ +import * as fs from 'node:fs'; +import { existsSync, mkdirSync } from 'node:fs'; +import path from 'node:path'; +import type { + RecoverOptions, + RecoverResult, + Recoverable, + Sink, +} from './sink-source.types.js'; + +export const stringDecode = (output: O): I => { + const str = Buffer.isBuffer(output) + ? output.toString('utf8') + : String(output); + return str as unknown as I; +}; + +export const stringEncode = (input: I): O => + `${typeof input === 'string' ? input : JSON.stringify(input)}\n` as O; + +export const stringRecover = function ( + filePath: string, + decode: (output: O) => I, + opts: RecoverOptions = {}, +): RecoverResult { + const records: I[] = []; + const errors: { lineNo: number; line: string; error: Error }[] = []; + let partialTail: string | null = null; + + try { + const content = fs.readFileSync(filePath, 'utf8'); + const lines = content.trim().split('\n'); + let lineNo = 0; + + for (const line of lines) { + lineNo++; + const trimmedLine = line.trim(); + if (!trimmedLine) { + continue; + } + + try { + const record = decode(trimmedLine as O); + records.push(record); + } catch (error) { + const info = { lineNo, line, error: error as Error }; + errors.push(info); + + if (opts.keepInvalid) { + records.push({ __invalid: true, lineNo, line } as any); + } + + partialTail = line; + } + } + } catch { + return { records: [], errors: [], partialTail: null }; + } + + return { records, errors, partialTail }; +}; + +export type FileSinkOptions = { + filePath: string; + recover?: () => RecoverResult; + finalize?: () => void; +}; + +export type FileInput = Buffer | string; +export type FileOutput = Buffer | string; + +export class FileSink + implements Sink, Recoverable +{ + #fd: number | null = null; + options: FileSinkOptions; + + constructor(options: FileSinkOptions) { + this.options = options; + } + + isClosed(): boolean { + return this.#fd == null; + } + + encode(input: I): O { + return stringEncode(input as any); + } + + decode(output: O): I { + return stringDecode(output as any); + } + getFilePath(): string { + return this.options.filePath; + } + + open(withRepack: boolean = false): void { + const dir = path.dirname(this.options.filePath); + if (!existsSync(dir)) { + mkdirSync(dir, { recursive: true }); + } + if (withRepack) { + this.repack(this.options.filePath); + } + this.#fd = fs.openSync(this.options.filePath, 'a'); + } + + close(): void { + if (this.#fd == null) { + return; + } + fs.closeSync(this.#fd); + this.#fd = null; + } + + write(input: I): void { + if (this.#fd == null) { + return; + } // Silently ignore if not open + const encoded = this.encode(input); + try { + fs.writeSync(this.#fd, encoded as any); + } catch { + // Silently ignore write errors (e.g., EBADF in test environments with mocked fs) + } + } + + recover(): RecoverResult { + const dir = path.dirname(this.options.filePath); + if (!existsSync(dir)) { + mkdirSync(dir, { recursive: true }); + } + return this.options.recover!() as RecoverResult; + } + + repack(outputPath?: string): void { + const { records } = this.recover(); + fs.writeFileSync( + outputPath ?? this.getFilePath(), + records.map(this.encode).join('\n'), + ); + } + + finalize(): void { + this.options.finalize!(); + } +} diff --git a/packages/utils/src/lib/file-sink-text.unit.test.ts b/packages/utils/src/lib/file-sink-text.unit.test.ts new file mode 100644 index 000000000..33cc9ad0e --- /dev/null +++ b/packages/utils/src/lib/file-sink-text.unit.test.ts @@ -0,0 +1,310 @@ +import { vol } from 'memfs'; +import * as fs from 'node:fs'; +import { beforeEach, describe, expect, it, vi } from 'vitest'; +import { MEMFS_VOLUME } from '@code-pushup/test-utils'; +import { + FileSink, + type FileSinkOptions, + stringDecode, + stringEncode, + stringRecover, +} from './file-sink-text.js'; + +describe('stringEncode', () => { + it('stringEncode() should encode string input with newline', () => { + const str = 'test string'; + expect(stringEncode(str)).toBe(`${str}\n`); + }); + + it('stringEncode() should encode non-string input as JSON with newline', () => { + const obj = { key: 'value', number: 42 }; + expect(stringEncode(obj)).toBe(`${JSON.stringify(obj)}\n`); + }); + + it('stringEncode() should handle null input', () => { + expect(stringEncode(null)).toBe('null\n'); + }); + + it('stringEncode() should handle undefined input', () => { + expect(stringEncode(undefined)).toBe('undefined\n'); + }); +}); + +describe('stringDecode', () => { + it('stringDecode() should decode Buffer to string', () => { + const str = 'test content'; + expect(stringDecode(Buffer.from(str))).toBe(str); + }); + + it('stringDecode() should return string input as-is', () => { + const str = 'test string'; + expect(stringDecode(str)).toBe(str); + }); +}); + +describe('stringRecover', () => { + it('stringRecover() should recover records from valid file content', () => { + const filePath = '/tmp/stringRecover-test.txt'; + vol.fromJSON({ + [filePath]: 'line1\nline2\nline3\n', + }); + + expect(stringRecover(filePath, (line: string) => line)).toStrictEqual({ + records: ['line1', 'line2', 'line3'], + errors: [], + partialTail: null, + }); + }); + + it('stringRecover() should recover records and apply decode function', () => { + const filePath = '/tmp/stringRecover-test.txt'; + vol.fromJSON({ + [filePath]: 'line1\nline2\nline3\n', + }); + + expect( + stringRecover(filePath, (line: string) => line.toUpperCase()), + ).toStrictEqual({ + records: ['LINE1', 'LINE2', 'LINE3'], + errors: [], + partialTail: null, + }); + }); + + it('stringRecover() should skip empty lines', () => { + const filePath = '/tmp/stringRecover-empty-test.txt'; + vol.fromJSON({ + [filePath]: 'line1\n\nline2\n', + }); + + expect(stringRecover(filePath, (line: string) => line)).toStrictEqual({ + records: ['line1', 'line2'], + errors: [], + partialTail: null, + }); + }); + + it('stringRecover() should handle decode errors and continue processing', () => { + const filePath = '/tmp/stringRecover-error-test.txt'; + vol.fromJSON({ + [filePath]: 'valid\ninvalid\nanother', + }); + + expect( + stringRecover(filePath, (line: string) => { + if (line === 'invalid') throw new Error('Invalid line'); + return line.toUpperCase(); + }), + ).toStrictEqual({ + records: ['VALID', 'ANOTHER'], + errors: [ + { + lineNo: 2, + line: 'invalid', + error: expect.any(Error), + }, + ], + partialTail: 'invalid', + }); + }); + + it('stringRecover() should include invalid records when keepInvalid option is true', () => { + const filePath = '/tmp/stringRecover-invalid-test.txt'; + vol.fromJSON({ + [filePath]: 'valid\ninvalid\n', + }); + + expect( + stringRecover( + filePath, + (line: string) => { + if (line === 'invalid') throw new Error('Invalid line'); + return line.toUpperCase(); + }, + { keepInvalid: true }, + ), + ).toStrictEqual({ + records: ['VALID', { __invalid: true, lineNo: 2, line: 'invalid' }], + errors: [expect.any(Object)], + partialTail: 'invalid', + }); + }); + + it('stringRecover() should handle file read errors gracefully', () => { + expect( + stringRecover('/nonexistent/file.txt', (line: string) => line), + ).toStrictEqual({ + records: [], + errors: [], + partialTail: null, + }); + }); +}); + +describe('FileSink', () => { + it('constructor should create instance with options', () => { + const options: FileSinkOptions = { + filePath: '/tmp/test-file.txt', + recover: vi + .fn() + .mockReturnValue({ records: [], errors: [], partialTail: null }), + finalize: vi.fn(), + }; + expect(new FileSink(options).options).toBe(options); + }); + + it('getFilePath() should return the file path', () => { + const filePath = '/tmp/test-file.txt'; + const sink = new FileSink({ filePath }); + expect(sink.getFilePath()).toBe(filePath); + }); + + it('encode() should encode input using stringEncode', () => { + const sink = new FileSink({ filePath: '/tmp/test.txt' }); + const str = 'test input'; + expect(sink.encode(str)).toBe(`${str}\n`); + }); + + it('decode() should decode output using stringDecode', () => { + const sink = new FileSink({ filePath: '/tmp/test.txt' }); + const str = 'test output'; + expect(sink.decode(str)).toBe(str); + }); + + it('open() should handle directory creation and file opening', () => { + const sink = new FileSink({ filePath: '/tmp/test-file.txt' }); + sink.open(); + expect(fs.existsSync('/tmp/test-file.txt')).toBe(true); + }); + + it('open() should repack file when withRepack is true', () => { + const sink = new FileSink({ + filePath: '/tmp/test-file.txt', + recover: vi + .fn() + .mockReturnValue({ records: [], errors: [], partialTail: null }), + }); + const spy = vi.spyOn(sink, 'repack'); + sink.open(true); + expect(spy).toHaveBeenCalledWith('/tmp/test-file.txt'); + }); + + it('close() should close file descriptor if open', () => { + const sink = new FileSink({ filePath: '/tmp/test-file.txt' }); + sink.open(); + expect(() => sink.close()).not.toThrow(); + }); + + it('close() should do nothing if file descriptor is not open', () => { + const sink = new FileSink({ filePath: '/tmp/test-file.txt' }); + expect(() => sink.close()).not.toThrow(); + }); + + it('write() should write encoded input to file when sink is open', () => { + const sink = new FileSink({ filePath: '/tmp/write-open-unique-test.txt' }); + sink.open(); + const str = 'test data'; + sink.write(str); + expect(fs.readFileSync('/tmp/write-open-unique-test.txt', 'utf8')).toBe( + `${str}\n`, + ); + }); + + it('write() should silently ignore writes when file descriptor is not open', () => { + const sink = new FileSink({ filePath: '/tmp/write-test-closed.txt' }); + expect(() => sink.write('test data')).not.toThrow(); + }); + + it('write() should silently ignore write errors when fs.writeSync throws', () => { + const sink = new FileSink({ filePath: '/tmp/write-error-test.txt' }); + sink.open(); + + // Mock fs.writeSync to throw an error + const writeSyncSpy = vi.spyOn(fs, 'writeSync').mockImplementation(() => { + throw new Error('Write error'); + }); + + try { + // This should not throw despite the write error + expect(() => sink.write('test data')).not.toThrow(); + } finally { + // Restore original function + writeSyncSpy.mockRestore(); + sink.close(); + } + }); + + it('recover() should call the recover function from options', () => { + const mockRecover = vi + .fn() + .mockReturnValue({ records: ['test'], errors: [], partialTail: null }); + const sink = new FileSink({ + filePath: '/tmp/test-file.txt', + recover: mockRecover, + }); + expect(sink.recover()).toStrictEqual({ + records: ['test'], + errors: [], + partialTail: null, + }); + expect(mockRecover).toHaveBeenCalledWith(); + }); + + it('repack() should recover records and write them to output path', () => { + const mockRecover = vi.fn(); + const filePath = '/tmp/test-file.txt'; + const sink = new FileSink({ + filePath, + recover: mockRecover, + }); + const records = ['record1', 'record2']; + mockRecover.mockReturnValue({ records, errors: [], partialTail: null }); + + sink.repack(); + expect(mockRecover).toHaveBeenCalled(); + expect(fs.readFileSync(filePath, 'utf8')).toBe('record1\n\nrecord2\n'); + }); + + it('repack() should accept output path', () => { + const mockRecover = vi.fn(); + const sink = new FileSink({ + filePath: '/tmp/test-file.txt', + recover: mockRecover, + }); + const records = ['record1', 'record2']; + mockRecover.mockReturnValue({ records, errors: [], partialTail: null }); + const outputPath = '/tmp/repack-output.txt'; + sink.repack(outputPath); + expect(mockRecover).toHaveBeenCalled(); + expect(fs.readFileSync(outputPath, 'utf8')).toBe('record1\n\nrecord2\n'); + }); + + it('finalize() should call the finalize function from options', () => { + const mockFinalize = vi.fn(); + const sink = new FileSink({ + filePath: '/tmp/test-file.txt', + finalize: mockFinalize, + }); + sink.finalize(); + expect(mockFinalize).toHaveBeenCalledTimes(1); + }); + + it('isClosed() should return true when sink is not opened', () => { + const sink = new FileSink({ filePath: '/tmp/test-file.txt' }); + expect(sink.isClosed()).toBe(true); + }); + + it('isClosed() should return false when sink is opened', () => { + const sink = new FileSink({ filePath: '/tmp/test-file.txt' }); + sink.open(); + expect(sink.isClosed()).toBe(false); + }); + + it('isClosed() should return true when sink is closed after being opened', () => { + const sink = new FileSink({ filePath: '/tmp/test-file.txt' }); + sink.open(); + expect(sink.isClosed()).toBe(false); + sink.close(); + expect(sink.isClosed()).toBe(true); + }); +}); diff --git a/packages/utils/src/lib/sink-source.types.ts b/packages/utils/src/lib/sink-source.types.ts index ee096e31f..8eab82221 100644 --- a/packages/utils/src/lib/sink-source.types.ts +++ b/packages/utils/src/lib/sink-source.types.ts @@ -29,9 +29,9 @@ export type Observer = { isSubscribed: () => boolean; }; -export type Recoverable = { - recover: () => RecoverResult; - repack: () => void; +export type Recoverable = { + recover: () => RecoverResult; + repack: (outputPath?: string) => void; finalize: () => void; };