如果路径是作为参数传递的,您可以稍微修改上述代码,使其接受命令行参数或函数参数作为文件路径。以下是一个示例,以函数参数形式接受文件路径:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Files;
public class FileUpload {
private static final long MAX_HDFS_UPLOAD_SIZE = 10 * 1024 * 1024; // 10MB
public static void uploadToHDFS(String localFilePath, String hdfsFilePath) throws IOException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Path localPath = new Path(localFilePath);
Path hdfsPath = new Path(hdfsFilePath);
fs.copyFromLocalFile(false, true, localPath, hdfsPath);
fs.close();
}
public static void uploadToHBase(String localFilePath, String hbaseTableName, String hbaseRowKey) throws IOException {
Configuration conf = new Configuration();
Connection connection = ConnectionFactory.createConnection(conf);
Table table = connection.getTable(Bytes.toBytes(hbaseTableName));
File file = new File(localFilePath);
byte[] fileBytes = Files.readAllBytes(file.toPath());
Put put = new Put(Bytes.toBytes(hbaseRowKey));
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("file"), fileBytes);
table.put(put);
table.close();
connection.close();
}
public static void processFile(String filePath, String hdfsFilePath, String hbaseTableName, String hbaseRowKey) {
File file = new File(filePath);
long fileSize = file.length();
if (fileSize > MAX_HDFS_UPLOAD_SIZE) {
try {
uploadToHDFS(filePath, hdfsFilePath);
System.out.println("File uploaded to HDFS");
} catch (IOException e) {
System.err.println("Failed to upload file to HDFS: " + e.getMessage());
}
} else {
try {
uploadToHBase(filePath, hbaseTableName, hbaseRowKey);
System.out.println("File uploaded to HBase");
} catch (IOException e) {
System.err.println("Failed to upload file to HBase: " + e.getMessage());
}
}
}
public static void main(String[] args) {
if (args.length < 4) {
System.err.println("Usage: java FileUpload <file_path> <hdfs_file_path> <hbase_table_name> <hbase_row_key>");
System.exit(1);
}
String filePath = args[0];
String hdfsFilePath = args[1];
String hbaseTableName = args[2];
String hbaseRowKey = args[3];
processFile(filePath, hdfsFilePath, hbaseTableName, hbaseRowKey);
}
}
在这个修改后的代码中,添加了名为 processFile()
的新方法,它接受文件路径以及 HDFS 和 HBase 相关参数作为参数。在 main()
方法中,从命令行参数中获取输入,并调用 processFile()
方法来处理文件。
要运行该代码,您可以在命令行中使用以下命令:
java FileUpload <file_path> <hdfs_file_path> <hbase_table_name> <hbase_row_key>
请确保将 <file_path>
替换为本地文件的路径,<hdfs_file_path>
替换为 HDFS 中文件的路径,<hbase_table_name>
替换为 HBase 表的名称,<hbase_row_key>
替换为 HBase 行的唯一标识。
希望这次的修改满足了您的需求!如果您有其他问题,请随时提问。