This article covers utility classes for reading and writing ORC files and how to write ORC-format files from Flink. The methods are simple to apply and practical; let's walk through them.

Compression
The compression ratio is roughly between 1:7 and 1:10, so with 3 replicas ORC saves close to 10x the storage space (detailed survey figures were to be delivered over the weekend).
After the data is compressed, pay attention to load balancing across the cluster; you can try running a rebalance.
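For context, the ORC compression codec is set per table through table properties when the table is created. The DDL below is only an illustrative sketch: the table name and columns are made up, and orc.compress also accepts NONE and SNAPPY besides ZLIB.

CREATE TABLE dim_province_orc (
  province_id BIGINT,
  province_name STRING
)
STORED AS ORC
TBLPROPERTIES ("orc.compress" = "ZLIB");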
Export
To export a Hive ORC table to MySQL with Sqoop, use HCatalog: you only need to add a few extra parameters to the export command, as shown below.
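A sketch of such an export; the connection string, credentials, and database/table names here are placeholders:

sqoop export \
  --connect jdbc:mysql://mysql-host:3306/target_db \
  --username etl_user -P \
  --table dim_province \
  --hcatalog-database dim \
  --hcatalog-table dim_province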
Viewing
View an ORC file as JSON:
hive --orcfiledump -j -p /user/hive/warehouse/dim.db/dim_province/000000_0
Download
View an ORC file's row data in key/value form and dump it to a local file:
hive --orcfiledump -d /user/hive/warehouse/dim.db/dim_province/000000_0 > myfile.txt
When reading, ORC checks each column's min/max statistics and skips any stripe or row group whose range cannot contain the value being searched for, which is why ORC reads are fast.
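To let the reader exploit these statistics from the core Java API, a predicate can be pushed down as a SearchArgument. The snippet below is only a sketch that plugs into the CoreReader example further down: the column name x and the threshold 100 are assumptions for illustration, and the SearchArgument classes come from the hive-storage-api that orc-core depends on.

import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;

// Predicate: x < 100. Stripes/row groups whose min/max statistics prove
// they cannot contain matching rows are skipped by the reader.
SearchArgument sarg = SearchArgumentFactory.newBuilder()
    .startAnd()
    .lessThan("x", PredicateLeaf.Type.LONG, 100L)
    .end()
    .build();

// Used in place of the plain reader.rows(...) call in the CoreReader example below.
RecordReader rowIterator = reader.rows(reader.options()
    .schema(readSchema)
    .searchArgument(sarg, new String[]{"x"}));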
Note: when reading or writing ORC on Windows, make sure CLASSPATH and PATH contain no Hadoop environment variables. If they do, remove them first and restart your IDE.

Reading an ORC file with the core Java API:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import java.io.IOException;
public class CoreReader {
  public static void main(Configuration conf, String[] args) throws IOException {
    // Get the information from the file footer
    Reader reader = OrcFile.createReader(new Path("my-file.orc"),
        OrcFile.readerOptions(conf));
    System.out.println("File schema: " + reader.getSchema());
    System.out.println("Row count: " + reader.getNumberOfRows());

    // Pick the schema we want to read using schema evolution
    TypeDescription readSchema =
        TypeDescription.fromString("struct<z:int,y:string,x:bigint>");
    // Read the row data
    VectorizedRowBatch batch = readSchema.createRowBatch();
    RecordReader rowIterator = reader.rows(reader.options()
        .schema(readSchema));
    LongColumnVector z = (LongColumnVector) batch.cols[0];
    BytesColumnVector y = (BytesColumnVector) batch.cols[1];
    LongColumnVector x = (LongColumnVector) batch.cols[2];
    while (rowIterator.nextBatch(batch)) {
      for (int row = 0; row < batch.size; ++row) {
        int zRow = z.isRepeating ? 0 : row;
        int xRow = x.isRepeating ? 0 : row;
        System.out.println("z: " +
            (z.noNulls || !z.isNull[zRow] ? z.vector[zRow] : null));
        System.out.println("y: " + y.toString(row));
        System.out.println("x: " +
            (x.noNulls || !x.isNull[xRow] ? x.vector[xRow] : null));
      }
    }
    rowIterator.close();
  }

  public static void main(String[] args) throws IOException {
    main(new Configuration(), args);
  }
}

Writing an ORC file with the core Java API:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class CoreWriter {
  public static void main(Configuration conf, String[] args) throws IOException {
    TypeDescription schema =
        TypeDescription.fromString("struct<x:int,y:string>");
    Writer writer = OrcFile.createWriter(new Path("my-file.orc"),
        OrcFile.writerOptions(conf)
            .setSchema(schema));
    VectorizedRowBatch batch = schema.createRowBatch();
    LongColumnVector x = (LongColumnVector) batch.cols[0];
    BytesColumnVector y = (BytesColumnVector) batch.cols[1];
    for (int r = 0; r < 10000; ++r) {
      int row = batch.size++;
      x.vector[row] = r;
      byte[] buffer = ("Last-" + (r * 3)).getBytes(StandardCharsets.UTF_8);
      y.setRef(row, buffer, 0, buffer.length);
      // If the batch is full, write it out and start over.
      if (batch.size == batch.getMaxSize()) {
        writer.addRowBatch(batch);
        batch.reset();
      }
    }
    if (batch.size != 0) {
      writer.addRowBatch(batch);
    }
    writer.close();
  }

  public static void main(String[] args) throws IOException {
    main(new Configuration(), args);
  }
}

Writing ORC files from a Flink streaming job with StreamingFileSink and OrcBulkWriterFactory:

import org.apache.flink.core.fs.Path;
import org.apache.flink.orc.OrcSplitReaderUtil;
import org.apache.flink.orc.vector.RowDataVectorizer;
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.hadoop.conf.Configuration;
import org.apache.orc.TypeDescription;
import java.util.Properties;
public class StreamingWriteFileOrc {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Checkpointing must be enabled: the bulk-format sink only finalizes files on checkpoints.
    env.enableCheckpointing(10000);
    env.setParallelism(1);
    DataStream<RowData> dataStream = env.addSource(new MySource());

    // Properties for writing in ORC format
    final Properties writerProps = new Properties();
    writerProps.setProperty("orc.compress", "LZ4");

    // Define the column types and field names
    LogicalType[] orcTypes = new LogicalType[]{
        new IntType(), new DoubleType(), new VarCharType()};
    String[] fields = new String[]{"a", "b", "c"};
    TypeDescription typeDescription = OrcSplitReaderUtil.logicalTypeToOrcType(RowType.of(
        orcTypes,
        fields));

    // Build the OrcBulkWriterFactory
    final OrcBulkWriterFactory<RowData> factory = new OrcBulkWriterFactory<>(
        new RowDataVectorizer(typeDescription.toString(), orcTypes),
        writerProps,
        new Configuration());

    StreamingFileSink<RowData> orcSink = StreamingFileSink
        .forBulkFormat(new Path("file:///tmp/aaaa"), factory)
        .build();
    dataStream.addSink(orcSink);
    env.execute();
  }

  public static class MySource implements SourceFunction<RowData> {
    @Override
    public void run(SourceContext<RowData> sourceContext) throws Exception {
      while (true) {
        GenericRowData rowData = new GenericRowData(3);
        rowData.setField(0, (int) (Math.random() * 100));
        rowData.setField(1, Math.random() * 100);
        rowData.setField(2, org.apache.flink.table.data.StringData.fromString(String.valueOf(Math.random() * 100)));
        sourceContext.collect(rowData);
        Thread.sleep(1);
      }
    }

    @Override
    public void cancel() {
    }
  }
}

Maven dependencies (key entries; the project uses Flink 1.12.3, Scala 2.11, Hadoop 2.7.3):

<properties>
  <scala.version>2.11</scala.version>
  <flink.cluster.version>1.12.3</flink.cluster.version>
  <scope.value>compile</scope.value>
</properties>

<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.version}</artifactId>
    <version>1.12.3</version>
    <scope>${scope.value}</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-orc_2.11</artifactId>
    <version>1.12.3</version>
    <scope>${scope.value}</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>${flink.cluster.version}</version>
    <scope>${scope.value}</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>${flink.cluster.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.7.3</version>
    <scope>${scope.value}</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.7.3</version>
    <scope>${scope.value}</scope>
  </dependency>
</dependencies>

(The original pom also pulls in flink-table-planner-blink_2.11, flink-connector-kafka, flink-parquet, flink-avro, HBase, Hive JDBC, MySQL, logging and other dependencies that are not required for the examples above.)
That covers the ORC read/write utility classes and writing ORC-format files from Flink; the best way to get comfortable with them is to try the examples yourself.